// Metrics is a factory for metric vectors.
type Metrics interface {
	// NewMetricVec creates a new MetricVec with the provided name and help,
	// partitioned by the given label names.
	// The name and label names may contain ASCII letters, numbers, as well as underscores.
	// An error is returned if either the name or any of the label names has an invalid format.
	NewMetricVec(name string, help string, labelNames ...string) (MetricVec, error)
}

// MetricVec is a family of metrics that share a name and help text and are
// told apart by their label values.
// MetricVec must be thread safe.
type MetricVec interface {
	// GetMetricWith returns a Metric for the given MetricVec with the given labels map.
	// If that label map is accessed for the first time, a new Metric is created.
	// Label values may contain any Unicode character (UTF-8 encoded).
	// An error is returned if the number and names of the labels are
	// inconsistent with those of the variable labels of the MetricVec,
	// or if some label's value is empty,
	// or if some label's value is not a valid UTF-8 string.
	// Label names are not validated since they are already validated when the MetricVec is constructed.
	// The implementation of this method must guarantee that different invocations
	// on the same MetricVec with the same label map return the same Metric.
	// I.e. for
	//   m1, err1 := metricVec.GetMetricWith(L1)
	//   m2, err2 := metricVec.GetMetricWith(L2)
	// s.t. reflect.DeepEqual(L1, L2)
	// then (err1 == nil && err2 == nil) implies m1 == m2
	GetMetricWith(labels map[string]string) (Metric, error)
}

// Metric is a single numeric value that can be set to, or moved up and down
// by, arbitrary amounts.
// Metric must be thread safe.
type Metric interface {
	// Set sets the Metric to an arbitrary value.
	Set(float64)
	// Inc increments the Metric by 1. Use Add to increment it by arbitrary
	// values.
	Inc()
	// Dec decrements the Metric by 1. Use Sub to decrement it by arbitrary
	// values.
	Dec()
	// Add adds the given value to the Metric. (The value can be negative,
	// resulting in a decrease of the Metric.)
	Add(float64)
	// Sub subtracts the given value from the Metric. (The value can be
	// negative, resulting in an increase of the Metric.)
	Sub(float64)
}
v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -232,8 +233,9 @@ github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgR github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 h1:5ZkaAPbicIKTF2I64qf5Fh8Aa83Q/dnOafMYV0OMwjA= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ 
-1163,8 +1165,9 @@ golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 h1:uCLL3g5wH2xjxVREVuAbP9JM5PPKjRbXKRa6IBjkzmU= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/loghelper/close_log_error.go b/internal/loghelper/close_log_error.go index d557f219..cb384971 100644 --- a/internal/loghelper/close_log_error.go +++ b/internal/loghelper/close_log_error.go @@ -15,3 +15,44 @@ func CloseLogError(closer io.Closer, logger commontypes.Logger, msg string) { }) } } + +// Closes closer if success is false. 
If an error occurs, it is logged at WARN level together with +// msg +// +// Useful for deferred closing in a constructor like this: +// +// func newFoo(logger commontypes.Logger) (*Foo, err) { +// success := false +// +// bar, err := newBar() +// if err != nil { +// return nil, err +// } +// defer loghelper.CloseLogErrorUnlessSuccess(&success, bar, logger, "failed to close bar in failed newFoo") +// +// baz, err := newBaz() +// if err != nil { +// return nil, err +// } +// defer loghelper.CloseLogErrorUnlessSuccess(&success, baz, logger, "failed to close baz in failed newFoo") +// +// success = true +// return &Foo{ +// bar, +// baz +// } +// } +func CloseLogErrorUnlessSuccess(success *bool, closer io.Closer, logger commontypes.Logger, msg string) { + if success != nil && *success { + // no failure, return early + return + } + if success == nil { + logger.Debug("CloseLogErrorUnlessSuccess: got nil success value. this should not happen, assuming it means we did not succeed", nil) + } + if err := closer.Close(); err != nil { + logger.Warn(msg, commontypes.LogFields{ + "error": err, + }) + } +} diff --git a/internal/metrics/nonblocking_metrics.go b/internal/metrics/nonblocking_metrics.go new file mode 100644 index 00000000..9380f1fc --- /dev/null +++ b/internal/metrics/nonblocking_metrics.go @@ -0,0 +1,254 @@ +package metrics + +import ( + "context" + "fmt" + "sync" + "time" + "unicode/utf8" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/subprocesses" +) + +var _ commontypes.Metrics = (*NonblockingMetricsWrapper)(nil) + +type NonblockingMetricsWrapper struct { + subs subprocesses.Subprocesses + ctx context.Context + ctxCancel context.CancelFunc + logger commontypes.Logger + flushInterval time.Duration + metricsImpl commontypes.Metrics + metricIndex metricIndexMap // metricFingerprint(name, labels) => nonblockingMetricVec +} + +// NewNonblockingMetricsWrapper is a non-blocking implementation of the commontypes.Metrics 
interface. +// The provided metricsImpl is used to instantiate wrapper vectors of type commontypes.MetricVec with NewMetricVec +// as well as wrapper metrics with GetMerticWhith on the wrapper vectors. +// All method calls on the wrappers are non-blocking, even if the respective calls for the underlying wrapped +// metricsImpl could block. +// To achieve this, NewNonblockingMetricsWrapper starts a flush loop which periodically, according to the provided interval, +// applies collectively the result of all the calls on the wrappers methods to the respective wrapped metricsImpl instances. +// The order of the method calls is reflected on the applied result. +func NewNonblockingMetricsWrapper(logger commontypes.Logger, metricsImpl commontypes.Metrics, flushInterval time.Duration) *NonblockingMetricsWrapper { + logger.Info("NonblockingMetricsWrapper: New non-blocking metrics wrapper", commontypes.LogFields{"flushInterval": flushInterval.String()}) + ctx, ctxCancel := context.WithCancel(context.Background()) + nbmw := &NonblockingMetricsWrapper{ + subprocesses.Subprocesses{}, + ctx, + ctxCancel, + logger, + flushInterval, + metricsImpl, + metricIndexMap{}, + } + + nbmw.subs.Go(nbmw.flushLoop) + return nbmw +} + +// NewMetricVec returns a commontypes.MetricVec without blocking, even if the invocation of NewMetricVec of the +// underlying metrics implementation is blocking. NewMetricVec for the underlying metrics implementation is only invoked +// in the flush loop of the NonblockingMetricsWrapper the first time a value for a metric of the returned metric vector +// is encountered. +// The name and label names may contain ASCII letters, numbers, as well as underscores. +// An error is returned if either the name or some label names is of invalid format. 
+func (nbmw *NonblockingMetricsWrapper) NewMetricVec(name string, help string, labelNames ...string) (commontypes.MetricVec, error) { + if err := validName(name); err != nil { + return nil, fmt.Errorf("could not create metric vector with invalid name: %w", err) + } + + if err := validLabelNames(labelNames); err != nil { + return nil, fmt.Errorf("could not create metric vector with invalid label names: %w", err) + } + + return &nonblockingMetricVec{ + nbmw, + name, + help, + labelNames, + nil, // initialized in flush loop + }, nil +} + +func validLabelNames(labelNames []string) error { + for _, name := range labelNames { + if err := validName(name); err != nil { + return fmt.Errorf("format of label name %q is not valid: %w", name, err) + } + } + return nil +} + +// Close shuts down the flush loop and waits for the go routines of the NonblockingMetricsWrapper to finish +// Close does not close the underlying metrics implementation. It should be the responsibility of the underlying +// metrics implementation to shut down without go routines leakage +func (nbmw *NonblockingMetricsWrapper) Close() error { + nbmw.logger.Debug("NonblockingMetricsWrapper: Closing non-blocking metrics wrapper", nil) + nbmw.ctxCancel() + nbmw.subs.Wait() + nbmw.logger.Info("NonblockingMetricsWrapper: Closed non-blocking metrics wrapper", nil) + return nil +} + +func (nbmw *NonblockingMetricsWrapper) flushLoop() { + nbmw.logger.Debug("NonblockingMetricsWrapper: Starting metric reporter flush loop", nil) + ticker := time.NewTicker(nbmw.flushInterval) + defer ticker.Stop() + + for { + select { + case <-nbmw.ctx.Done(): + nbmw.logger.Debug("NonblockingMetricsWrapper: Canceling metric reporter flush loop", nil) + return + case <-ticker.C: + nbmw.flush() + } + } +} + +func (nbmw *NonblockingMetricsWrapper) flush() { + nbmw.logger.Trace("NonblockingMetricsWrapper: Flushing metricsImpl", nil) + nbmw.metricIndex.Range( + func(fingerprint string, nonblockingMetric *nonblockingMetric) bool { + // The 
underlying metric implementation might not have been initialized yet + if nonblockingMetric.metricImpl == nil { + // Get the parent metric vector + nonblockingMetricVec := nonblockingMetric.nonblockingMetricVec + // The underlying metric vector implementation might not have been initialized yet + if nonblockingMetricVec.metricVecImpl == nil { + var err error + nonblockingMetricVec.metricVecImpl, err = nbmw.metricsImpl.NewMetricVec(nonblockingMetricVec.name, nonblockingMetricVec.help, nonblockingMetricVec.labelNames...) + if err != nil { + // TODO: This will potentially log the same error in every flush loop iteration, consider deduplicating. + nbmw.logger.Error("NonblockingMetricsWrapper: Could not create metric vector", commontypes.LogFields{"err": err.Error()}) + return true + } + } + var err error + nonblockingMetric.metricImpl, err = nonblockingMetricVec.metricVecImpl.GetMetricWith(nonblockingMetric.labels) + if err != nil { + // TODO: This will potentially log the same error in every flush loop iteration, consider deduplicating. + nbmw.logger.Error("NonblockingMetricsWrapper: Could not create metric with labels", commontypes.LogFields{"err": err.Error()}) + return true + } + } + + err := nonblockingMetric.Flush() + if err != nil { + // TODO: This will potentially log the same error in every flush loop iteration, consider deduplicating. + nbmw.logger.Error("NonblockingMetricsWrapper: Could not set metric value", commontypes.LogFields{"err": err.Error()}) + return true + } + + return true + }) +} + +type nonblockingMetricVec struct { + nonblockingMetricsWrapper *NonblockingMetricsWrapper + name string + help string + labelNames []string + metricVecImpl commontypes.MetricVec +} + +// GetMetricWith returns a commontypes.Metric without blocking, even if the invocation of GetMetricWith of the +// underlying metricsImpl is blocking. 
GetMetricWith for the underlying metrics implementation is only invoked in the +// flush loop of the corresponding NonblockingMetricsWrapper the first time a value for the returned metric is +// encountered. +// An error is returned if the number and names of the labels are +// inconsistent with those of the variable labels of the MetricVec +// or if some labels' value is empty +// or if some labels' value is not a valid UTF-8 string. +// Label names are not validated since they are already validated when the MetricVec is constructed. +func (nbv *nonblockingMetricVec) GetMetricWith(labels map[string]string) (commontypes.Metric, error) { + if err := nbv.validLabels(labels); err != nil { + return nil, err + } + + fingerprint := metricFingerprint(nbv.name, labels) + + nbm := &nonblockingMetric{ + 0, + sync.Mutex{}, + nbv, + labels, + nil, // initialized in flush loop + } + + nbm, _ = nbv.nonblockingMetricsWrapper.metricIndex.LoadOrStore(fingerprint, nbm) + + return nbm, nil +} + +func (nbv *nonblockingMetricVec) validLabels(labels map[string]string) error { + if len(labels) != len(nbv.labelNames) { + return fmt.Errorf( + "expected %d label values but got %d in %#v", + len(nbv.labelNames), + len(labels), labels, + ) + } + + for name, val := range labels { + if len(val) == 0 { + return fmt.Errorf("label %q: label value is empty", name) + } + if !utf8.ValidString(val) { + return fmt.Errorf("label %q: value %q is not a valid UTF-8 sting", name, val) + } + } + + for _, label := range nbv.labelNames { + if _, ok := labels[label]; !ok { + return fmt.Errorf("label name %q missing in label map", label) + } + } + return nil +} + +type nonblockingMetric struct { + value float64 + valueLock sync.Mutex + + nonblockingMetricVec *nonblockingMetricVec + labels map[string]string + metricImpl commontypes.Metric +} + +func (nbm *nonblockingMetric) Set(f float64) { + nbm.valueLock.Lock() + defer nbm.valueLock.Unlock() + nbm.value = f +} + +func (nbm *nonblockingMetric) Inc() { + 
nbm.Add(1) +} + +func (nbm *nonblockingMetric) Dec() { + nbm.Add(-1) +} + +func (nbm *nonblockingMetric) Add(f float64) { + nbm.valueLock.Lock() + defer nbm.valueLock.Unlock() + nbm.value = nbm.value + f +} + +func (nbm *nonblockingMetric) Sub(f float64) { + nbm.Add(f * -1) +} + +func (nbm *nonblockingMetric) Flush() error { + if nbm.metricImpl == nil { + return fmt.Errorf("metric implementation is not initialized yet") + } + nbm.valueLock.Lock() + value := nbm.value + nbm.valueLock.Unlock() + nbm.metricImpl.Set(value) + return nil +} diff --git a/internal/metrics/util.go b/internal/metrics/util.go new file mode 100644 index 00000000..190b3156 --- /dev/null +++ b/internal/metrics/util.go @@ -0,0 +1,99 @@ +package metrics + +import ( + "bytes" + "fmt" + "sort" + "sync" +) + +// separatorByte is a byte that cannot occur in valid UTF-8 sequences +const separatorByte byte = 255 + +// Making metricIndexMap thread safe, since it might be accessed concurrently +// Using sync.Map instead of a map with a mutex to avoid lock contention +// Importantly, iterating over a sync.Map with Range does not block any method on the receiver +type metricIndexMap struct { + mm sync.Map +} + +func (m *metricIndexMap) Load(key string) (*nonblockingMetric, bool) { + if v, ok := m.mm.Load(key); ok { + if v == nil { + return nil, ok + } + return v.(*nonblockingMetric), ok + } + return nil, false +} + +func (m *metricIndexMap) Store(key string, value *nonblockingMetric) { + m.mm.Store(key, value) +} + +func (m *metricIndexMap) Delete(key string) { + m.mm.Delete(key) +} + +func (m *metricIndexMap) LoadAndDelete(key string) (*nonblockingMetric, bool) { + if v, ok := m.mm.LoadAndDelete(key); ok { + if v == nil { + return nil, ok + } + return v.(*nonblockingMetric), ok + } + return nil, false +} + +func (m *metricIndexMap) LoadOrStore(key string, value *nonblockingMetric) (*nonblockingMetric, bool) { + v, ok := m.mm.LoadOrStore(key, value) + if v == nil { + return nil, ok + } + return 
// separatorByte delimits the parts of a metric fingerprint. It is a byte that
// cannot occur in valid UTF-8 sequences.
const separatorByte byte = 255

// metricFingerprint builds the index key for a metric: the name followed by
// every label name and value in ascending label-name order, each part
// terminated by separatorByte. Equal names and label sets therefore produce
// the same key regardless of map iteration order.
func metricFingerprint(name string, labels map[string]string) string {
	sortedNames := make([]string, 0, len(labels))
	for labelName := range labels {
		sortedNames = append(sortedNames, labelName)
	}
	sort.Strings(sortedNames)

	var fingerprint bytes.Buffer
	fingerprint.WriteString(name)
	fingerprint.WriteByte(separatorByte)
	for _, labelName := range sortedNames {
		for _, part := range [...]string{labelName, labels[labelName]} {
			fingerprint.WriteString(part)
			fingerprint.WriteByte(separatorByte)
		}
	}
	return fingerprint.String()
}

// validName returns an error unless name is non-empty and consists solely of
// ASCII letters, ASCII digits, underscores and colons.
func validName(name string) error {
	if name == "" {
		return fmt.Errorf("empty string is not allowed")
	}

	for _, r := range name {
		switch {
		case r >= '0' && r <= '9',
			r >= 'a' && r <= 'z',
			r >= 'A' && r <= 'Z',
			r == ':',
			r == '_':
			// allowed character
		default:
			return fmt.Errorf("the string contains an invalid character; it should only contain only ASCII letters and digits, underscores and colons")
		}
	}

	return nil
}
"github.com/smartcontractkit/libocr/internal/loghelper" + "github.com/smartcontractkit/libocr/internal/metrics" "github.com/smartcontractkit/libocr/networking/ragedisco" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/smartcontractkit/libocr/ragep2p" @@ -15,16 +17,20 @@ import ( "go.uber.org/multierr" ) +const MetricFlushInterval = 2 * time.Second + // concretePeerV2 represents a ragep2p peer with one peer ID listening on one port type concretePeerV2 struct { peerID ragetypes.PeerID host *ragep2p.Host discoverer *ragedisco.Ragep2pDiscoverer + metricsWrapper *metrics.NonblockingMetricsWrapper logger loghelper.LoggerWithContext endpointConfig EndpointConfigV2 } func newPeerV2(c PeerConfig) (*concretePeerV2, error) { + success := false rawPriv, err := c.PrivKey.Raw() if err != nil { @@ -49,13 +55,23 @@ func newPeerV2(c PeerConfig) (*concretePeerV2, error) { if len(c.V2AnnounceAddresses) == 0 { announceAddresses = c.V2ListenAddresses } - discoverer := ragedisco.NewRagep2pDiscoverer(c.V2DeltaReconcile, announceAddresses, c.V2DiscovererDatabase) + + nonblockingMetricsWrapper := metrics.NewNonblockingMetricsWrapper( + logger, + c.Metrics, + MetricFlushInterval, + ) + defer loghelper.CloseLogErrorUnlessSuccess(&success, nonblockingMetricsWrapper, logger, "failed to close metrics wrapper in failed newPeerV2") + + discoverer := ragedisco.NewRagep2pDiscoverer(c.V2DeltaReconcile, announceAddresses, c.V2DiscovererDatabase, nonblockingMetricsWrapper) + host, err := ragep2p.NewHost( ragep2p.HostConfig{c.V2DeltaDial}, ed25519Priv, c.V2ListenAddresses, discoverer, c.Logger, + nonblockingMetricsWrapper, ) if err != nil { return nil, fmt.Errorf("failed to construct ragep2p host: %w", err) @@ -64,13 +80,16 @@ func newPeerV2(c PeerConfig) (*concretePeerV2, error) { if err != nil { return nil, fmt.Errorf("failed to start ragep2p host: %w", err) } + defer loghelper.CloseLogErrorUnlessSuccess(&success, host, logger, "failed to close the host in failed 
newPeerV2") logger.Info("PeerV2: ragep2p host booted", nil) + success = true return &concretePeerV2{ peerID, host, discoverer, + nonblockingMetricsWrapper, logger, c.V2EndpointConfig, }, nil @@ -115,7 +134,9 @@ func (p2 *concretePeerV2) PeerID() string { } func (p2 *concretePeerV2) Close() error { - return p2.host.Close() + return multierr.Append( + p2.host.Close(), + p2.metricsWrapper.Close()) } func decodev2Bootstrappers(v2bootstrappers []commontypes.BootstrapperLocator) (infos []ragetypes.PeerInfo, err error) { for _, b := range v2bootstrappers { diff --git a/networking/ragedisco/discovery_protocol.go b/networking/ragedisco/discovery_protocol.go index 2f9c5184..6c374eb3 100644 --- a/networking/ragedisco/discovery_protocol.go +++ b/networking/ragedisco/discovery_protocol.go @@ -67,10 +67,46 @@ type discoveryProtocol struct { db nettypes.DiscovererDatabase - processes subprocesses.Subprocesses - ctx context.Context - ctxCancel context.CancelFunc - logger loghelper.LoggerWithContext + processes subprocesses.Subprocesses + ctx context.Context + ctxCancel context.CancelFunc + logger loghelper.LoggerWithContext + protocolMetrics discoveryProtocolMetrics +} + +type discoveryProtocolMetrics struct { + peersCount commontypes.Metric + peersUndiscovered commontypes.Metric + peersDiscovered commontypes.Metric +} + +func makeDiscoveryProtocolMetrics(metrics commontypes.Metrics, peerID string) (discoveryProtocolMetrics, error) { + dpm := discoveryProtocolMetrics{} + peersCountVec, err := metrics.NewMetricVec("ragedisco_peers_count", "The total number of peers in network discovery", "peerID") + if err != nil { + return dpm, err + } + dpm.peersCount, err = peersCountVec.GetMetricWith(map[string]string{"peerID": peerID}) + if err != nil { + return dpm, err + } + peersUndiscoveredVec, err := metrics.NewMetricVec("ragedisco_peers_undiscovered", "The number of undiscovered peers in network discovery", "peerID") + if err != nil { + return dpm, err + } + dpm.peersUndiscovered, err = 
peersUndiscoveredVec.GetMetricWith(map[string]string{"peerID": peerID}) + if err != nil { + return dpm, err + } + peersDiscoveredVec, err := metrics.NewMetricVec("ragedisco_peers_discovered", "The number of discovered peers in network discovery", "peerID") + if err != nil { + return dpm, err + } + dpm.peersDiscovered, err = peersDiscoveredVec.GetMetricWith(map[string]string{"peerID": peerID}) + if err != nil { + return dpm, err + } + return dpm, nil } const ( @@ -90,12 +126,16 @@ func newDiscoveryProtocol( ownAddrs []ragetypes.Address, db nettypes.DiscovererDatabase, logger loghelper.LoggerWithContext, + metrics commontypes.Metrics, ) (*discoveryProtocol, error) { ownID, err := ragetypes.PeerIDFromPrivateKey(privKey) if err != nil { return nil, fmt.Errorf("failed to obtain peer id from private key: %w", err) } - + protocolMetrics, err := makeDiscoveryProtocolMetrics(metrics, ownID.String()) + if err != nil { + return nil, fmt.Errorf("failed to initialize discovery protocol metrics: %w", err) + } ctx, ctxCancel := context.WithCancel(context.Background()) return &discoveryProtocol{ sync.Mutex{}, @@ -121,6 +161,7 @@ func newDiscoveryProtocol( ctx, ctxCancel, logger.MakeChild(commontypes.LogFields{"id": "discoveryProtocol"}), + protocolMetrics, }, nil } @@ -182,21 +223,28 @@ func (p *discoveryProtocol) statusReportLoop() { func() { p.lock.RLock() defer p.lock.RUnlock() - uniquePeersToDetect := make(map[ragetypes.PeerID]struct{}) + uniquePeersToDiscover := make(map[ragetypes.PeerID]struct{}) for id, cnt := range p.locked.numGroupsByOracle { if cnt == 0 { continue } - uniquePeersToDetect[id] = struct{}{} + uniquePeersToDiscover[id] = struct{}{} } - reportStr, undetected := formatAnnouncementsForReport(uniquePeersToDetect, p.locked.bestAnnouncement) + reportStr, peersUndiscovered := formatAnnouncementsForReport(uniquePeersToDiscover, p.locked.bestAnnouncement) + peersCount := len(uniquePeersToDiscover) + peersDiscovered := len(uniquePeersToDiscover) - peersUndiscovered 
p.logger.Info("DiscoveryProtocol: Status report", commontypes.LogFields{ - "statusByPeer": reportStr, - "peersToDetect": len(uniquePeersToDetect), - "peersUndetected": undetected, - "peersDetected": len(uniquePeersToDetect) - undetected, + "statusByPeer": reportStr, + "peersCount": peersCount, + "peersUndiscovered": peersUndiscovered, + "peersDiscovered": peersDiscovered, }) + + p.protocolMetrics.peersCount.Set(float64(peersCount)) + p.protocolMetrics.peersUndiscovered.Set(float64(peersUndiscovered)) + p.protocolMetrics.peersDiscovered.Set(float64(peersDiscovered)) + timer = time.After(reportInterval) }() case <-chDone: diff --git a/networking/ragedisco/ragep2p_discoverer.go b/networking/ragedisco/ragep2p_discoverer.go index 89089f81..ce9f6878 100644 --- a/networking/ragedisco/ragep2p_discoverer.go +++ b/networking/ragedisco/ragep2p_discoverer.go @@ -46,12 +46,15 @@ type Ragep2pDiscoverer struct { chIncomingMessages chan incomingMessage chOutgoingMessages chan outgoingMessage chConnectivity chan connectivityMsg + + metrics commontypes.Metrics } func NewRagep2pDiscoverer( deltaReconcile time.Duration, announceAddresses []string, db nettypes.DiscovererDatabase, + metrics commontypes.Metrics, ) *Ragep2pDiscoverer { ctx, ctxCancel := context.WithCancel(context.Background()) return &Ragep2pDiscoverer{ @@ -71,10 +74,11 @@ func NewRagep2pDiscoverer( make(chan incomingMessage), make(chan outgoingMessage), make(chan connectivityMsg), + metrics, } } -func (r *Ragep2pDiscoverer) Start(h *ragep2p.Host, privKey ed25519.PrivateKey, logger loghelper.LoggerWithContext) error { +func (r *Ragep2pDiscoverer) Start(host *ragep2p.Host, privKey ed25519.PrivateKey, logger loghelper.LoggerWithContext) error { succeeded := false defer func() { if !succeeded { @@ -89,7 +93,7 @@ func (r *Ragep2pDiscoverer) Start(h *ragep2p.Host, privKey ed25519.PrivateKey, l return fmt.Errorf("cannot start Ragep2pDiscoverer that is not unstarted, state was: %v", r.state) } r.state = ragep2pDiscovererStarted 
- r.host = h + r.host = host announceAddresses, ok := combinedAnnounceAddrsForDiscoverer(r.logger, r.announceAddresses) if !ok { return fmt.Errorf("failed to obtain announce addresses") @@ -103,6 +107,7 @@ func (r *Ragep2pDiscoverer) Start(h *ragep2p.Host, privKey ed25519.PrivateKey, l announceAddresses, r.db, logger, + r.metrics, ) if err != nil { return fmt.Errorf("failed to construct underlying discovery protocol: %w", err) diff --git a/offchainreporting/internal/protocol/oracle.go b/offchainreporting/internal/protocol/oracle.go index 5c746f99..5fb282c8 100644 --- a/offchainreporting/internal/protocol/oracle.go +++ b/offchainreporting/internal/protocol/oracle.go @@ -81,17 +81,17 @@ type oracleState struct { // Here is a graph of the various channels involved and what they // transport. // -// ┌─────────────epoch changes─────────────┐ -// ▼ │ -// ┌──────┐ ┌────┴────┐ -// │Oracle├────pacemaker messages────────►│Pacemaker│ -// └────┬─┘ └─────────┘ -// │ ▲ -// └──────rep. gen. messages────────────┐ │ -// ▼ │progress events -// ┌────────────┐ ┌─────┴──────────┐ -// │Transmission│◄──────reports───────────┤ReportGeneration│ -// └────────────┘ └────────────────┘ +// ┌─────────────epoch changes─────────────┐ +// ▼ │ +// ┌──────┐ ┌────┴────┐ +// │Oracle├────pacemaker messages────────►│Pacemaker│ +// └────┬─┘ └─────────┘ +// │ ▲ +// └──────rep. gen. messages────────────┐ │ +// ▼ │progress events +// ┌────────────┐ ┌─────┴──────────┐ +// │Transmission│◄──────reports───────────┤ReportGeneration│ +// └────────────┘ └────────────────┘ // // All channels are unbuffered. // diff --git a/offchainreporting/oracle.go b/offchainreporting/oracle.go index fdd25f58..e9db1c7d 100644 --- a/offchainreporting/oracle.go +++ b/offchainreporting/oracle.go @@ -52,6 +52,9 @@ type OracleArgs struct { // Logger logs stuff Logger commontypes.Logger + // Enables adding metrics to track. This may be nil. + Metrics commontypes.Metrics + // Used to send logs to a monitor. This may be nil. 
MonitoringEndpoint commontypes.MonitoringEndpoint diff --git a/offchainreporting2/internal/protocol/oracle.go b/offchainreporting2/internal/protocol/oracle.go index 74c7f73e..80762b36 100644 --- a/offchainreporting2/internal/protocol/oracle.go +++ b/offchainreporting2/internal/protocol/oracle.go @@ -85,26 +85,24 @@ type oracleState struct { // Here is a graph of the various channels involved and what they // transport. // -// -// ┌────────────epoch changes──────────────┐ -// ▼ │ -// ┌──────┐ ┌────┴────┐ -// │Oracle├─────pacemaker messages───────►│Pacemaker│ -// └──┬─┬─┘ └─────────┘ -// │ │ ▲ -// │ └───────rep. gen. messages───────────┐ │ -// │rep. fin. messages │ │ -// ▼ ▼ │progress events -// ┌──────────────────┐ ┌─────┴──────────┐ -// │ReportFinalization│◄───final events───┤ReportGeneration│ -// └────────┬─────────┘ └────────────────┘ -// │ -// │transmit events -// ▼ -// ┌────────────┐ -// │Transmission│ -// └────────────┘ -// +// ┌────────────epoch changes──────────────┐ +// ▼ │ +// ┌──────┐ ┌────┴────┐ +// │Oracle├─────pacemaker messages───────►│Pacemaker│ +// └──┬─┬─┘ └─────────┘ +// │ │ ▲ +// │ └───────rep. gen. messages───────────┐ │ +// │rep. fin. messages │ │ +// ▼ ▼ │progress events +// ┌──────────────────┐ ┌─────┴──────────┐ +// │ReportFinalization│◄───final events───┤ReportGeneration│ +// └────────┬─────────┘ └────────────────┘ +// │ +// │transmit events +// ▼ +// ┌────────────┐ +// │Transmission│ +// └────────────┘ // // All channels are unbuffered. // diff --git a/offchainreporting2/oracle.go b/offchainreporting2/oracle.go index dd780317..92f76051 100644 --- a/offchainreporting2/oracle.go +++ b/offchainreporting2/oracle.go @@ -39,6 +39,9 @@ type OracleArgs struct { // Logger logs stuff. Logger commontypes.Logger + // Enables adding metrics to track. This may be nil. + Metrics commontypes.Metrics + // Used to send logs to a monitor. 
MonitoringEndpoint commontypes.MonitoringEndpoint diff --git a/ragep2p/doc.go b/ragep2p/doc.go index b02149dd..4ce27f95 100644 --- a/ragep2p/doc.go +++ b/ragep2p/doc.go @@ -2,7 +2,7 @@ // self-healing, message-oriented, authenticated, encrypted bidirectional // communication streams between peers. // -// Concepts +// # Concepts // // ragep2p peers are identified by their PeerID. PeerIDs are public keys. // PeerIDs' string representation is compatible with libp2p to ease migration @@ -24,7 +24,7 @@ // guarantee that messages that are delivered are delivered in FIFO order and // without modifications. // -// Peer discovery +// # Peer discovery // // ragep2p will handle peer discovery (i.e. associating network addresses like // 1.2.3.4:1337 with PeerIDs) automatically. Upon construction of a Host, a @@ -34,21 +34,21 @@ // sequentially dialing all of them until a connection is successfully // established. // -// Thread Safety +// # Thread Safety // // All public functions on Host and Stream are thread-safe. // // It is safe to double-Close(), though all but the first Close() may return an // error. // -// Allocations +// # Allocations // // We allocate a buffer for each message received. In principle, this could allo // an adversary to force a recipient to run out of memory. To defend against // this, we put limits on the length of messages and rate limit messages, // thereby also limiting adversarially-controlled allocations. // -// Security +// # Security // // Note: Users don't need to care about the details of how these security // measures work, only what properties they provide. @@ -85,5 +85,4 @@ // Stream.Close(), Stream.SendMessage(), and Stream.Receive() return immediately // and do any potential resulting communication asynchronously in the // background. Host.Close() terminates after at most a few seconds. 
-// package ragep2p diff --git a/ragep2p/ragep2p.go b/ragep2p/ragep2p.go index 7517853c..457542ae 100644 --- a/ragep2p/ragep2p.go +++ b/ragep2p/ragep2p.go @@ -148,6 +148,7 @@ type Host struct { listenAddresses []string discoverer Discoverer logger loghelper.LoggerWithContext + metrics commontypes.Metrics // Derived from secretKey id types.PeerID @@ -176,6 +177,7 @@ func NewHost( listenAddresses []string, discoverer Discoverer, logger commontypes.Logger, + metrics commontypes.Metrics, ) (*Host, error) { if len(listenAddresses) == 0 { return nil, fmt.Errorf("no listen addresses provided") @@ -194,6 +196,7 @@ func NewHost( discoverer, // peerID might already be set to the same value if we are managed, but we don't take any chances loghelper.MakeRootLoggerWithContext(logger).MakeChild(commontypes.LogFields{"id": "ragep2p", "peerID": types.PeerID(id)}), + metrics, id, mtls.NewMinimalX509CertFromPrivateKey(secretKey),