diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 642691bf..93bb9269 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -5,9 +5,12 @@ import ( "context" "fmt" "io" + "os" "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" pb "go.gazette.dev/core/broker/protocol" @@ -422,10 +425,13 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { // appendBuffer composes a backing File with a bufio.Writer, and additionally // tracks the offset through which the file is written. type appendBuffer struct { + // appendBuffer's file presents a limited file interface to not assume more than necessary + // about what it can be used for file interface { io.ReaderAt io.Seeker io.Writer + Size() int64 } offset int64 buf *bufio.Writer @@ -517,6 +523,7 @@ func newAppendBufferPool() *sync.Pool { }, "", "failed to create appendBuffer") fb.pool = pool + AppendServiceCollector.Register(fb.file) return fb } @@ -544,3 +551,55 @@ var ( appendBufferSize = 8 * 1024 // 8KB. appendBufferCutoff int64 = 1 << 26 // 64MB. ) + +// appendServiceCollector implements prometheus.Collector - registers the files associated with +// each appendBuffer created and emits the total disk usage when asked by prometheus +type appendServiceCollector struct { + files []interface { + Size() int64 + } +} + +func (asc *appendServiceCollector) Register(file interface{ Size() int64 }) { + asc.files = append(asc.files, file) +} + +// Describe implements prometheus.Collector +func (asc *appendServiceCollector) Describe(ch chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(asc, ch) +} + +// Collect implements prometheus.Collector +func (asc *appendServiceCollector) Collect(ch chan<- prometheus.Metric) { + var size int64 + for _, f := range asc.files { + size += f.Size() + } + ch <- prometheus.MustNewConstMetric( + appendServiceDiskBufferDesc, + prometheus.GaugeValue, + float64(size)) +} + +var ( + AppendServiceCollector = &appendServiceCollector{} + + appendServiceDiskBufferDesc = prometheus.NewDesc( + "gazette_append_service_disk_buffer_bytes", + "The total size in bytes on disk used by file-backed appendBuffers.", + []string{}, nil) +) + +// file provides an implementation of appendBuffer.file interface with necessary Size() method for collecting +// metrics +type file struct { + *os.File +} + +func (f *file) Size() int64 { + if stat, err := f.Stat(); err != nil { + return 0 + } else { + return stat.Size() + } +} diff --git a/broker/client/append_service_unix.go b/broker/client/append_service_unix.go index 0e6599e5..7d3910e2 100644 --- a/broker/client/append_service_unix.go +++ b/broker/client/append_service_unix.go @@ -20,8 +20,9 @@ var newAppendBuffer = func() (*appendBuffer, error) { } else if err = os.Remove(f.Name()); err != nil { return nil, err } else { - var fb = &appendBuffer{file: f} + var fb = &appendBuffer{file: &file{f}} fb.buf = bufio.NewWriterSize(fb, appendBufferSize) return fb, nil } } + diff --git a/broker/client/append_service_win.go b/broker/client/append_service_win.go index e4a8d5ad..df2337aa 100644 --- a/broker/client/append_service_win.go +++ b/broker/client/append_service_win.go @@ -20,7 +20,7 @@ var newAppendBuffer = func() (*appendBuffer, error) { return nil, err } else { runtime.SetFinalizer(f, removeFileFinalizer) - var fb = &appendBuffer{file: f} + var fb = &appendBuffer{file: &file{f}} fb.buf = bufio.NewWriterSize(fb, appendBufferSize) return fb, nil } diff --git a/go.mod b/go.mod index 8d1e18c8..b2edc7af 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0 github.com/gogo/protobuf v1.3.2 - github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 github.com/google/uuid v1.3.0 github.com/gorilla/schema v1.2.0 @@ -52,7 +51,7 @@ require ( golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect google.golang.org/api v0.56.0 google.golang.org/grpc v1.40.0 - google.golang.org/protobuf v1.27.1 // indirect + google.golang.org/protobuf v1.27.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.0.0-20190620073856-dcce3486da33 diff --git a/go.sum b/go.sum index 98c567d9..d0c1f0bc 100644 --- a/go.sum +++ b/go.sum @@ -154,8 +154,6 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 66b8c102..d9c43b00 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -157,7 +157,7 @@ func (sc Cmd) Execute(args []string) error { ks.WatchApplyDelay = bc.Consumer.WatchDelay // Register Resolver as a prometheus.Collector for tracking shard status - prometheus.MustRegister(service.Resolver) + prometheus.MustRegister(service.Resolver, client.AppendServiceCollector) log.WithFields(log.Fields{ "zone": spec.Id.Zone,