Skip to content

xds: client outlier detection metrics [A91] #8437

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
Expand All @@ -52,6 +53,26 @@
// Name is the name of the outlier detection balancer.
const Name = "outlier_detection_experimental"

// Outlier detection metrics. Both are registered as int64 counters with
// Default set to false, and both carry "grpc.lb.backend_service" as an
// optional label.
var (
// ejectionsEnforcedMetric counts ejections that were actually carried out.
// It is recorded with the target and the detection method as label values.
ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.outlier_detection.ejections_enforced",
Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method",
Unit: "ejection",
Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"},
OptionalLabels: []string{"grpc.lb.backend_service"},
Default: false,
})

// ejectionsUnenforcedMetric counts outliers that were detected but not
// ejected. In addition to the target and detection method, it carries an
// unenforced-reason label; the call sites in this file use the values
// "max_ejection_overflow" and "enforcement_percentage".
ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.outlier_detection.ejections_unenforced",
Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage",
Unit: "ejection",
Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"},
OptionalLabels: []string{"grpc.lb.backend_service"},
Default: false,
})
)

// init registers the outlier detection balancer builder (bb) with the balancer
// package when this package is initialized.
func init() {
balancer.Register(bb{})
}
Expand All @@ -60,14 +81,16 @@

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &outlierDetectionBalancer{
ClientConn: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*endpointInfo),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParent: bOpts.ChannelzParent,
endpoints: resolver.NewEndpointMap[*endpointInfo](),
ClientConn: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*endpointInfo),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParent: bOpts.ChannelzParent,
endpoints: resolver.NewEndpointMap[*endpointInfo](),
metricsRecorder: cc.MetricsRecorder(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: since we have cc captured, maybe we don't also want to save the MetricsRecorder in another field? Less state seems better.

target: bOpts.Target.String(),
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
Expand Down Expand Up @@ -134,6 +157,15 @@
return Name
}

// extractBackendService is a placeholder for deriving the backend service name
// from the resolver state. It currently ignores its argument and always
// returns the empty string, so the value supplied for the optional
// "grpc.lb.backend_service" metric label is empty.
func extractBackendService(resolver.State) string {
// TODO: Implement backend service extraction from resolver attributes per A89 and A75.
// Until then, return the empty string; the label this value feeds is optional.
return ""
}

Comment on lines +160 to +168
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remove this? If it's not clear this needs to be done as part of A75/A89 implementation work, then you could file a bug so that we don't forget about it. Thanks!

// scUpdate wraps a subConn update to be sent to the child balancer.
type scUpdate struct {
scw *subConnWrapper
Expand Down Expand Up @@ -169,10 +201,13 @@
// to suppress redundant picker updates.
recentPickerNoop bool

closed *grpcsync.Event
done *grpcsync.Event
logger *grpclog.PrefixLogger
channelzParent channelz.Identifier
closed *grpcsync.Event
done *grpcsync.Event
logger *grpclog.PrefixLogger
channelzParent channelz.Identifier
metricsRecorder estats.MetricsRecorder
target string
backendService string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `backendService` field can also be removed along with the `extractBackendService` placeholder above.


child synchronizingBalancerWrapper

Expand Down Expand Up @@ -294,6 +329,7 @@
b.inhibitPickerUpdates = true
b.updateUnconditionally = false
b.cfg = lbCfg
b.backendService = extractBackendService(s.ResolverState)

newEndpoints := resolver.NewEndpointMap[bool]()
for _, ep := range s.ResolverState.Endpoints {
Expand Down Expand Up @@ -791,15 +827,21 @@
for _, epInfo := range endpointsToConsider {
bucket := epInfo.callCounter.inactiveBucket
ejectionCfg := b.cfg.SuccessRateEjection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
return
}
successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)
if successRate < requiredSuccessRate {
channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate)
// Check if max ejection percentage would prevent ejection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
// Record unenforced ejection due to max ejection percentage
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow", b.backendService)
continue

Check warning on line 838 in xds/internal/balancer/outlierdetection/balancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/outlierdetection/balancer.go#L836-L838

Added lines #L836 - L838 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

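Could you please add tests that cover the cases that are currently missed, i.e. an outlier was detected but the ejection was not enforced (either because the max ejection percentage was already reached, or because of the enforcement_percentage)?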

}
if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage {
b.ejectEndpoint(epInfo)
b.ejectEndpoint(epInfo, "success_rate")
} else {
// Record unenforced ejection due to enforcement percentage
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage", b.backendService)

Check warning on line 844 in xds/internal/balancer/outlierdetection/balancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/outlierdetection/balancer.go#L843-L844

Added lines #L843 - L844 were not covered by tests
}
}
}
Expand All @@ -819,21 +861,27 @@
for _, epInfo := range endpointsToConsider {
bucket := epInfo.callCounter.inactiveBucket
ejectionCfg := b.cfg.FailurePercentageEjection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
return
}
failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage)
// Check if max ejection percentage would prevent ejection
if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) {
// Record unenforced ejection due to max ejection percentage
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow", b.backendService)
continue

Check warning on line 871 in xds/internal/balancer/outlierdetection/balancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/outlierdetection/balancer.go#L869-L871

Added lines #L869 - L871 were not covered by tests
}
if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage {
b.ejectEndpoint(epInfo)
b.ejectEndpoint(epInfo, "failure_percentage")
} else {
// Record unenforced ejection due to enforcement percentage
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage", b.backendService)

Check warning on line 877 in xds/internal/balancer/outlierdetection/balancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/outlierdetection/balancer.go#L876-L877

Added lines #L876 - L877 were not covered by tests
}
}
}
}

// Caller must hold b.mu.
func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) {
func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detectionMethod string) {
b.numEndpointsEjected++
epInfo.latestEjectionTimestamp = b.timerStartTime
epInfo.ejectionTimeMultiplier++
Expand All @@ -842,6 +890,8 @@
channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw)
}

// Record the enforced ejection metric
ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod, b.backendService)
}

// Caller must hold b.mu.
Expand Down
30 changes: 30 additions & 0 deletions xds/internal/balancer/outlierdetection/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -1159,6 +1160,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
}
}

// Create test metrics recorder
tmr := stats.NewTestMetricsRecorder()
od.metricsRecorder = tmr
od.intervalTimerAlgorithm()

// verify no StateListener() call on the child, as no addresses got
Expand All @@ -1168,6 +1172,12 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
if _, err := scsCh.Receive(sCtx); err == nil {
t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got)
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got)
}

// Since no addresses are ejected, a SubConn update should forward down
// to the child.
Expand Down Expand Up @@ -1234,6 +1244,12 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
if _, err := scsCh.Receive(sCtx); err == nil {
t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got)
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got)
}

// Now that an address is ejected, SubConn updates for SubConns using
// that address should not be forwarded downward. These SubConn updates
Expand Down Expand Up @@ -1414,13 +1430,21 @@ func (s) TestEjectFailureRate(t *testing.T) {
pi.Done(balancer.DoneInfo{})
}
}
tmr := stats.NewTestMetricsRecorder()
od.metricsRecorder = tmr

od.intervalTimerAlgorithm()
sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if _, err := scsCh.Receive(sCtx); err == nil {
t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got)
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got)
}

// Set two upstream addresses to have five successes each, and one
// upstream address to have five failures. This should cause the address
Expand Down Expand Up @@ -1465,6 +1489,12 @@ func (s) TestEjectFailureRate(t *testing.T) {
if _, err := scsCh.Receive(sCtx); err == nil {
t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got)
}
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 {
t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got)
}

// upon the Outlier Detection balancer being reconfigured with a noop
// configuration, every ejected SubConn should be unejected.
Expand Down
Loading