From 70e586f9ac030aefc2e2be024db732640fe53ffa Mon Sep 17 00:00:00 2001 From: StarpTech Date: Thu, 13 Nov 2025 23:31:27 +0100 Subject: [PATCH 01/13] feat: prometheus sink for gqlmetrics --- router-tests/prometheus_improved_test.go | 169 ++++++------- router-tests/testenv/testenv.go | 52 +++- router/core/graph_server.go | 50 +++- router/core/operation_metrics.go | 136 +++-------- router/core/router.go | 21 +- router/core/router_config.go | 3 +- router/core/router_metrics.go | 100 +++++--- router/internal/graphqlmetrics/exporter.go | 230 +++++++++--------- .../internal/graphqlmetrics/exporter_test.go | 26 +- .../graphqlmetrics/graphql_exporter.go | 52 ++++ .../graphqlmetrics/graphql_metrics_sink.go | 91 +++++++ .../graphqlmetrics/prometheus_exporter.go | 58 +++++ .../graphqlmetrics/prometheus_sink.go | 163 +++++++++++++ router/internal/graphqlmetrics/sink.go | 23 ++ router/pkg/config/config.go | 13 +- router/pkg/config/config.schema.json | 39 ++- router/pkg/metric/config.go | 13 +- 17 files changed, 838 insertions(+), 401 deletions(-) create mode 100644 router/internal/graphqlmetrics/graphql_exporter.go create mode 100644 router/internal/graphqlmetrics/graphql_metrics_sink.go create mode 100644 router/internal/graphqlmetrics/prometheus_exporter.go create mode 100644 router/internal/graphqlmetrics/prometheus_sink.go create mode 100644 router/internal/graphqlmetrics/sink.go diff --git a/router-tests/prometheus_improved_test.go b/router-tests/prometheus_improved_test.go index cbd3fad8ff..bd1c80a4ea 100644 --- a/router-tests/prometheus_improved_test.go +++ b/router-tests/prometheus_improved_test.go @@ -3,6 +3,7 @@ package integration import ( "regexp" "testing" + "time" rmetric "github.com/wundergraph/cosmo/router/pkg/metric" @@ -37,8 +38,7 @@ func TestPrometheusSchemaUsage(t *testing.T) { PrometheusRegistry: promRegistry, MetricOptions: testenv.MetricOptions{ PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ - Enabled: true, - SampleRate: 1.0, + Enabled: true, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -79,6 +79,9 @@ query myQuery { } }`, res.Body) + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -87,7 +90,9 @@ query myQuery { schemaUsageMetrics := schemaUsage.GetMetric() - require.Len(t, schemaUsageMetrics, 7) + // Note: The aggregated batch processing now correctly tracks all field usages, + // including fields accessed through interfaces, resulting in more accurate metrics + require.Len(t, schemaUsageMetrics, 8) for _, metric := range schemaUsageMetrics { assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery") @@ -96,26 +101,31 @@ query myQuery { assertLabelNotPresent(t, metric.Label, otel.WgOperationSha256) } - assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLFieldName, "currentMood") - assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLParentType, "Employee") - - assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLFieldName, "departments") - assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLParentType, "RoleType") - - assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLFieldName, "employee") - assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLParentType, "Query") - - assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLFieldName, "id") - assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLParentType, "Employee") - - assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLFieldName, "role") - assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLParentType, "Employee") - - assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLFieldName, "title") - assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLParentType, "Engineer") + // Verify we have all expected field/parent type combinations + // Note: Order may vary, so we'll just check that all expected metrics are present + fieldTypePairs := make(map[string]string) + for _, metric := range schemaUsageMetrics { + var fieldName, parentType string + for _, label := range metric.Label { + if *label.Name == "wg_graphql_field_name" { + fieldName = *label.Value + } + if *label.Name == "wg_graphql_parent_type" { + parentType = *label.Value + } + } + if fieldName != "" && parentType != "" { + fieldTypePairs[fieldName+":"+parentType] = parentType + } + } - assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLFieldName, "title") - assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLParentType, "Operator") + // Verify expected field/parent combinations exist + require.Contains(t, fieldTypePairs, "currentMood:Employee") + require.Contains(t, fieldTypePairs, "employee:Query") + require.Contains(t, fieldTypePairs, "id:Employee") + require.Contains(t, fieldTypePairs, "role:Employee") + require.Contains(t, fieldTypePairs, "title:Engineer") + require.Contains(t, fieldTypePairs, "title:Operator") }) }) @@ -130,8 +140,7 @@ query myQuery { PrometheusRegistry: promRegistry, MetricOptions: testenv.MetricOptions{ PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ - Enabled: true, - SampleRate: 1.0, + Enabled: true, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -154,6 +163,9 @@ query myQuery { } }`, res.Body) + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -206,7 +218,6 @@ query myQuery { PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ Enabled: true, IncludeOperationSha: false, - SampleRate: 1.0, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -215,6 +226,9 @@ query myQuery { }) require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body) + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -244,7 +258,6 @@ query myQuery { PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ Enabled: true, IncludeOperationSha: true, - SampleRate: 1.0, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -253,6 +266,9 @@ query myQuery { }) require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body) + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -288,7 +304,6 @@ query myQuery { PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ Enabled: true, IncludeOperationSha: false, - SampleRate: 1.0, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -297,6 +312,9 @@ query myQuery { }) require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body) + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -315,7 +333,7 @@ query myQuery { }) }) - t.Run("sampling reduces tracked requests", func(t *testing.T) { + t.Run("all requests are tracked", func(t *testing.T) { t.Parallel() metricReader := metric.NewManualReader() @@ -326,62 +344,7 @@ query myQuery { PrometheusRegistry: promRegistry, MetricOptions: testenv.MetricOptions{ PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ - Enabled: true, - SampleRate: 0.1, // 10% sampling - }, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - // Make 100 requests - for i := 0; i < 100; i++ { - res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `query myQuery { employee(id: 1) { id } }`, - }) - require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body) - } - - mf, err := promRegistry.Gather() - require.NoError(t, err) - - schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName) - assert.NotNil(t, schemaUsage) - - schemaUsageMetrics := schemaUsage.GetMetric() - - require.Greater(t, len(schemaUsageMetrics), 0, "At least 1 request should be sampled") - - // With 10% sampling and 100 requests, each sampled request increments two field counters (`employee` and `id`). - // 100% sampling would produce 200 total field counts (100 requests * 2 fields), so a reduced total confirms sampling worked. - totalFieldCounts := 0.0 - for _, m := range schemaUsageMetrics { - counter := m.GetCounter() - require.NotNil(t, counter) - totalFieldCounts += counter.GetValue() - } - - require.Greater(t, totalFieldCounts, 0.0, "At least one sampled field is expected with a 10% sample rate") - require.Less(t, totalFieldCounts, 200.0, "Sampling should record fewer than 100% of requests (200 total field counts)") - - // Verify that the sampled metrics have correct structure - for _, m := range schemaUsageMetrics { - assertLabelValue(t, m.Label, otel.WgOperationName, "myQuery") - assertLabelValue(t, m.Label, otel.WgOperationType, "query") - } - }) - }) - - t.Run("100% sample rate tracks all requests", func(t *testing.T) { - t.Parallel() - - metricReader := metric.NewManualReader() - promRegistry := prometheus.NewRegistry() - - testenv.Run(t, &testenv.Config{ - MetricReader: metricReader, - PrometheusRegistry: promRegistry, - MetricOptions: testenv.MetricOptions{ - PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ - Enabled: true, - SampleRate: 1.0, // 100% sampling (default) + Enabled: true, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { @@ -393,6 +356,9 @@ query myQuery { require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body) } + // Wait for metrics to be flushed (interval is 100ms in test env) + time.Sleep(200 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) @@ -401,7 +367,7 @@ query myQuery { schemaUsageMetrics := schemaUsage.GetMetric() - // With 100% sampling and 10 requests, we expect 2 metrics (employee, id) + // We expect 2 metrics (employee, id) // The counter values should be 10 for each field require.Len(t, schemaUsageMetrics, 2) @@ -415,7 +381,7 @@ query myQuery { }) }) - t.Run("0% sample rate tracks no requests", func(t *testing.T) { + t.Run("custom exporter settings", func(t *testing.T) { t.Parallel() metricReader := metric.NewManualReader() @@ -426,27 +392,44 @@ query myQuery { PrometheusRegistry: promRegistry, MetricOptions: testenv.MetricOptions{ PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{ - Enabled: true, - SampleRate: 0.0, // 0% sampling + Enabled: true, + Exporter: &testenv.PrometheusSchemaFieldUsageExporter{ + BatchSize: 10, // Very small batch for immediate flush + QueueSize: 100, + Interval: 50 * time.Millisecond, // Fast flush + ExportTimeout: 2 * time.Second, + }, }, }, }, func(t *testing.T, xEnv *testenv.Environment) { - // Make 10 requests - for range 10 { + // Make 5 requests + for range 5 { res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ Query: `query myQuery { employee(id: 1) { id } }`, }) require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body) } + // Wait for metrics to be flushed (custom interval is 50ms) + time.Sleep(100 * time.Millisecond) + mf, err := promRegistry.Gather() require.NoError(t, err) schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName) + assert.NotNil(t, schemaUsage) + + schemaUsageMetrics := schemaUsage.GetMetric() + + // We expect 2 metrics (employee, id) + require.Len(t, schemaUsageMetrics, 2) + + for _, metric := range schemaUsageMetrics { + assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery") + assertLabelValue(t, metric.Label, otel.WgOperationType, "query") - // With 0% sampling, no metrics should be recorded - if schemaUsage != nil { - require.Len(t, schemaUsage.GetMetric(), 0, "No metrics should be recorded with 0% sampling") + // Each field should have been counted 5 times + assert.InEpsilon(t, 5.0, *metric.Counter.Value, 0.0001) } }) }) diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index 6ca8532958..399f69ac1d 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -279,7 +279,14 @@ type MetricOptions struct { type PrometheusSchemaFieldUsage struct { Enabled bool IncludeOperationSha bool - SampleRate float64 + Exporter *PrometheusSchemaFieldUsageExporter +} + +type PrometheusSchemaFieldUsageExporter struct { + BatchSize int + QueueSize int + Interval time.Duration + ExportTimeout time.Duration } type Config struct { @@ -1499,6 +1506,33 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node var prometheusConfig rmetric.PrometheusConfig if testConfig.PrometheusRegistry != nil { + promSchemaUsage := rmetric.PrometheusSchemaFieldUsage{ + Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled, + IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha, + } + + // Provide defaults for exporter settings if enabled + // Use shorter intervals for tests to avoid waiting too long + if promSchemaUsage.Enabled { + if testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter != nil { + // Use user-provided exporter settings + promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{ + BatchSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.BatchSize, + QueueSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.QueueSize, + Interval: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.Interval, + ExportTimeout: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.ExportTimeout, + } + } else { + // Use test-friendly defaults + promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{ + BatchSize: 100, // Smaller batch size for tests + QueueSize: 1000, // Smaller queue for tests + Interval: 100 * time.Millisecond, // Fast flush for tests + ExportTimeout: 5 * time.Second, + } + } + } + prometheusConfig = rmetric.PrometheusConfig{ Enabled: true, ListenAddr: fmt.Sprintf("localhost:%d", testConfig.PrometheusPort), @@ -1509,16 +1543,12 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node EngineStats: rmetric.EngineStatsConfig{ Subscription: testConfig.MetricOptions.PrometheusEngineStatsOptions.EnableSubscription, }, - CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics, - ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics, - ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels, - Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics, - ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo, - PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{ - Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled, - IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha, - SampleRate: testConfig.MetricOptions.PrometheusSchemaFieldUsage.SampleRate, - }, + CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics, + ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics, + ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels, + Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics, + ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo, + PromSchemaFieldUsage: promSchemaUsage, } } diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 823a7629c3..7ace34983f 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -36,6 +36,7 @@ import ( nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/internal/circuit" "github.com/wundergraph/cosmo/router/internal/expr" + "github.com/wundergraph/cosmo/router/internal/graphqlmetrics" rjwt "github.com/wundergraph/cosmo/router/internal/jwt" rmiddleware "github.com/wundergraph/cosmo/router/internal/middleware" "github.com/wundergraph/cosmo/router/internal/recoveryhandler" @@ -518,6 +519,7 @@ type graphMux struct { prometheusCacheMetrics *rmetric.CacheMetrics otelCacheMetrics *rmetric.CacheMetrics streamMetricStore rmetric.StreamMetricStore + prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter } // buildOperationCaches creates the caches for the graph mux. @@ -765,6 +767,12 @@ func (s *graphMux) Shutdown(ctx context.Context) error { } } + if s.prometheusMetricsExporter != nil { + if aErr := s.prometheusMetricsExporter.Shutdown(ctx); aErr != nil { + err = errors.Join(err, aErr) + } + } + if err != nil { return fmt.Errorf("shutdown graph mux: %w", err) } @@ -912,16 +920,40 @@ func (s *graphServer) buildGraphMux( return nil, err } + // Create Prometheus metrics exporter for schema field usage if enabled + if s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled { + cfg := s.metricConfig.Prometheus.PromSchemaFieldUsage + settings := &graphqlmetrics.ExporterSettings{ + BatchSize: cfg.Exporter.BatchSize, + QueueSize: cfg.Exporter.QueueSize, + Interval: cfg.Exporter.Interval, + ExportTimeout: cfg.Exporter.ExportTimeout, + RetryOptions: graphqlmetrics.RetryOptions{ + Enabled: false, // Retry is disabled for Prometheus metrics + MaxRetry: 1, // Provide valid defaults even when disabled + MaxDuration: time.Second * 1, + Interval: time.Millisecond * 100, + }, + } + promExporter, err := graphqlmetrics.NewPrometheusMetricsExporter( + s.logger, + gm.metricStore, + cfg.IncludeOperationSha, + settings, + ) + if err != nil { + return nil, fmt.Errorf("failed to create prometheus metrics exporter: %w", err) + } + gm.prometheusMetricsExporter = promExporter + } + metrics := NewRouterMetrics(&routerMetricsConfig{ - metrics: gm.metricStore, - gqlMetricsExporter: s.gqlMetricsExporter, - exportEnabled: s.graphqlMetricsConfig.Enabled, - routerConfigVersion: opts.RouterConfigVersion, - logger: s.logger, - - promSchemaUsageEnabled: s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled, - promSchemaUsageIncludeOpSha: s.metricConfig.Prometheus.PromSchemaFieldUsage.IncludeOperationSha, - promSchemaUsageSampleRate: s.metricConfig.Prometheus.PromSchemaFieldUsage.SampleRate, + metrics: gm.metricStore, + gqlMetricsExporter: s.gqlMetricsExporter, + prometheusMetricsExporter: gm.prometheusMetricsExporter, + exportEnabled: s.graphqlMetricsConfig.Enabled, + routerConfigVersion: opts.RouterConfigVersion, + logger: s.logger, }) baseLogFields := []zapcore.Field{ diff --git a/router/core/operation_metrics.go b/router/core/operation_metrics.go index d4615cf303..4f26530c5e 100644 --- a/router/core/operation_metrics.go +++ b/router/core/operation_metrics.go @@ -2,8 +2,6 @@ package core import ( "context" - "math/rand/v2" - "slices" "time" rotel "github.com/wundergraph/cosmo/router/pkg/otel" @@ -31,22 +29,14 @@ func (p OperationProtocol) String() string { // OperationMetrics is a struct that holds the metrics for an operation. It should be created on the parent router request // subgraph metrics are created in the transport or engine loader hooks. type OperationMetrics struct { - requestContentLength int64 - routerMetrics RouterMetrics - operationStartTime time.Time - inflightMetric func() - routerConfigVersion string - logger *zap.Logger - trackUsageInfo bool - - promSchemaUsageEnabled bool - promSchemaUsageIncludeOpSha bool - promSchemaUsageSampleRate float64 -} - -type usageKey struct { - fieldName string - parentType string + requestContentLength int64 + routerMetrics RouterMetrics + operationStartTime time.Time + inflightMetric func() + routerConfigVersion string + logger *zap.Logger + trackUsageInfo bool + prometheusUsageInfoTrack bool } func (m *OperationMetrics) Finish(reqContext *requestContext, statusCode int, responseSize int, exportSynchronous bool) { @@ -84,66 +74,29 @@ func (m *OperationMetrics) Finish(reqContext *requestContext, statusCode int, re rm.MeasureRequestSize(ctx, m.requestContentLength, sliceAttrs, o) rm.MeasureResponseSize(ctx, int64(responseSize), sliceAttrs, o) - if m.trackUsageInfo && reqContext.operation != nil && !reqContext.operation.executionOptions.SkipLoader { - m.routerMetrics.ExportSchemaUsageInfo(reqContext.operation, statusCode, reqContext.error != nil, exportSynchronous) - } - - // Prometheus usage metrics, disabled by default - if m.promSchemaUsageEnabled && reqContext.operation != nil { - - if !m.shouldSampleOperation() { - return + // Export schema usage info to configured exporters + if reqContext.operation != nil && !reqContext.operation.executionOptions.SkipLoader { + // GraphQL metrics export (to metrics service) + if m.trackUsageInfo { + m.routerMetrics.ExportSchemaUsageInfo(reqContext.operation, statusCode, reqContext.error != nil, exportSynchronous) } - opAttrs := []attribute.KeyValue{ - rotel.WgOperationName.String(reqContext.operation.name), - rotel.WgOperationType.String(reqContext.operation.opType), + // Prometheus metrics export (to local Prometheus metrics) + if m.prometheusUsageInfoTrack { + m.routerMetrics.ExportSchemaUsageInfoPrometheus(reqContext.operation, statusCode, reqContext.error != nil, exportSynchronous) } - - // Include operation SHA256 if enabled - if m.promSchemaUsageIncludeOpSha && reqContext.operation.sha256Hash != "" { - opAttrs = append(opAttrs, rotel.WgOperationSha256.String(reqContext.operation.sha256Hash)) - } - - usageCounts := make(map[usageKey]int) - - for _, field := range reqContext.operation.typeFieldUsageInfo { - if field.ExactParentTypeName == "" || len(field.Path) == 0 { - continue - } - - key := usageKey{ - fieldName: field.Path[len(field.Path)-1], - parentType: field.ExactParentTypeName, - } - - usageCounts[key]++ - } - - for key, count := range usageCounts { - fieldAttrs := []attribute.KeyValue{ - rotel.WgGraphQLFieldName.String(key.fieldName), - rotel.WgGraphQLParentType.String(key.parentType), - } - - rm.MeasureSchemaFieldUsage(ctx, int64(count), []attribute.KeyValue{}, otelmetric.WithAttributeSet(attribute.NewSet(slices.Concat(opAttrs, fieldAttrs)...))) - } - } } type OperationMetricsOptions struct { - InFlightAddOption otelmetric.AddOption - SliceAttributes []attribute.KeyValue - RouterConfigVersion string - RequestContentLength int64 - RouterMetrics RouterMetrics - Logger *zap.Logger - TrackUsageInfo bool - - PrometheusSchemaUsageEnabled bool - PrometheusSchemaUsageIncludeOpSha bool - PrometheusSchemaUsageSampleRate float64 + InFlightAddOption otelmetric.AddOption + SliceAttributes []attribute.KeyValue + RouterConfigVersion string + RequestContentLength int64 + RouterMetrics RouterMetrics + Logger *zap.Logger + TrackUsageInfo bool + PrometheusUsageInfoTrack bool } // newOperationMetrics creates a new OperationMetrics struct and starts the operation metrics. @@ -153,38 +106,13 @@ func newOperationMetrics(opts OperationMetricsOptions) *OperationMetrics { inflightMetric := opts.RouterMetrics.MetricStore().MeasureInFlight(context.Background(), opts.SliceAttributes, opts.InFlightAddOption) return &OperationMetrics{ - requestContentLength: opts.RequestContentLength, - operationStartTime: operationStartTime, - inflightMetric: inflightMetric, - routerConfigVersion: opts.RouterConfigVersion, - routerMetrics: opts.RouterMetrics, - logger: opts.Logger, - trackUsageInfo: opts.TrackUsageInfo, - - promSchemaUsageEnabled: opts.PrometheusSchemaUsageEnabled, - promSchemaUsageIncludeOpSha: opts.PrometheusSchemaUsageIncludeOpSha, - promSchemaUsageSampleRate: opts.PrometheusSchemaUsageSampleRate, + requestContentLength: opts.RequestContentLength, + operationStartTime: operationStartTime, + inflightMetric: inflightMetric, + routerConfigVersion: opts.RouterConfigVersion, + routerMetrics: opts.RouterMetrics, + logger: opts.Logger, + trackUsageInfo: opts.TrackUsageInfo, + prometheusUsageInfoTrack: opts.PrometheusUsageInfoTrack, } } - -// shouldSampleOperation determines if a request should be sampled for schema field usage metrics. -// Uses probabilistic random sampling to ensure uniform distribution across all operations. -// -// This ensures: -// - All operations get statistical coverage (~X% of requests per operation) -// - Uniform distribution regardless of request ID format -// - Supports ANY sample rate (0.0 to 1.0), including arbitrary values like 0.8, 0.156, etc. -// -// Note: Uses non-deterministic random sampling rather than hash-based sampling because -// sequential request IDs produce clustered hash values that break deterministic sampling. -func (m *OperationMetrics) shouldSampleOperation() bool { - if m.promSchemaUsageSampleRate >= 1.0 { - return true - } - if m.promSchemaUsageSampleRate <= 0.0 { - return false - } - - // Probabilistic sampling: simple, reliable, and guaranteed uniform distribution - return rand.Float64() < m.promSchemaUsageSampleRate -} diff --git a/router/core/router.go b/router/core/router.go index 2d542bb8ff..8247ed3b56 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -824,7 +824,7 @@ func (r *Router) bootstrap(ctx context.Context) error { r.graphqlMetricsConfig.CollectorEndpoint, connect.WithSendGzip(), ) - ge, err := graphqlmetrics.NewExporter( + ge, err := graphqlmetrics.NewGraphQLMetricsExporter( r.logger, client, r.graphApiToken, @@ -838,6 +838,18 @@ func (r *Router) bootstrap(ctx context.Context) error { r.logger.Info("GraphQL schema coverage metrics enabled") } + // Create Prometheus metrics exporter for schema field usage + // Note: This is separate from the Prometheus meter provider which handles OTEL metrics + // This exporter is specifically for schema field usage tracking via the Prometheus sink + if r.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled { + // The metric store will be passed in later when building the graph mux + // because each mux has its own metric store + // We'll create the exporter when building the mux in buildGraphMux + r.logger.Info("Prometheus schema field usage metrics enabled", + zap.Bool("include_operation_sha", r.metricConfig.Prometheus.PromSchemaFieldUsage.IncludeOperationSha), + ) + } + if r.Config.rateLimit != nil && r.Config.rateLimit.Enabled { var err error r.redisClient, err = rd.NewRedisCloser(&rd.RedisCloserOptions{ @@ -2311,7 +2323,12 @@ func MetricConfigFromTelemetry(cfg *config.Telemetry) *rmetric.Config { PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{ Enabled: cfg.Metrics.Prometheus.SchemaFieldUsage.Enabled, IncludeOperationSha: cfg.Metrics.Prometheus.SchemaFieldUsage.IncludeOperationSha, - SampleRate: cfg.Metrics.Prometheus.SchemaFieldUsage.SampleRate, + Exporter: rmetric.PrometheusSchemaFieldUsageExporter{ + BatchSize: cfg.Metrics.Prometheus.SchemaFieldUsage.Exporter.BatchSize, + QueueSize: cfg.Metrics.Prometheus.SchemaFieldUsage.Exporter.QueueSize, + Interval: cfg.Metrics.Prometheus.SchemaFieldUsage.Exporter.Interval, + ExportTimeout: cfg.Metrics.Prometheus.SchemaFieldUsage.Exporter.ExportTimeout, + }, }, }, } diff --git a/router/core/router_config.go b/router/core/router_config.go index b9cdac68b6..4e4b261f94 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -34,7 +34,8 @@ type Config struct { tracerProvider *sdktrace.TracerProvider otlpMeterProvider *sdkmetric.MeterProvider promMeterProvider *sdkmetric.MeterProvider - gqlMetricsExporter *graphqlmetrics.Exporter + gqlMetricsExporter *graphqlmetrics.GraphQLMetricsExporter + prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter corsOptions *cors.Config setConfigVersionHeader bool routerGracePeriod time.Duration diff --git a/router/core/router_metrics.go b/router/core/router_metrics.go index 4021eea2a7..d661fc8498 100644 --- a/router/core/router_metrics.go +++ b/router/core/router_metrics.go @@ -14,47 +14,40 @@ import ( type RouterMetrics interface { StartOperation(logger *zap.Logger, requestContentLength int64, sliceAttr []attribute.KeyValue, inFlightAddOption otelmetric.AddOption) *OperationMetrics ExportSchemaUsageInfo(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) - GqlMetricsExporter() *graphqlmetrics.Exporter + ExportSchemaUsageInfoPrometheus(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) + GqlMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter + PrometheusMetricsExporter() *graphqlmetrics.PrometheusMetricsExporter MetricStore() metric.Store } // routerMetrics encapsulates all data and configuration that the router // uses to collect and its metrics type routerMetrics struct { - metrics metric.Store - gqlMetricsExporter *graphqlmetrics.Exporter - routerConfigVersion string - logger *zap.Logger - exportEnabled bool - - promSchemaUsageEnabled bool - promSchemaUsageIncludeOpSha bool - promSchemaUsageSampleRate float64 + metrics metric.Store + gqlMetricsExporter *graphqlmetrics.GraphQLMetricsExporter + prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter + routerConfigVersion string + logger *zap.Logger + exportEnabled bool } type routerMetricsConfig struct { - metrics metric.Store - gqlMetricsExporter *graphqlmetrics.Exporter - routerConfigVersion string - logger *zap.Logger - exportEnabled bool - - promSchemaUsageEnabled bool - promSchemaUsageIncludeOpSha bool - promSchemaUsageSampleRate float64 + metrics metric.Store + gqlMetricsExporter *graphqlmetrics.GraphQLMetricsExporter + prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter + routerConfigVersion string + logger *zap.Logger + exportEnabled bool } func NewRouterMetrics(cfg *routerMetricsConfig) RouterMetrics { return &routerMetrics{ - metrics: cfg.metrics, - gqlMetricsExporter: cfg.gqlMetricsExporter, - routerConfigVersion: cfg.routerConfigVersion, - logger: cfg.logger, - exportEnabled: cfg.exportEnabled, - - promSchemaUsageEnabled: cfg.promSchemaUsageEnabled, - promSchemaUsageIncludeOpSha: cfg.promSchemaUsageIncludeOpSha, - promSchemaUsageSampleRate: cfg.promSchemaUsageSampleRate, + metrics: cfg.metrics, + gqlMetricsExporter: cfg.gqlMetricsExporter, + prometheusMetricsExporter: cfg.prometheusMetricsExporter, + routerConfigVersion: cfg.routerConfigVersion, + logger: cfg.logger, + exportEnabled: cfg.exportEnabled, } } @@ -63,17 +56,14 @@ func NewRouterMetrics(cfg *routerMetricsConfig) RouterMetrics { // returns nil, but OperationMetrics is safe to call with a nil receiver. func (m *routerMetrics) StartOperation(logger *zap.Logger, requestContentLength int64, sliceAttr []attribute.KeyValue, inFlightAddOption otelmetric.AddOption) *OperationMetrics { metrics := newOperationMetrics(OperationMetricsOptions{ - RouterMetrics: m, - Logger: logger, - RequestContentLength: requestContentLength, - RouterConfigVersion: m.routerConfigVersion, - TrackUsageInfo: m.exportEnabled, - InFlightAddOption: inFlightAddOption, - SliceAttributes: sliceAttr, - - PrometheusSchemaUsageEnabled: m.promSchemaUsageEnabled, - PrometheusSchemaUsageIncludeOpSha: m.promSchemaUsageIncludeOpSha, - PrometheusSchemaUsageSampleRate: m.promSchemaUsageSampleRate, + RouterMetrics: m, + Logger: logger, + RequestContentLength: requestContentLength, + RouterConfigVersion: m.routerConfigVersion, + TrackUsageInfo: m.exportEnabled, + PrometheusUsageInfoTrack: m.prometheusMetricsExporter != nil, + InFlightAddOption: inFlightAddOption, + SliceAttributes: sliceAttr, }) return metrics } @@ -82,10 +72,14 @@ func (m *routerMetrics) MetricStore() metric.Store { return m.metrics } -func (m *routerMetrics) GqlMetricsExporter() *graphqlmetrics.Exporter { +func (m *routerMetrics) GqlMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter { return m.gqlMetricsExporter } +func (m *routerMetrics) PrometheusMetricsExporter() *graphqlmetrics.PrometheusMetricsExporter { + return m.prometheusMetricsExporter +} + func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) { if !m.exportEnabled { return @@ -138,6 +132,32 @@ func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext m.gqlMetricsExporter.RecordUsage(item, exportSynchronous) } +func (m *routerMetrics) ExportSchemaUsageInfoPrometheus(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) { + var opType graphqlmetricsv1.OperationType + switch operationContext.opType { + case OperationTypeQuery: + opType = graphqlmetricsv1.OperationType_QUERY + case OperationTypeMutation: + opType = graphqlmetricsv1.OperationType_MUTATION + case OperationTypeSubscription: + opType = graphqlmetricsv1.OperationType_SUBSCRIPTION + } + + item := &graphqlmetricsv1.SchemaUsageInfo{ + TypeFieldMetrics: operationContext.typeFieldUsageInfo.IntoGraphQLMetrics(), + OperationInfo: &graphqlmetricsv1.OperationInfo{ + Type: opType, + Hash: operationContext.sha256Hash, + Name: m.strCopy(operationContext.name), + }, + SchemaInfo: &graphqlmetricsv1.SchemaInfo{ + Version: m.routerConfigVersion, + }, + } + + m.prometheusMetricsExporter.RecordUsage(item, exportSynchronous) +} + func (m *routerMetrics) strCopy(s string) string { b := make([]byte, len(s)) copy(b, s) diff --git a/router/internal/graphqlmetrics/exporter.go b/router/internal/graphqlmetrics/exporter.go index 71a8be9015..d276787af1 100644 --- a/router/internal/graphqlmetrics/exporter.go +++ b/router/internal/graphqlmetrics/exporter.go @@ -2,29 +2,26 @@ package graphqlmetrics import ( "context" - "errors" "fmt" "time" - "connectrpc.com/connect" "github.com/cloudflare/backoff" - graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" - "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" "go.uber.org/atomic" "go.uber.org/zap" ) -type Exporter struct { - settings *ExporterSettings - logger *zap.Logger - client graphqlmetricsv1connect.GraphQLMetricsServiceClient - apiToken string - +// Exporter is a generic, thread-safe batch exporter that queues items and sends them +// to a sink in batches at regular intervals or when the batch size is reached. +// It supports configurable retry logic and graceful shutdown. +type Exporter[T any] struct { + settings *ExporterSettings + logger *zap.Logger + sink Sink[T] + isRetryableError SinkErrorHandler shutdownSignal chan struct{} acceptTrafficSema chan struct{} - - queue chan *graphqlmetrics.SchemaUsageInfo - inflightBatches *atomic.Int64 + queue chan T + inflightBatches *atomic.Int64 // exportRequestContext is used to cancel all requests that started before the shutdown exportRequestContext context.Context @@ -78,17 +75,28 @@ func NewDefaultExporterSettings() *ExporterSettings { } } -// NewExporter creates a new GraphQL metrics exporter. The collectorEndpoint is the endpoint to which the metrics -// are sent. The apiToken is the token used to authenticate with the collector. The collector supports Brotli compression -// and retries on failure. Underling queue implementation sends batches of metrics at the specified interval and batch size. -func NewExporter(logger *zap.Logger, client graphqlmetricsv1connect.GraphQLMetricsServiceClient, apiToken string, settings *ExporterSettings) (*Exporter, error) { +// NewExporter creates a new generic batch exporter. +// The sink is responsible for actually sending the batches to their destination. +// The isRetryableError function determines whether failed exports should be retried. +// If isRetryableError is nil, all errors are considered retryable. +func NewExporter[T any](logger *zap.Logger, sink Sink[T], isRetryableError SinkErrorHandler, settings *ExporterSettings) (*Exporter[T], error) { + if sink == nil { + return nil, fmt.Errorf("sink cannot be nil") + } + ctx, cancel := context.WithCancel(context.Background()) - e := &Exporter{ - logger: logger.With(zap.String("component", "graphqlmetrics_exporter")), + + // Default error handler treats all errors as retryable + if isRetryableError == nil { + isRetryableError = func(err error) bool { return true } + } + + e := &Exporter[T]{ + logger: logger.With(zap.String("component", "exporter")), settings: settings, - client: client, - apiToken: apiToken, - queue: make(chan *graphqlmetrics.SchemaUsageInfo, settings.QueueSize), + sink: sink, + isRetryableError: isRetryableError, + queue: make(chan T, settings.QueueSize), shutdownSignal: make(chan struct{}), acceptTrafficSema: make(chan struct{}), inflightBatches: atomic.NewInt64(0), @@ -102,39 +110,39 @@ func NewExporter(logger *zap.Logger, client graphqlmetricsv1connect.GraphQLMetri return e, nil } -func (e *Exporter) validate() error { +func (e *Exporter[T]) validate() error { if e.settings.BatchSize <= 0 { - return errors.New("batch size must be positive") + return fmt.Errorf("batch size must be positive") } if e.settings.QueueSize <= 0 { - return errors.New("queue size must be positive") + return fmt.Errorf("queue size must be positive") } if e.settings.Interval <= 0 { - return errors.New("interval must be positive") + return fmt.Errorf("interval must be positive") } if e.settings.ExportTimeout <= 0 { - return errors.New("export timeout must be positive") + return fmt.Errorf("export timeout must be positive") } if e.settings.RetryOptions.MaxDuration <= 0 { - return errors.New("retry max duration must be positive") + return fmt.Errorf("retry max duration must be positive") } if e.settings.RetryOptions.Interval <= 0 { - return errors.New("retry interval must be positive") + return fmt.Errorf("retry interval must be positive") } if e.settings.RetryOptions.MaxRetry <= 0 { - return errors.New("retry max retry must be positive") + return fmt.Errorf("retry max retry must be positive") } return nil } -func (e *Exporter) acceptTraffic() bool { +func (e *Exporter[T]) acceptTraffic() bool { // while the channel is not closed, the select will always return the default case // once it's closed, the select will always return _,false (closed channel) from the channel select { @@ -145,106 +153,87 @@ func (e *Exporter) acceptTraffic() bool { } } -func (e *Exporter) RecordUsage(usageInfo *graphqlmetrics.SchemaUsageInfo, synchronous bool) (ok bool) { +// Record adds an item to the export queue. +// If synchronous is true, the item is sent immediately in the current goroutine. +// Otherwise, it's added to the queue for batch processing. +// Returns false if the queue is full or if the exporter is shutting down. +func (e *Exporter[T]) Record(item T, synchronous bool) (ok bool) { if synchronous { - _ = e.sendItems([]*graphqlmetrics.SchemaUsageInfo{usageInfo}) + var batch []T + batch = append(batch, item) + _ = e.exportBatch(batch) return true } if !e.acceptTraffic() { return false } select { - case e.queue <- usageInfo: + case e.queue <- item: return true default: - e.logger.Warn("RecordAsync: Queue is full, dropping item") + e.logger.Warn("Record: Queue is full, dropping item") return false } } -func (e *Exporter) sendItems(items []*graphqlmetrics.SchemaUsageInfo) error { - e.logger.Debug("sending batch", zap.Int("size", len(items))) - ctx := e.exportRequestContext - if e.settings.ExportTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(e.exportRequestContext, e.settings.ExportTimeout) - defer cancel() - } - - req := connect.NewRequest(&graphqlmetrics.PublishGraphQLRequestMetricsRequest{ - SchemaUsage: items, - }) - - req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", e.apiToken)) - - _, err := e.client.PublishGraphQLMetrics(ctx, req) - if err != nil { - e.logger.Debug("Failed to export batch", zap.Error(err), zap.Int("batch_size", len(items))) - return err +// exportBatch sends a batch of items to the sink with timeout handling. +func (e *Exporter[T]) exportBatch(batch []T) error { + if len(batch) == 0 { + return nil } - e.logger.Debug("Successfully exported batch", zap.Int("batch_size", len(items))) + e.logger.Debug("Exporting batch", zap.Int("size", len(batch))) - return nil -} - -func (e *Exporter) sendAggregation(ctx context.Context, request *graphqlmetrics.PublishAggregatedGraphQLRequestMetricsRequest) error { - e.logger.Debug("sendAggregation", zap.Int("size", len(request.Aggregation))) + ctx := e.exportRequestContext if e.settings.ExportTimeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, e.settings.ExportTimeout) + ctx, cancel = context.WithTimeout(e.exportRequestContext, e.settings.ExportTimeout) defer cancel() } - req := connect.NewRequest(request) - - req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", e.apiToken)) - - _, err := e.client.PublishAggregatedGraphQLMetrics(ctx, req) + err := e.sink.Export(ctx, batch) if err != nil { - e.logger.Debug("sendAggregation failed", zap.Error(err), zap.Int("batch_size", len(request.Aggregation))) + e.logger.Debug("Failed to export batch", zap.Error(err), zap.Int("batch_size", len(batch))) return err } - e.logger.Debug("sendAggregation success", zap.Int("batch_size", len(request.Aggregation))) - + e.logger.Debug("Successfully exported batch", zap.Int("batch_size", len(batch))) return nil } -func (e *Exporter) prepareAndSendBatch(batch []*graphqlmetrics.SchemaUsageInfo) { - e.logger.Debug("Exporter.prepareAndSendBatch", zap.Int("batch_size", len(batch))) +// prepareAndSendBatch starts a goroutine to export the batch with retry logic. +func (e *Exporter[T]) prepareAndSendBatch(batch []T) { + e.logger.Debug("Preparing to send batch", zap.Int("batch_size", len(batch))) e.inflightBatches.Inc() go func() { defer e.inflightBatches.Dec() - e.aggregateAndSendBatch(batch) + e.exportBatchWithRetry(batch) }() } -// export sends the batch to the configured endpoint. -func (e *Exporter) aggregateAndSendBatch(batch []*graphqlmetrics.SchemaUsageInfo) { +// exportBatchWithRetry attempts to export a batch with exponential backoff retry logic. +func (e *Exporter[T]) exportBatchWithRetry(batch []T) { b := backoff.New(e.settings.RetryOptions.MaxDuration, e.settings.RetryOptions.Interval) defer b.Reset() - request := AggregateSchemaUsageInfoBatch(batch) - - err := e.sendAggregation(e.exportRequestContext, request) + err := e.exportBatch(batch) if err == nil { return } - var connectErr *connect.Error - if errors.As(err, &connectErr) && connectErr.Code() == connect.CodeUnauthenticated { - e.logger.Error("Failed to export batch due to unauthenticated error, not retrying", + // Check if error is retryable + if !e.isRetryableError(err) { + e.logger.Error("Failed to export batch with non-retryable error", zap.Error(err), - zap.Int("batch_size", len(request.Aggregation)), + zap.Int("batch_size", len(batch)), ) return } if !e.settings.RetryOptions.Enabled { - e.logger.Error("Failed to export batch", + e.logger.Error("Failed to export batch, retries disabled", zap.Error(err), - zap.Int("batch_size", len(request.Aggregation)), + zap.Int("batch_size", len(batch)), ) return } @@ -252,45 +241,48 @@ func (e *Exporter) aggregateAndSendBatch(batch []*graphqlmetrics.SchemaUsageInfo var retry int var lastErr error - for retry <= e.settings.RetryOptions.MaxRetry { - + for retry < e.settings.RetryOptions.MaxRetry { retry++ // Wait for the specified backoff period sleepDuration := b.Duration() - e.logger.Debug(fmt.Sprintf("Retrying export in %s ...", sleepDuration.String()), - zap.Int("batch_size", len(request.Aggregation)), + e.logger.Debug("Retrying export after backoff", + zap.Int("batch_size", len(batch)), zap.Int("retry", retry), zap.Duration("sleep", sleepDuration), ) - // Wait for the specified backoff period time.Sleep(sleepDuration) - err = e.sendAggregation(e.exportRequestContext, request) + err = e.exportBatch(batch) if err == nil { + e.logger.Debug("Export succeeded after retry", zap.Int("retry", retry)) return } - if errors.As(err, &connectErr) && connectErr.Code() == connect.CodeUnauthenticated { - e.logger.Error("Failed to export batch due to unauthenticated error, not retrying", + + // Check if the new error is retryable + if !e.isRetryableError(err) { + e.logger.Error("Failed to export batch with non-retryable error during retry", zap.Error(err), - zap.Int("batch_size", len(request.Aggregation)), + zap.Int("batch_size", len(batch)), + zap.Int("retry", retry), ) return } + lastErr = err } - e.logger.Error("Failed to export batch after retries", + e.logger.Error("Failed to export batch after all retries", zap.Error(lastErr), - zap.Int("batch_size", len(request.Aggregation)), + zap.Int("batch_size", len(batch)), zap.Int("retries", retry), ) } // start starts the exporter and blocks until the exporter is shutdown. -func (e *Exporter) start() { +func (e *Exporter[T]) start() { e.logger.Debug("Starting exporter") ticker := time.NewTicker(e.settings.Interval) defer func() { @@ -298,36 +290,37 @@ func (e *Exporter) start() { e.logger.Debug("Exporter stopped") }() - var buffer []*graphqlmetrics.SchemaUsageInfo + var buffer []T for { if buffer == nil { - buffer = make([]*graphqlmetrics.SchemaUsageInfo, 0, e.settings.BatchSize) + buffer = make([]T, 0, e.settings.BatchSize) } select { case <-ticker.C: - e.logger.Debug("Exporter.start: tick") + e.logger.Debug("Tick: flushing buffer", zap.Int("buffer_size", len(buffer))) if len(buffer) > 0 { e.prepareAndSendBatch(buffer) buffer = nil } case item := <-e.queue: - e.logger.Debug("Exporter.start: item") buffer = append(buffer, item) if len(buffer) == e.settings.BatchSize { + e.logger.Debug("Buffer full, sending batch", zap.Int("batch_size", len(buffer))) e.prepareAndSendBatch(buffer) buffer = nil } case <-e.shutdownSignal: - e.logger.Debug("Exporter.start: shutdown") + e.logger.Debug("Shutdown signal received, draining queue") e.drainQueue(buffer) return } } } -func (e *Exporter) drainQueue(buffer []*graphqlmetrics.SchemaUsageInfo) { - e.logger.Debug("Exporter.closeAndDrainQueue") +// drainQueue processes all remaining items in the queue during shutdown. +func (e *Exporter[T]) drainQueue(buffer []T) { + e.logger.Debug("Draining queue") drainedItems := 0 for { select { @@ -336,41 +329,48 @@ func (e *Exporter) drainQueue(buffer []*graphqlmetrics.SchemaUsageInfo) { buffer = append(buffer, item) if len(buffer) == e.settings.BatchSize { e.prepareAndSendBatch(buffer) - buffer = make([]*graphqlmetrics.SchemaUsageInfo, 0, e.settings.BatchSize) + buffer = make([]T, 0, e.settings.BatchSize) } default: if len(buffer) > 0 { e.prepareAndSendBatch(buffer) } - e.logger.Debug("Exporter.closeAndDrainQueue: done", zap.Int("drained_items", drainedItems)) + e.logger.Debug("Queue drained", zap.Int("drained_items", drainedItems)) return } } } -// Shutdown the exporter but waits until all export jobs has been finished or timeout. -// If the context is canceled, the exporter will be shutdown immediately. -func (e *Exporter) Shutdown(ctx context.Context) error { +// Shutdown gracefully shuts down the exporter. +// It stops accepting new items, drains the queue, waits for in-flight batches to complete, +// and closes the sink. If the context is cancelled, shutdown is forced. +func (e *Exporter[T]) Shutdown(ctx context.Context) error { + e.logger.Debug("Shutdown started") + ticker := time.NewTicker(time.Millisecond * 100) defer func() { ticker.Stop() - // cancel all requests + // Cancel all export requests e.cancelAllExportRequests() - e.logger.Debug("Exporter.Shutdown: done") + // Close the sink + if err := e.sink.Close(ctx); err != nil { + e.logger.Error("Error closing sink", zap.Error(err)) + } + e.logger.Debug("Shutdown complete") }() - // first close the acceptTrafficSema to stop accepting new items + // First close the acceptTrafficSema to stop accepting new items close(e.acceptTrafficSema) - // then trigger the shutdown signal for the exporter goroutine to stop - // it will then drain the queue and send the remaining items + // Then trigger the shutdown signal for the exporter goroutine to stop + // It will drain the queue and send the remaining items close(e.shutdownSignal) - // we're polling the inflightBatches to wait for all inflight batches to finish or timeout - // we're not using a wait group here because you can't wait for a wait group with a timeout - + // Poll the inflightBatches to wait for all in-flight batches to finish or timeout + // We're not using a wait group here because you can't wait for a wait group with a timeout for { select { case <-ctx.Done(): + e.logger.Warn("Shutdown cancelled by context", zap.Error(ctx.Err())) return ctx.Err() case <-ticker.C: if e.inflightBatches.Load() == 0 { diff --git a/router/internal/graphqlmetrics/exporter_test.go b/router/internal/graphqlmetrics/exporter_test.go index a5c521e4f3..85545914d9 100644 --- a/router/internal/graphqlmetrics/exporter_test.go +++ b/router/internal/graphqlmetrics/exporter_test.go @@ -49,7 +49,7 @@ func TestExportAggregationSameSchemaUsages(t *testing.T) { totalItems := 100 batchSize := 100 - e, err := NewExporter( + e, err := NewGraphQLMetricsExporter( zap.NewNop(), c, "secret", @@ -128,7 +128,7 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) { totalItems := 100 batchSize := 5 - e, err := NewExporter( + e, err := NewGraphQLMetricsExporter( zap.NewNop(), c, "secret", @@ -189,15 +189,14 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) { func TestForceFlushSync(t *testing.T) { c := &MyClient{ - t: t, - publishedBatches: make([][]*graphqlmetricsv1.SchemaUsageInfo, 0), + t: t, } queueSize := 100 totalItems := 10 batchSize := 5 - e, err := NewExporter( + e, err := NewGraphQLMetricsExporter( zap.NewNop(), c, "secret", @@ -254,13 +253,14 @@ func TestForceFlushSync(t *testing.T) { } c.mu.Lock() - require.Equal(t, 10, len(c.publishedBatches)) - require.Equal(t, 1, len(c.publishedBatches[0])) + // Synchronous mode now sends items through aggregation, so they appear as aggregations + require.Equal(t, 10, len(c.publishedAggregations)) + require.Equal(t, 1, len(c.publishedAggregations[0])) // Make sure that the exporter is still working after a forced flush - // Reset the published batches - c.publishedBatches = c.publishedBatches[:0] + // Reset the published aggregations + c.publishedAggregations = c.publishedAggregations[:0] c.mu.Unlock() for i := 0; i < totalItems; i++ { @@ -299,8 +299,8 @@ func TestForceFlushSync(t *testing.T) { c.mu.Lock() defer c.mu.Unlock() - require.Equal(t, 10, len(c.publishedBatches)) - require.Equal(t, 1, len(c.publishedBatches[0])) + require.Equal(t, 10, len(c.publishedAggregations)) + require.Equal(t, 1, len(c.publishedAggregations[0])) } func TestExportBatchInterval(t *testing.T) { @@ -312,7 +312,7 @@ func TestExportBatchInterval(t *testing.T) { totalItems := 5 batchSize := 10 - e, err := NewExporter( + e, err := NewGraphQLMetricsExporter( zap.NewNop(), c, "secret", @@ -384,7 +384,7 @@ func TestExportFullQueue(t *testing.T) { totalItems := 100 batchSize := 1 - e, err := NewExporter( + e, err := NewGraphQLMetricsExporter( zap.NewNop(), c, "secret", diff --git a/router/internal/graphqlmetrics/graphql_exporter.go b/router/internal/graphqlmetrics/graphql_exporter.go new file mode 100644 index 0000000000..1d4ab01835 --- /dev/null +++ b/router/internal/graphqlmetrics/graphql_exporter.go @@ -0,0 +1,52 @@ +package graphqlmetrics + +import ( + "context" + + graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" + "go.uber.org/zap" +) + +// GraphQLMetricsExporter wraps the generic Exporter for GraphQL metrics. +// It provides a cleaner API and backward compatibility with the old interface. +type GraphQLMetricsExporter struct { + exporter *Exporter[*graphqlmetrics.SchemaUsageInfo] +} + +// NewGraphQLMetricsExporter creates a new exporter specifically for GraphQL metrics. +// This is a convenience function that wraps the generic NewExporter with GraphQLMetricsSink. +func NewGraphQLMetricsExporter( + logger *zap.Logger, + client graphqlmetricsv1connect.GraphQLMetricsServiceClient, + apiToken string, + settings *ExporterSettings, +) (*GraphQLMetricsExporter, error) { + sink := NewGraphQLMetricsSink(GraphQLMetricsSinkConfig{ + Client: client, + APIToken: apiToken, + Logger: logger, + }) + + exporter, err := NewExporter(logger, sink, IsRetryableError, settings) + if err != nil { + return nil, err + } + + return &GraphQLMetricsExporter{ + exporter: exporter, + }, nil +} + +// RecordUsage records a schema usage info item for export. +// If synchronous is true, the item is sent immediately. Otherwise, it's queued for batch processing. +// Returns false if the queue is full or the exporter is shutting down. +func (e *GraphQLMetricsExporter) RecordUsage(usageInfo *graphqlmetrics.SchemaUsageInfo, synchronous bool) bool { + return e.exporter.Record(usageInfo, synchronous) +} + +// Shutdown gracefully shuts down the exporter. +// It stops accepting new items, drains the queue, waits for in-flight batches, and closes the sink. +func (e *GraphQLMetricsExporter) Shutdown(ctx context.Context) error { + return e.exporter.Shutdown(ctx) +} diff --git a/router/internal/graphqlmetrics/graphql_metrics_sink.go b/router/internal/graphqlmetrics/graphql_metrics_sink.go new file mode 100644 index 0000000000..1929ded6de --- /dev/null +++ b/router/internal/graphqlmetrics/graphql_metrics_sink.go @@ -0,0 +1,91 @@ +package graphqlmetrics + +import ( + "context" + "errors" + "fmt" + + "connectrpc.com/connect" + graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" + "go.uber.org/zap" +) + +// GraphQLMetricsSink implements the Sink interface for sending aggregated GraphQL metrics +// to the Cosmo GraphQL Metrics Service via Connect RPC. +type GraphQLMetricsSink struct { + client graphqlmetricsv1connect.GraphQLMetricsServiceClient + apiToken string + logger *zap.Logger +} + +// GraphQLMetricsSinkConfig contains configuration for creating a GraphQLMetricsSink. +type GraphQLMetricsSinkConfig struct { + Client graphqlmetricsv1connect.GraphQLMetricsServiceClient + APIToken string + Logger *zap.Logger +} + +// NewGraphQLMetricsSink creates a new sink that sends metrics to the GraphQL Metrics Service. +func NewGraphQLMetricsSink(cfg GraphQLMetricsSinkConfig) *GraphQLMetricsSink { + return &GraphQLMetricsSink{ + client: cfg.Client, + apiToken: cfg.APIToken, + logger: cfg.Logger.With(zap.String("component", "graphql_metrics_sink")), + } +} + +// Export sends a batch of SchemaUsageInfo items to the GraphQL Metrics Service. +// It aggregates the items before sending to reduce payload size and improve efficiency. +func (s *GraphQLMetricsSink) Export(ctx context.Context, batch []*graphqlmetrics.SchemaUsageInfo) error { + if len(batch) == 0 { + return nil + } + + s.logger.Debug("Exporting batch", zap.Int("size", len(batch))) + + // Aggregate the batch to reduce payload size + request := AggregateSchemaUsageInfoBatch(batch) + + req := connect.NewRequest(request) + req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", s.apiToken)) + + _, err := s.client.PublishAggregatedGraphQLMetrics(ctx, req) + if err != nil { + s.logger.Debug("Failed to export batch", zap.Error(err), zap.Int("batch_size", len(request.Aggregation))) + return err + } + + s.logger.Debug("Successfully exported batch", zap.Int("batch_size", len(request.Aggregation))) + return nil +} + +// Close performs cleanup when shutting down the sink. +// For GraphQLMetricsSink, there's no specific cleanup needed. +func (s *GraphQLMetricsSink) Close(ctx context.Context) error { + s.logger.Debug("Closing GraphQL metrics sink") + return nil +} + +// IsRetryableError determines if an error from the GraphQL Metrics Service is retryable. +// Authentication errors should not be retried, while network and server errors should be. +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + var connectErr *connect.Error + if errors.As(err, &connectErr) { + switch connectErr.Code() { + case connect.CodeUnauthenticated, connect.CodePermissionDenied, connect.CodeInvalidArgument: + // Don't retry authentication, authorization, or validation errors + return false + default: + // Retry other errors (network issues, server errors, etc.) + return true + } + } + + // Unknown errors are retryable by default + return true +} diff --git a/router/internal/graphqlmetrics/prometheus_exporter.go b/router/internal/graphqlmetrics/prometheus_exporter.go new file mode 100644 index 0000000000..0b1ec07047 --- /dev/null +++ b/router/internal/graphqlmetrics/prometheus_exporter.go @@ -0,0 +1,58 @@ +package graphqlmetrics + +import ( + "context" + + graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/pkg/metric" + "go.uber.org/zap" +) + +// PrometheusMetricsExporter wraps the generic Exporter for Prometheus metrics. +// It provides a cleaner API for exporting schema field usage to Prometheus. +type PrometheusMetricsExporter struct { + exporter *Exporter[*graphqlmetrics.SchemaUsageInfo] +} + +// NewPrometheusMetricsExporter creates a new exporter specifically for Prometheus metrics. +// This is a convenience function that wraps the generic NewExporter with PrometheusSink. +func NewPrometheusMetricsExporter( + logger *zap.Logger, + metricStore metric.Store, + includeOpSha bool, + settings *ExporterSettings, +) (*PrometheusMetricsExporter, error) { + sink := NewPrometheusSink(PrometheusSinkConfig{ + MetricStore: metricStore, + Logger: logger, + IncludeOpSha: includeOpSha, + }) + + // Prometheus metrics are local, so errors are generally not retryable + // (they indicate programming errors or resource exhaustion) + errorHandler := func(err error) bool { + return false // Don't retry Prometheus errors + } + + exporter, err := NewExporter(logger, sink, errorHandler, settings) + if err != nil { + return nil, err + } + + return &PrometheusMetricsExporter{ + exporter: exporter, + }, nil +} + +// RecordUsage records a schema usage info item for Prometheus export. +// If synchronous is true, the item is processed immediately. Otherwise, it's queued for batch processing. +// Returns false if the queue is full or the exporter is shutting down. +func (e *PrometheusMetricsExporter) RecordUsage(usageInfo *graphqlmetrics.SchemaUsageInfo, synchronous bool) bool { + return e.exporter.Record(usageInfo, synchronous) +} + +// Shutdown gracefully shuts down the exporter. +// It stops accepting new items, drains the queue, waits for in-flight batches, and closes the sink. +func (e *PrometheusMetricsExporter) Shutdown(ctx context.Context) error { + return e.exporter.Shutdown(ctx) +} diff --git a/router/internal/graphqlmetrics/prometheus_sink.go b/router/internal/graphqlmetrics/prometheus_sink.go new file mode 100644 index 0000000000..a76ea80435 --- /dev/null +++ b/router/internal/graphqlmetrics/prometheus_sink.go @@ -0,0 +1,163 @@ +package graphqlmetrics + +import ( + "context" + "slices" + + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" + + graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/pkg/metric" + rotel "github.com/wundergraph/cosmo/router/pkg/otel" +) + +// PrometheusSink implements the Sink interface for exporting schema field usage metrics +// to Prometheus via OpenTelemetry metrics. It measures field usage patterns by operation. +type PrometheusSink struct { + metricStore metric.Store + logger *zap.Logger + includeOpSha bool +} + +// PrometheusSinkConfig contains configuration for creating a PrometheusSink. +type PrometheusSinkConfig struct { + MetricStore metric.Store + Logger *zap.Logger + IncludeOpSha bool // Whether to include operation SHA256 in metrics +} + +// aggregatedUsageKey represents a unique combination of operation and field attributes +type aggregatedUsageKey struct { + operationName string + operationType string + operationHash string // empty if not included + fieldName string + parentType string +} + +// NewPrometheusSink creates a new sink that exports schema field usage metrics to Prometheus. +func NewPrometheusSink(cfg PrometheusSinkConfig) *PrometheusSink { + return &PrometheusSink{ + metricStore: cfg.MetricStore, + logger: cfg.Logger.With(zap.String("component", "prometheus_sink")), + includeOpSha: cfg.IncludeOpSha, + } +} + +// Export processes a batch of SchemaUsageInfo items and records their field usage to Prometheus. +// It aggregates field usage across the entire batch before recording metrics, minimizing the number +// of calls to MeasureSchemaFieldUsage. +func (s *PrometheusSink) Export(ctx context.Context, batch []*graphqlmetrics.SchemaUsageInfo) error { + if len(batch) == 0 { + return nil + } + + s.logger.Debug("Exporting schema field usage to Prometheus", zap.Int("batch_size", len(batch))) + + // Aggregate all field usage across the entire batch + aggregatedCounts := s.aggregateBatch(batch) + + // Record metrics for each unique combination of operation + field attributes + for key, count := range aggregatedCounts { + opAttrs := []attribute.KeyValue{ + rotel.WgOperationName.String(key.operationName), + rotel.WgOperationType.String(key.operationType), + } + + // Include operation SHA256 if it was provided + if key.operationHash != "" { + opAttrs = append(opAttrs, rotel.WgOperationSha256.String(key.operationHash)) + } + + fieldAttrs := []attribute.KeyValue{ + rotel.WgGraphQLFieldName.String(key.fieldName), + rotel.WgGraphQLParentType.String(key.parentType), + } + + allAttrs := slices.Concat(opAttrs, fieldAttrs) + s.metricStore.MeasureSchemaFieldUsage( + ctx, + int64(count), + []attribute.KeyValue{}, + otelmetric.WithAttributeSet(attribute.NewSet(allAttrs...)), + ) + } + + s.logger.Debug("Successfully exported schema field usage to Prometheus", + zap.Int("batch_size", len(batch)), + zap.Int("unique_metrics", len(aggregatedCounts))) + return nil +} + +// Close performs cleanup when shutting down the sink. +// For PrometheusSink, there's no specific cleanup needed. +func (s *PrometheusSink) Close(ctx context.Context) error { + s.logger.Debug("Closing Prometheus sink") + return nil +} + +// aggregateBatch aggregates field usage counts across the entire batch, +// grouping by operation attributes and field attributes. +func (s *PrometheusSink) aggregateBatch(batch []*graphqlmetrics.SchemaUsageInfo) map[aggregatedUsageKey]int { + aggregatedCounts := make(map[aggregatedUsageKey]int) + + for _, usageInfo := range batch { + if usageInfo.OperationInfo == nil || usageInfo.TypeFieldMetrics == nil { + continue + } + + // Extract operation info + opName := usageInfo.OperationInfo.Name + opType := s.operationTypeToString(usageInfo.OperationInfo.Type) + opHash := "" + if s.includeOpSha { + opHash = usageInfo.OperationInfo.Hash + } + + // Process each field in this usage info + for _, field := range usageInfo.TypeFieldMetrics { + // Skip fields without valid parent type or path + if len(field.Path) == 0 || len(field.TypeNames) < 1 { + continue + } + + // The parent type is typically the first type in the TypeNames list + // The field name is the last element in the path + parentType := field.TypeNames[0] + fieldName := field.Path[len(field.Path)-1] + + key := aggregatedUsageKey{ + operationName: opName, + operationType: opType, + operationHash: opHash, + fieldName: fieldName, + parentType: parentType, + } + + // Increment count, using field.Count if available, otherwise 1 + if field.Count > 0 { + aggregatedCounts[key] += int(field.Count) + } else { + aggregatedCounts[key]++ + } + } + } + + return aggregatedCounts +} + +// operationTypeToString converts the protobuf OperationType to a string. +func (s *PrometheusSink) operationTypeToString(opType graphqlmetrics.OperationType) string { + switch opType { + case graphqlmetrics.OperationType_QUERY: + return "query" + case graphqlmetrics.OperationType_MUTATION: + return "mutation" + case graphqlmetrics.OperationType_SUBSCRIPTION: + return "subscription" + default: + return "unknown" + } +} diff --git a/router/internal/graphqlmetrics/sink.go b/router/internal/graphqlmetrics/sink.go new file mode 100644 index 0000000000..45979fb117 --- /dev/null +++ b/router/internal/graphqlmetrics/sink.go @@ -0,0 +1,23 @@ +package graphqlmetrics + +import ( + "context" +) + +// Sink defines the interface for exporting batches of items to a destination. +// Implementations must be safe for concurrent use. +type Sink[T any] interface { + // Export sends a batch of items to the sink destination. + // It returns an error if the export fails. + // The context can be used to cancel or timeout the export operation. + Export(ctx context.Context, batch []T) error + + // Close performs any cleanup needed when shutting down the sink. + // It should block until all cleanup is complete or the context is cancelled. + Close(ctx context.Context) error +} + +// SinkErrorHandler is called when a sink export fails. +// It receives the error and can inspect it to determine if retry should be attempted. +// Return true if the error is retryable, false otherwise. +type SinkErrorHandler func(error) (retryable bool) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 5e443a0dd0..7d80bec9c9 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -113,9 +113,16 @@ type Prometheus struct { } type PrometheusSchemaFieldUsage struct { - Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"` - IncludeOperationSha bool `yaml:"include_operation_sha" envDefault:"false" env:"INCLUDE_OPERATION_SHA"` - SampleRate float64 `yaml:"sample_rate" envDefault:"1.0" env:"SAMPLE_RATE"` + Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"` + IncludeOperationSha bool `yaml:"include_operation_sha" envDefault:"false" env:"INCLUDE_OPERATION_SHA"` + Exporter PrometheusSchemaFieldUsageExporter `yaml:"exporter" envPrefix:"EXPORTER_"` +} + +type PrometheusSchemaFieldUsageExporter struct { + BatchSize int `yaml:"batch_size" envDefault:"8192" env:"BATCH_SIZE"` + QueueSize int `yaml:"queue_size" envDefault:"16384" env:"QUEUE_SIZE"` + Interval time.Duration `yaml:"interval" envDefault:"10s" env:"INTERVAL"` + ExportTimeout time.Duration `yaml:"export_timeout" envDefault:"10s" env:"EXPORT_TIMEOUT"` } type MetricsOTLPExporter struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 68586eef69..555eeaea6d 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1246,12 +1246,39 @@ "default": false, "description": "Include the operation SHA256 in the metric labels, this can be an expensive operation. The default value is false." }, - "sample_rate": { - "type": "number", - "default": 1.0, - "minimum": 0.0, - "maximum": 1.0, - "description": "Sample rate for schema field usage metrics (0.0 to 1.0). Uses probabilistic random sampling, ensuring all operations get ~X% statistical coverage with uniform distribution. Only recommended for large scale deployments. Default is 1.0 (100% sampling)." + "exporter": { + "type": "object", + "description": "Configuration for the schema usage exporter", + "properties": { + "batch_size": { + "type": "integer", + "default": 8192, + "minimum": 1, + "description": "The maximum number of schema usage items to be applied in a single batch. The default value is 8192." + }, + "queue_size": { + "type": "integer", + "default": 16384, + "minimum": 1, + "description": "The maximum number of schema usage items allowed in queue at a given time. The default value is 16384." + }, + "interval": { + "type": "string", + "default": "10s", + "duration": { + "minimum": "1s" + }, + "description": "The interval at which the schema usage queue is flushed. The period is specified as a string with a number and a unit, e.g. 10s, 1m. The supported units are 's', 'm', 'h'." + }, + "export_timeout": { + "type": "string", + "default": "10s", + "duration": { + "minimum": "1s" + }, + "description": "The timeout for the schema usage export. The period is specified as a string with a number and a unit, e.g. 10s, 1m. The supported units are 's', 'm', 'h'." + } + } } } } diff --git a/router/pkg/metric/config.go b/router/pkg/metric/config.go index 46da130889..cfe41e4ddd 100644 --- a/router/pkg/metric/config.go +++ b/router/pkg/metric/config.go @@ -3,6 +3,7 @@ package metric import ( "net/url" "regexp" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/wundergraph/cosmo/router/pkg/config" @@ -41,10 +42,14 @@ type PrometheusConfig struct { type PrometheusSchemaFieldUsage struct { Enabled bool IncludeOperationSha bool - // SampleRate controls the percentage of requests to sample for schema field usage metrics (0.0 to 1.0). - // Uses probabilistic random sampling to ensure all operations get ~X% statistical coverage. - // Supports any rate: 1.0 (100%), 0.8 (80%), 0.5 (50%), 0.1 (10%), 0.01 (1%), etc. - SampleRate float64 + Exporter PrometheusSchemaFieldUsageExporter +} + +type PrometheusSchemaFieldUsageExporter struct { + BatchSize int + QueueSize int + Interval time.Duration + ExportTimeout time.Duration } type OpenTelemetryExporter struct { From e671808226cd617ea7ead895d4fba7d9d144eac1 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 00:08:47 +0100 Subject: [PATCH 02/13] chore: config tests --- router/pkg/config/config_test.go | 105 +++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index 079a2b1d79..9f447803b0 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -660,14 +660,12 @@ telemetry: schema_usage: enabled: true include_operation_sha: true - sample_rate: 0.5 # Supports any rate: 1.0, 0.8, 0.5, 0.1, 0.01, etc. `) c, err := LoadConfig([]string{f}) require.NoError(t, err) require.True(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Enabled) require.True(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.IncludeOperationSha) - require.Equal(t, 0.5, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.SampleRate) }) t.Run("from environment", func(t *testing.T) { @@ -679,16 +677,113 @@ version: "1" require.NoError(t, err) require.False(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Enabled) - require.Equal(t, 1.0, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.SampleRate) t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_ENABLED", "true") - t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_SAMPLE_RATE", "0.25") c, err = LoadConfig([]string{f}) require.NoError(t, err) require.True(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Enabled) - require.Equal(t, 0.25, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.SampleRate) + }) + + t.Run("exporter defaults", func(t *testing.T) { + f := createTempFileFromFixture(t, ` +version: "1" + +telemetry: + metrics: + prometheus: + schema_usage: + enabled: true +`) + c, err := LoadConfig([]string{f}) + require.NoError(t, err) + + exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter + require.Equal(t, 8192, exporter.BatchSize) + require.Equal(t, 16384, exporter.QueueSize) + require.Equal(t, 10*time.Second, exporter.Interval) + require.Equal(t, 10*time.Second, exporter.ExportTimeout) + }) + + t.Run("exporter custom config from file", func(t *testing.T) { + f := createTempFileFromFixture(t, ` +version: "1" + +telemetry: + metrics: + prometheus: + schema_usage: + enabled: true + include_operation_sha: true + exporter: + batch_size: 4096 + queue_size: 8192 + interval: 30s + export_timeout: 15s +`) + c, err := LoadConfig([]string{f}) + require.NoError(t, err) + + require.True(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Enabled) + require.True(t, c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.IncludeOperationSha) + + exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter + require.Equal(t, 4096, exporter.BatchSize) + require.Equal(t, 8192, exporter.QueueSize) + require.Equal(t, 30*time.Second, exporter.Interval) + require.Equal(t, 15*time.Second, exporter.ExportTimeout) + }) + + t.Run("exporter config from environment", func(t *testing.T) { + f := createTempFileFromFixture(t, ` +version: "1" + +telemetry: + metrics: + prometheus: + schema_usage: + enabled: true +`) + + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_BATCH_SIZE", "2048") + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_QUEUE_SIZE", "4096") + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_INTERVAL", "20s") + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_EXPORT_TIMEOUT", "25s") + + c, err := LoadConfig([]string{f}) + require.NoError(t, err) + + exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter + require.Equal(t, 2048, exporter.BatchSize) + require.Equal(t, 4096, exporter.QueueSize) + require.Equal(t, 20*time.Second, exporter.Interval) + require.Equal(t, 25*time.Second, exporter.ExportTimeout) + }) + + t.Run("file config takes precedence over environment", func(t *testing.T) { + f := createTempFileFromFixture(t, ` +version: "1" + +telemetry: + metrics: + prometheus: + schema_usage: + enabled: true + exporter: + batch_size: 1024 + interval: 5s +`) + + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_BATCH_SIZE", "9999") + t.Setenv("PROMETHEUS_SCHEMA_FIELD_USAGE_EXPORTER_INTERVAL", "99s") + + c, err := LoadConfig([]string{f}) + require.NoError(t, err) + + exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter + require.Equal(t, 1024, exporter.BatchSize) + require.Equal(t, 5*time.Second, exporter.Interval) }) } From c83aef739ac3a7816ee091324ee9b145f0713bc2 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 00:15:45 +0100 Subject: [PATCH 03/13] chore: fix lint --- router/core/router_config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/router/core/router_config.go b/router/core/router_config.go index 4e4b261f94..f1de53d1ca 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -35,7 +35,6 @@ type Config struct { otlpMeterProvider *sdkmetric.MeterProvider promMeterProvider *sdkmetric.MeterProvider gqlMetricsExporter *graphqlmetrics.GraphQLMetricsExporter - prometheusMetricsExporter *graphqlmetrics.PrometheusMetricsExporter corsOptions *cors.Config setConfigVersionHeader bool routerGracePeriod time.Duration From 69ba4b5d3ab8d8c22e0e1f8318db16d5205355f0 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 00:56:16 +0100 Subject: [PATCH 04/13] chore: fix lint --- router/pkg/config/testdata/config_defaults.json | 7 ++++++- router/pkg/config/testdata/config_full.json | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index d2691d2391..511a700efb 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -63,7 +63,12 @@ "SchemaFieldUsage": { "Enabled": false, "IncludeOperationSha": false, - "SampleRate": 1 + "Exporter": { + "BatchSize": 8192, + "QueueSize": 16384, + "Interval": 10000000000, + "ExportTimeout": 10000000000 + } } }, "CardinalityLimit": 2000 diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 07a6b88360..11bc2b1c1a 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -93,7 +93,12 @@ "SchemaFieldUsage": { "Enabled": true, "IncludeOperationSha": true, - "SampleRate": 1 + "Exporter": { + "BatchSize": 8192, + "QueueSize": 16384, + "Interval": 10000000000, + "ExportTimeout": 10000000000 + } } }, "CardinalityLimit": 2000 From 072fbd39ae5d765fff3fae6f1f70f6e2bb9a71f0 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 11:23:41 +0100 Subject: [PATCH 05/13] chore: reduce alloc --- router/internal/graphqlmetrics/prometheus_sink.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/router/internal/graphqlmetrics/prometheus_sink.go b/router/internal/graphqlmetrics/prometheus_sink.go index a76ea80435..4ad3770097 100644 --- a/router/internal/graphqlmetrics/prometheus_sink.go +++ b/router/internal/graphqlmetrics/prometheus_sink.go @@ -2,7 +2,6 @@ package graphqlmetrics import ( "context" - "slices" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" @@ -61,22 +60,24 @@ func (s *PrometheusSink) Export(ctx context.Context, batch []*graphqlmetrics.Sch // Record metrics for each unique combination of operation + field attributes for key, count := range aggregatedCounts { - opAttrs := []attribute.KeyValue{ + // Pre-allocate with max capacity (3 operation attrs + 2 field attrs) + allAttrs := make([]attribute.KeyValue, 0, 5) + + allAttrs = append(allAttrs, rotel.WgOperationName.String(key.operationName), rotel.WgOperationType.String(key.operationType), - } + ) // Include operation SHA256 if it was provided if key.operationHash != "" { - opAttrs = append(opAttrs, rotel.WgOperationSha256.String(key.operationHash)) + allAttrs = append(allAttrs, rotel.WgOperationSha256.String(key.operationHash)) } - fieldAttrs := []attribute.KeyValue{ + allAttrs = append(allAttrs, rotel.WgGraphQLFieldName.String(key.fieldName), rotel.WgGraphQLParentType.String(key.parentType), - } + ) - allAttrs := slices.Concat(opAttrs, fieldAttrs) s.metricStore.MeasureSchemaFieldUsage( ctx, int64(count), From cc907d06f26e1428a4060f425b230f9ec6991a74 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 11:42:47 +0100 Subject: [PATCH 06/13] chore: change defaults --- router/pkg/config/config.go | 4 ++-- router/pkg/config/config.schema.json | 8 ++++---- router/pkg/config/config_test.go | 8 ++++---- router/pkg/config/testdata/config_defaults.json | 4 ++-- router/pkg/config/testdata/config_full.json | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 7d80bec9c9..6461cc4ce4 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -119,8 +119,8 @@ type PrometheusSchemaFieldUsage struct { } type PrometheusSchemaFieldUsageExporter struct { - BatchSize int `yaml:"batch_size" envDefault:"8192" env:"BATCH_SIZE"` - QueueSize int `yaml:"queue_size" envDefault:"16384" env:"QUEUE_SIZE"` + BatchSize int `yaml:"batch_size" envDefault:"4096" env:"BATCH_SIZE"` + QueueSize int `yaml:"queue_size" envDefault:"12800" env:"QUEUE_SIZE"` Interval time.Duration `yaml:"interval" envDefault:"10s" env:"INTERVAL"` ExportTimeout time.Duration `yaml:"export_timeout" envDefault:"10s" env:"EXPORT_TIMEOUT"` } diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 555eeaea6d..cf20e88cdb 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1252,15 +1252,15 @@ "properties": { "batch_size": { "type": "integer", - "default": 8192, + "default": 4096, "minimum": 1, - "description": "The maximum number of schema usage items to be applied in a single batch. The default value is 8192." + "description": "The maximum number of schema usage items to be applied in a single batch. The default value is 4096." }, "queue_size": { "type": "integer", - "default": 16384, + "default": 12800, "minimum": 1, - "description": "The maximum number of schema usage items allowed in queue at a given time. The default value is 16384." + "description": "The maximum number of schema usage items allowed in queue at a given time. The default value is 12800." }, "interval": { "type": "string", diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index 9f447803b0..2f124963dd 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -700,8 +700,8 @@ telemetry: require.NoError(t, err) exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter - require.Equal(t, 8192, exporter.BatchSize) - require.Equal(t, 16384, exporter.QueueSize) + require.Equal(t, 4096, exporter.BatchSize) + require.Equal(t, 12800, exporter.QueueSize) require.Equal(t, 10*time.Second, exporter.Interval) require.Equal(t, 10*time.Second, exporter.ExportTimeout) }) @@ -718,7 +718,7 @@ telemetry: include_operation_sha: true exporter: batch_size: 4096 - queue_size: 8192 + queue_size: 12800 interval: 30s export_timeout: 15s `) @@ -730,7 +730,7 @@ telemetry: exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter require.Equal(t, 4096, exporter.BatchSize) - require.Equal(t, 8192, exporter.QueueSize) + require.Equal(t, 12800, exporter.QueueSize) require.Equal(t, 30*time.Second, exporter.Interval) require.Equal(t, 15*time.Second, exporter.ExportTimeout) }) diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 511a700efb..0966c82d2c 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -64,8 +64,8 @@ "Enabled": false, "IncludeOperationSha": false, "Exporter": { - "BatchSize": 8192, - "QueueSize": 16384, + "BatchSize": 4096, + "QueueSize": 12800, "Interval": 10000000000, "ExportTimeout": 10000000000 } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 11bc2b1c1a..8d68d2052e 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -94,8 +94,8 @@ "Enabled": true, "IncludeOperationSha": true, "Exporter": { - "BatchSize": 8192, - "QueueSize": 16384, + "BatchSize": 4096, + "QueueSize": 12800, "Interval": 10000000000, "ExportTimeout": 10000000000 } From 17dbcdecd7dda1d56b8e2673a2b3993e1a566ab4 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 12:00:28 +0100 Subject: [PATCH 07/13] chore: cache type field usage information --- router/core/context.go | 16 +++++++++++++--- router/core/router_metrics.go | 4 ++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/router/core/context.go b/router/core/context.go index 4cbde8706a..6065cfffc4 100644 --- a/router/core/context.go +++ b/router/core/context.go @@ -543,9 +543,10 @@ type operationContext struct { persistedOperationCacheHit bool normalizationCacheHit bool - typeFieldUsageInfo graphqlschemausage.TypeFieldMetrics - argumentUsageInfo []*graphqlmetrics.ArgumentUsageInfo - inputUsageInfo []*graphqlmetrics.InputUsageInfo + typeFieldUsageInfo graphqlschemausage.TypeFieldMetrics + typeFieldUsageInfoMetrics []*graphqlmetrics.TypeFieldUsageInfo // Cached conversion result + argumentUsageInfo []*graphqlmetrics.ArgumentUsageInfo + inputUsageInfo []*graphqlmetrics.InputUsageInfo parsingTime time.Duration validationTime time.Duration @@ -597,6 +598,15 @@ func (o *operationContext) Sha256Hash() string { return o.sha256Hash } +// GetTypeFieldUsageInfoMetrics returns the cached conversion of typeFieldUsageInfo. +// This avoids repeated allocations when multiple exporters need the same data. +func (o *operationContext) GetTypeFieldUsageInfoMetrics() []*graphqlmetrics.TypeFieldUsageInfo { + if o.typeFieldUsageInfoMetrics == nil && o.typeFieldUsageInfo != nil { + o.typeFieldUsageInfoMetrics = o.typeFieldUsageInfo.IntoGraphQLMetrics() + } + return o.typeFieldUsageInfoMetrics +} + type QueryPlanStats struct { TotalSubgraphFetches int SubgraphFetches map[string]int diff --git a/router/core/router_metrics.go b/router/core/router_metrics.go index d661fc8498..27aa89d645 100644 --- a/router/core/router_metrics.go +++ b/router/core/router_metrics.go @@ -106,7 +106,7 @@ func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext // which seems to be efficient in terms of memory usage and CPU item := &graphqlmetricsv1.SchemaUsageInfo{ RequestDocument: operationContext.content, - TypeFieldMetrics: operationContext.typeFieldUsageInfo.IntoGraphQLMetrics(), + TypeFieldMetrics: operationContext.GetTypeFieldUsageInfoMetrics(), ArgumentMetrics: operationContext.argumentUsageInfo, InputMetrics: operationContext.inputUsageInfo, OperationInfo: &graphqlmetricsv1.OperationInfo{ @@ -144,7 +144,7 @@ func (m *routerMetrics) ExportSchemaUsageInfoPrometheus(operationContext *operat } item := &graphqlmetricsv1.SchemaUsageInfo{ - TypeFieldMetrics: operationContext.typeFieldUsageInfo.IntoGraphQLMetrics(), + TypeFieldMetrics: operationContext.GetTypeFieldUsageInfoMetrics(), OperationInfo: &graphqlmetricsv1.OperationInfo{ Type: opType, Hash: operationContext.sha256Hash, From d0ce752cda9c09a7530ece4ed19da34eb7f1492c Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 13:51:53 +0100 Subject: [PATCH 08/13] chore: allow smaller intervals --- router/pkg/config/config.schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index cf20e88cdb..aaf9b703a1 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1266,7 +1266,7 @@ "type": "string", "default": "10s", "duration": { - "minimum": "1s" + "minimum": "100ms" }, "description": "The interval at which the schema usage queue is flushed. The period is specified as a string with a number and a unit, e.g. 10s, 1m. The supported units are 's', 'm', 'h'." }, From 23cb014ed4c7be2a8935b6e7bc078a5dcbadc5a0 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 18:47:52 +0100 Subject: [PATCH 09/13] chore: improve --- router/core/router_metrics.go | 23 +- router/internal/graphqlmetrics/exporter.go | 39 ++- .../graphqlmetrics/exporter_bench_test.go | 242 ++++++++++++++++++ router/pkg/config/config.go | 2 +- router/pkg/config/config.schema.json | 2 +- .../pkg/config/testdata/config_defaults.json | 2 +- router/pkg/config/testdata/config_full.json | 2 +- router/pkg/graphqlschemausage/schemausage.go | 23 +- .../schemausage_bench_test.go | 194 ++++++++++++++ 9 files changed, 511 insertions(+), 18 deletions(-) create mode 100644 router/internal/graphqlmetrics/exporter_bench_test.go create mode 100644 router/pkg/graphqlschemausage/schemausage_bench_test.go diff --git a/router/core/router_metrics.go b/router/core/router_metrics.go index 27aa89d645..37baf3ad1c 100644 --- a/router/core/router_metrics.go +++ b/router/core/router_metrics.go @@ -1,12 +1,13 @@ package core import ( + "strings" + "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" graphqlmetricsv1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" "github.com/wundergraph/cosmo/router/internal/graphqlmetrics" - "github.com/wundergraph/cosmo/router/internal/unsafebytes" "github.com/wundergraph/cosmo/router/pkg/metric" "go.uber.org/zap" ) @@ -85,6 +86,10 @@ func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext return } + if m.gqlMetricsExporter == nil { + return + } + var opType graphqlmetricsv1.OperationType switch operationContext.opType { case OperationTypeQuery: @@ -114,7 +119,7 @@ func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext Hash: operationContext.HashString(), // parsed operation names are re-used across requests // for that reason, we need to copy the name, or it might get corrupted - Name: m.strCopy(operationContext.name), + Name: strings.Clone(operationContext.name), }, SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: m.routerConfigVersion, @@ -133,6 +138,10 @@ func (m *routerMetrics) ExportSchemaUsageInfo(operationContext *operationContext } func (m *routerMetrics) ExportSchemaUsageInfoPrometheus(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) { + if m.prometheusMetricsExporter == nil { + return + } + var opType graphqlmetricsv1.OperationType switch operationContext.opType { case OperationTypeQuery: @@ -148,7 +157,9 @@ func (m *routerMetrics) ExportSchemaUsageInfoPrometheus(operationContext *operat OperationInfo: &graphqlmetricsv1.OperationInfo{ Type: opType, Hash: operationContext.sha256Hash, - Name: m.strCopy(operationContext.name), + // parsed operation names are re-used across requests + // for that reason, we need to copy the name, or it might get corrupted + Name: strings.Clone(operationContext.name), }, SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: m.routerConfigVersion, @@ -157,9 +168,3 @@ func (m *routerMetrics) ExportSchemaUsageInfoPrometheus(operationContext *operat m.prometheusMetricsExporter.RecordUsage(item, exportSynchronous) } - -func (m *routerMetrics) strCopy(s string) string { - b := make([]byte, len(s)) - copy(b, s) - return unsafebytes.BytesToString(b) -} diff --git a/router/internal/graphqlmetrics/exporter.go b/router/internal/graphqlmetrics/exporter.go index d276787af1..fc0a0be25a 100644 --- a/router/internal/graphqlmetrics/exporter.go +++ b/router/internal/graphqlmetrics/exporter.go @@ -3,6 +3,7 @@ package graphqlmetrics import ( "context" "fmt" + "sync" "time" "github.com/cloudflare/backoff" @@ -22,6 +23,7 @@ type Exporter[T any] struct { acceptTrafficSema chan struct{} queue chan T inflightBatches *atomic.Int64 + batchBufferPool *sync.Pool // Pool for batch slice buffers to reduce allocations // exportRequestContext is used to cancel all requests that started before the shutdown exportRequestContext context.Context @@ -102,6 +104,13 @@ func NewExporter[T any](logger *zap.Logger, sink Sink[T], isRetryableError SinkE inflightBatches: atomic.NewInt64(0), exportRequestContext: ctx, cancelAllExportRequests: cancel, + batchBufferPool: &sync.Pool{ + New: func() any { + // Pre-allocate slice with batch size capacity + buffer := make([]T, 0, settings.BatchSize) + return &buffer + }, + }, } if err := e.validate(); err != nil { return nil, err @@ -142,6 +151,23 @@ func (e *Exporter[T]) validate() error { return nil } +// getBatchBuffer retrieves a batch buffer from the pool. +// The returned buffer is ready to use with zero length and appropriate capacity. +func (e *Exporter[T]) getBatchBuffer() []T { + bufferPtr := e.batchBufferPool.Get().(*[]T) + buffer := *bufferPtr + // Ensure the buffer is empty (should already be, but be defensive) + return buffer[:0] +} + +// putBatchBuffer returns a batch buffer to the pool for reuse. +// The buffer is reset to zero length before being pooled. +func (e *Exporter[T]) putBatchBuffer(buffer []T) { + // Reset the slice to zero length while keeping capacity + buffer = buffer[:0] + e.batchBufferPool.Put(&buffer) +} + func (e *Exporter[T]) acceptTraffic() bool { // while the channel is not closed, the select will always return the default case // once it's closed, the select will always return _,false (closed channel) from the channel @@ -294,13 +320,16 @@ func (e *Exporter[T]) start() { for { if buffer == nil { - buffer = make([]T, 0, e.settings.BatchSize) + // Get a buffer from the pool instead of allocating + buffer = e.getBatchBuffer() } select { case <-ticker.C: e.logger.Debug("Tick: flushing buffer", zap.Int("buffer_size", len(buffer))) if len(buffer) > 0 { e.prepareAndSendBatch(buffer) + // Return buffer to pool after sending + e.putBatchBuffer(buffer) buffer = nil } case item := <-e.queue: @@ -308,6 +337,8 @@ func (e *Exporter[T]) start() { if len(buffer) == e.settings.BatchSize { e.logger.Debug("Buffer full, sending batch", zap.Int("batch_size", len(buffer))) e.prepareAndSendBatch(buffer) + // Return buffer to pool after sending + e.putBatchBuffer(buffer) buffer = nil } case <-e.shutdownSignal: @@ -329,11 +360,15 @@ func (e *Exporter[T]) drainQueue(buffer []T) { buffer = append(buffer, item) if len(buffer) == e.settings.BatchSize { e.prepareAndSendBatch(buffer) - buffer = make([]T, 0, e.settings.BatchSize) + // Return buffer to pool and get a new one + e.putBatchBuffer(buffer) + buffer = e.getBatchBuffer() } default: if len(buffer) > 0 { e.prepareAndSendBatch(buffer) + // Return buffer to pool before returning + e.putBatchBuffer(buffer) } e.logger.Debug("Queue drained", zap.Int("drained_items", drainedItems)) return diff --git a/router/internal/graphqlmetrics/exporter_bench_test.go b/router/internal/graphqlmetrics/exporter_bench_test.go new file mode 100644 index 0000000000..4583d4c18b --- /dev/null +++ b/router/internal/graphqlmetrics/exporter_bench_test.go @@ -0,0 +1,242 @@ +package graphqlmetrics + +import ( + "context" + "sync/atomic" + "testing" + "time" + + graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "go.uber.org/zap" +) + +// mockSink is a simple sink that does nothing, for benchmarking +type mockSink struct { + exportCount atomic.Int64 +} + +func (m *mockSink) Export(ctx context.Context, batch []*graphqlmetrics.SchemaUsageInfo) error { + m.exportCount.Add(1) + return nil +} + +func (m *mockSink) Close(ctx context.Context) error { + return nil +} + +// BenchmarkExporterBatchBufferAllocation measures allocations when creating and recycling batch buffers +func BenchmarkExporterBatchBufferAllocation(b *testing.B) { + logger := zap.NewNop() + sink := &mockSink{} + + settings := &ExporterSettings{ + BatchSize: 100, + QueueSize: 1000, + Interval: time.Second, + ExportTimeout: time.Second, + RetryOptions: RetryOptions{ + Enabled: false, + MaxRetry: 1, + MaxDuration: time.Second, + Interval: time.Second, + }, + } + + exporter, err := NewExporter(logger, sink, nil, settings) + if err != nil { + b.Fatal(err) + } + defer exporter.Shutdown(context.Background()) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Get a buffer from the pool + buffer := exporter.getBatchBuffer() + + // Simulate filling the buffer + for j := 0; j < 10; j++ { + buffer = append(buffer, &graphqlmetrics.SchemaUsageInfo{}) + } + + // Return buffer to pool + exporter.putBatchBuffer(buffer) + } +} + +// BenchmarkExporterHighThroughput simulates high-throughput usage +func BenchmarkExporterHighThroughput(b *testing.B) { + logger := zap.NewNop() + sink := &mockSink{} + + settings := &ExporterSettings{ + BatchSize: 100, + QueueSize: 10000, + Interval: 100 * time.Millisecond, + ExportTimeout: time.Second, + RetryOptions: RetryOptions{ + Enabled: false, + MaxRetry: 1, + MaxDuration: time.Second, + Interval: time.Second, + }, + } + + exporter, err := NewExporter(logger, sink, nil, settings) + if err != nil { + b.Fatal(err) + } + + item := &graphqlmetrics.SchemaUsageInfo{ + OperationInfo: &graphqlmetrics.OperationInfo{ + Hash: "test-hash", + Name: "TestOperation", + Type: graphqlmetrics.OperationType_QUERY, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + exporter.Record(item, false) + } + + // Shutdown to flush remaining items + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + exporter.Shutdown(ctx) +} + +// BenchmarkExporterBatchCycle measures a complete batch collection and flush cycle +func BenchmarkExporterBatchCycle(b *testing.B) { + logger := zap.NewNop() + sink := &mockSink{} + + settings := &ExporterSettings{ + BatchSize: 50, // Smaller batch for faster cycling + QueueSize: 1000, + Interval: time.Hour, // Don't flush by time + ExportTimeout: time.Second, + RetryOptions: RetryOptions{ + Enabled: false, + MaxRetry: 1, + MaxDuration: time.Second, + Interval: time.Second, + }, + } + + exporter, err := NewExporter(logger, sink, nil, settings) + if err != nil { + b.Fatal(err) + } + defer exporter.Shutdown(context.Background()) + + item := &graphqlmetrics.SchemaUsageInfo{ + OperationInfo: &graphqlmetrics.OperationInfo{ + Hash: "test-hash", + Name: "TestOperation", + Type: graphqlmetrics.OperationType_QUERY, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Fill exactly one batch + for j := 0; j < settings.BatchSize; j++ { + exporter.Record(item, false) + } + // Give time for batch to be processed + time.Sleep(time.Millisecond) + } +} + +// BenchmarkExporterBufferGrowth measures allocations when buffers grow beyond initial capacity +func BenchmarkExporterBufferGrowth(b *testing.B) { + logger := zap.NewNop() + sink := &mockSink{} + + settings := &ExporterSettings{ + BatchSize: 100, + QueueSize: 1000, + Interval: time.Second, + ExportTimeout: time.Second, + RetryOptions: RetryOptions{ + Enabled: false, + MaxRetry: 1, + MaxDuration: time.Second, + Interval: time.Second, + }, + } + + exporter, err := NewExporter(logger, sink, nil, settings) + if err != nil { + b.Fatal(err) + } + defer exporter.Shutdown(context.Background()) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Get a buffer from the pool + buffer := exporter.getBatchBuffer() + + // Fill to capacity (should not allocate) + for j := 0; j < settings.BatchSize; j++ { + buffer = append(buffer, &graphqlmetrics.SchemaUsageInfo{}) + } + + // Return buffer to pool + exporter.putBatchBuffer(buffer) + } +} + +// BenchmarkExporterParallelRecords measures concurrent record operations +func BenchmarkExporterParallelRecords(b *testing.B) { + logger := zap.NewNop() + sink := &mockSink{} + + settings := &ExporterSettings{ + BatchSize: 100, + QueueSize: 10000, + Interval: 100 * time.Millisecond, + ExportTimeout: time.Second, + RetryOptions: RetryOptions{ + Enabled: false, + MaxRetry: 1, + MaxDuration: time.Second, + Interval: time.Second, + }, + } + + exporter, err := NewExporter(logger, sink, nil, settings) + if err != nil { + b.Fatal(err) + } + + item := &graphqlmetrics.SchemaUsageInfo{ + OperationInfo: &graphqlmetrics.OperationInfo{ + Hash: "test-hash", + Name: "TestOperation", + Type: graphqlmetrics.OperationType_QUERY, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + exporter.Record(item, false) + } + }) + + // Shutdown to flush remaining items + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + exporter.Shutdown(ctx) +} diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 6461cc4ce4..3c9689814e 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -121,7 +121,7 @@ type PrometheusSchemaFieldUsage struct { type PrometheusSchemaFieldUsageExporter struct { BatchSize int `yaml:"batch_size" envDefault:"4096" env:"BATCH_SIZE"` QueueSize int `yaml:"queue_size" envDefault:"12800" env:"QUEUE_SIZE"` - Interval time.Duration `yaml:"interval" envDefault:"10s" env:"INTERVAL"` + Interval time.Duration `yaml:"interval" envDefault:"1s" env:"INTERVAL"` ExportTimeout time.Duration `yaml:"export_timeout" envDefault:"10s" env:"EXPORT_TIMEOUT"` } diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index aaf9b703a1..b83380afa9 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1264,7 +1264,7 @@ }, "interval": { "type": "string", - "default": "10s", + "default": "1s", "duration": { "minimum": "100ms" }, diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 0966c82d2c..e32f0d4540 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -66,7 +66,7 @@ "Exporter": { "BatchSize": 4096, "QueueSize": 12800, - "Interval": 10000000000, + "Interval": 1000000000, "ExportTimeout": 10000000000 } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 8d68d2052e..800a56a234 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -96,7 +96,7 @@ "Exporter": { "BatchSize": 4096, "QueueSize": 12800, - "Interval": 10000000000, + "Interval": 1000000000, "ExportTimeout": 10000000000 } } diff --git a/router/pkg/graphqlschemausage/schemausage.go b/router/pkg/graphqlschemausage/schemausage.go index 4ecfde37ec..a45045167b 100644 --- a/router/pkg/graphqlschemausage/schemausage.go +++ b/router/pkg/graphqlschemausage/schemausage.go @@ -1,8 +1,6 @@ package graphqlschemausage import ( - "slices" - "github.com/wundergraph/astjson" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor" @@ -66,11 +64,23 @@ type typeFieldUsageInfoVisitor struct { func (p *typeFieldUsageInfoVisitor) visitNode(node resolve.Node, path []string) { switch t := node.(type) { case *resolve.Object: + // Pre-allocate the typeFieldUsageInfo slice with a reasonable capacity + // to reduce allocations during traversal + if p.typeFieldUsageInfo == nil { + // Estimate: average query has ~20-50 fields + p.typeFieldUsageInfo = make([]*TypeFieldUsageInfo, 0, 32) + } + for _, field := range t.Fields { if field.Info == nil { continue } - pathCopy := slices.Clone(append(path, field.Info.Name)) + + // create a new slice with exact capacity and copy elements + pathCopy := make([]string, len(path)+1) + copy(pathCopy, path) + pathCopy[len(path)] = field.Info.Name + p.typeFieldUsageInfo = append(p.typeFieldUsageInfo, &TypeFieldUsageInfo{ Path: pathCopy, ParentTypeNames: field.Info.ParentTypeNames, @@ -100,6 +110,8 @@ func GetArgumentUsageInfo(operation, definition *ast.Document) ([]*graphqlmetric definition: definition, operation: operation, walker: &walker, + // Pre-allocate with reasonable capacity to reduce allocations + usage: make([]*graphqlmetrics.ArgumentUsageInfo, 0, 16), } walker.RegisterEnterArgumentVisitor(visitor) walker.RegisterEnterFieldVisitor(visitor) @@ -136,6 +148,7 @@ func (a *argumentUsageInfoVisitor) EnterArgument(ref int) { } argType := a.definition.InputValueDefinitionType(argDef) typeName := a.definition.ResolveTypeNameBytes(argType) + a.usage = append(a.usage, &graphqlmetrics.ArgumentUsageInfo{ Path: []string{string(fieldName), string(argName)}, TypeName: string(enclosingTypeName), @@ -148,6 +161,8 @@ func GetInputUsageInfo(operation, definition *ast.Document, variables *astjson.V operation: operation, definition: definition, variables: variables, + // Pre-allocate with reasonable capacity to reduce allocations + usage: make([]*graphqlmetrics.InputUsageInfo, 0, 16), } for i := range operation.VariableDefinitions { visitor.EnterVariableDefinition(i) @@ -182,6 +197,7 @@ func (v *inputUsageInfoVisitor) traverseVariable(jsonValue *astjson.Value, field } if parentTypeName != "" { usageInfo.TypeName = parentTypeName + // Pre-allocate Path slice with exact capacity usageInfo.Path = []string{parentTypeName, fieldName} } @@ -216,6 +232,7 @@ func (v *inputUsageInfoVisitor) traverseVariable(jsonValue *astjson.Value, field usageInfo.EnumValues = []string{string(jsonValue.GetStringBytes())} case astjson.TypeArray: arr := jsonValue.GetArray() + // Pre-allocate EnumValues slice with exact capacity usageInfo.EnumValues = make([]string, len(arr)) for i, arrayValue := range arr { usageInfo.EnumValues[i] = string(arrayValue.GetStringBytes()) diff --git a/router/pkg/graphqlschemausage/schemausage_bench_test.go b/router/pkg/graphqlschemausage/schemausage_bench_test.go new file mode 100644 index 0000000000..b2d300266c --- /dev/null +++ b/router/pkg/graphqlschemausage/schemausage_bench_test.go @@ -0,0 +1,194 @@ +package graphqlschemausage + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/astjson" + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/astnormalization" + "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser" + "github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform" + "github.com/wundergraph/graphql-go-tools/v2/pkg/astvalidation" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" + "github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport" +) + +// setupBenchmark creates a realistic schema usage scenario for benchmarking +// Returns: plan, operation doc, definition doc, variables +func setupBenchmark(b *testing.B) (plan.Plan, *ast.Document, *ast.Document, *astjson.Value) { + b.Helper() + + operation := ` + query Search($name: String! $filter2: SearchFilter $enumValue: Episode $enumList: [Episode]) { + searchResults(name: $name, filter: {excludeName: "Jannik"} filter2: $filter2, enumValue: $enumValue enumList: $enumList) { + __typename + ... on Human { + name + inlineName(name: "Jannik") + } + ... on Droid { + name + } + } + hero { + name + } + } + ` + + variables := `{"name":"Jannik","filter2":{"enumField":"NEWHOPE"},"enumValue":"EMPIRE","enumList":["JEDI","EMPIRE"]}` + + // Parse schema + def, rep := astparser.ParseGraphqlDocumentString(schemaUsageInfoTestSchema) + require.False(b, rep.HasErrors()) + + // Parse operation + op, rep := astparser.ParseGraphqlDocumentString(operation) + require.False(b, rep.HasErrors()) + + // Merge and normalize + err := asttransform.MergeDefinitionWithBaseSchema(&def) + require.NoError(b, err) + + report := &operationreport.Report{} + norm := astnormalization.NewNormalizer(true, true) + norm.NormalizeOperation(&op, &def, report) + require.False(b, report.HasErrors()) + + valid := astvalidation.DefaultOperationValidator() + valid.Validate(&op, &def, report) + require.False(b, report.HasErrors()) + + // Create data source configuration + dsCfg, err := plan.NewDataSourceConfiguration[any]( + "https://swapi.dev/api", + &FakeFactory[any]{upstreamSchema: &def}, + &plan.DataSourceMetadata{ + RootNodes: []plan.TypeField{ + {TypeName: "Query", FieldNames: []string{"searchResults", "hero"}}, + }, + ChildNodes: []plan.TypeField{ + {TypeName: "Human", FieldNames: []string{"name", "inlineName"}}, + {TypeName: "Droid", FieldNames: []string{"name"}}, + {TypeName: "SearchResult", FieldNames: []string{"__typename"}}, + {TypeName: "Character", FieldNames: []string{"name", "friends"}}, + }, + }, + nil, + ) + require.NoError(b, err) + + // Create planner + planner, err := plan.NewPlanner(plan.Configuration{ + DisableResolveFieldPositions: true, + DataSources: []plan.DataSource{dsCfg}, + }) + require.NoError(b, err) + + // Generate plan + generatedPlan := planner.Plan(&op, &def, "Search", report) + require.False(b, report.HasErrors()) + + // Parse variables + vars, err := astjson.Parse(variables) + require.NoError(b, err) + + inputVariables, err := astjson.ParseBytes(op.Input.Variables) + require.NoError(b, err) + + merged, _, err := astjson.MergeValues(vars, inputVariables) + require.NoError(b, err) + + return generatedPlan, &op, &def, merged +} + +// BenchmarkGetTypeFieldUsageInfo measures memory allocations when extracting field usage from a plan +func BenchmarkGetTypeFieldUsageInfo(b *testing.B) { + generatedPlan, _, _, _ := setupBenchmark(b) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + result := GetTypeFieldUsageInfo(generatedPlan) + _ = result // Prevent compiler optimization + } +} + +// BenchmarkGetArgumentUsageInfo measures memory allocations when extracting argument usage +func BenchmarkGetArgumentUsageInfo(b *testing.B) { + _, operation, definition, _ := setupBenchmark(b) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + result, err := GetArgumentUsageInfo(operation, definition) + if err != nil { + b.Fatal(err) + } + _ = result // Prevent compiler optimization + } +} + +// BenchmarkGetInputUsageInfo measures memory allocations when extracting input variable usage +func BenchmarkGetInputUsageInfo(b *testing.B) { + _, operation, definition, variables := setupBenchmark(b) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + result, err := GetInputUsageInfo(operation, definition, variables) + if err != nil { + b.Fatal(err) + } + _ = result // Prevent compiler optimization + } +} + +// BenchmarkIntoGraphQLMetrics measures memory allocations when converting to protobuf format +func BenchmarkIntoGraphQLMetrics(b *testing.B) { + generatedPlan, _, _, _ := setupBenchmark(b) + typeFieldMetrics := TypeFieldMetrics(GetTypeFieldUsageInfo(generatedPlan)) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + result := typeFieldMetrics.IntoGraphQLMetrics() + _ = result // Prevent compiler optimization + } +} + +// BenchmarkSchemaUsageEndToEnd measures total memory allocations for complete schema usage extraction +// This simulates a full request lifecycle for schema usage tracking +func BenchmarkSchemaUsageEndToEnd(b *testing.B) { + generatedPlan, operation, definition, variables := setupBenchmark(b) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Extract type field usage + typeFieldUsage := GetTypeFieldUsageInfo(generatedPlan) + + // Convert to GraphQL metrics format + _ = TypeFieldMetrics(typeFieldUsage).IntoGraphQLMetrics() + + // Extract argument usage + argUsage, err := GetArgumentUsageInfo(operation, definition) + if err != nil { + b.Fatal(err) + } + _ = argUsage + + // Extract input variable usage + inputUsage, err := GetInputUsageInfo(operation, definition, variables) + if err != nil { + b.Fatal(err) + } + _ = inputUsage + } +} From d05e9e3e8e698fae4d0133f26f0f57a8b251b141 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Fri, 14 Nov 2025 20:31:33 +0100 Subject: [PATCH 10/13] chore: improve --- router/demo.config.yaml | 20 ++++++++++++++++++- router/internal/graphqlmetrics/exporter.go | 14 ++++++------- .../internal/graphqlmetrics/exporter_test.go | 16 +++++++++++++++ .../graphqlmetrics/prometheus_sink.go | 2 +- router/pkg/config/config.go | 2 +- router/pkg/config/config.schema.json | 2 +- router/pkg/config/config_test.go | 2 +- .../pkg/config/testdata/config_defaults.json | 2 +- router/pkg/config/testdata/config_full.json | 2 +- router/pkg/graphqlschemausage/schemausage.go | 1 + 10 files changed, 48 insertions(+), 15 deletions(-) diff --git a/router/demo.config.yaml b/router/demo.config.yaml index 9a72e31de2..cfb368468e 100644 --- a/router/demo.config.yaml +++ b/router/demo.config.yaml @@ -5,6 +5,24 @@ version: "1" +graphql_metrics: + enabled: false + +traffic_shaping: + all: + retry: + enabled: false + +telemetry: + metrics: + otlp: + enabled: false + prometheus: + schema_usage: + enabled: true + exporter: + interval: 3s + events: providers: nats: @@ -19,4 +37,4 @@ events: redis: - id: my-redis urls: - - "redis://localhost:6379/2" \ No newline at end of file + - "redis://localhost:6379/2" \ No newline at end of file diff --git a/router/internal/graphqlmetrics/exporter.go b/router/internal/graphqlmetrics/exporter.go index fc0a0be25a..1467b052c5 100644 --- a/router/internal/graphqlmetrics/exporter.go +++ b/router/internal/graphqlmetrics/exporter.go @@ -228,11 +228,13 @@ func (e *Exporter[T]) exportBatch(batch []T) error { } // prepareAndSendBatch starts a goroutine to export the batch with retry logic. +// The goroutine takes ownership of the batch slice and will return it to the pool when done. func (e *Exporter[T]) prepareAndSendBatch(batch []T) { e.logger.Debug("Preparing to send batch", zap.Int("batch_size", len(batch))) e.inflightBatches.Inc() go func() { defer e.inflightBatches.Dec() + defer e.putBatchBuffer(batch) // Return buffer to pool after export completes e.exportBatchWithRetry(batch) }() } @@ -328,8 +330,7 @@ func (e *Exporter[T]) start() { e.logger.Debug("Tick: flushing buffer", zap.Int("buffer_size", len(buffer))) if len(buffer) > 0 { e.prepareAndSendBatch(buffer) - // Return buffer to pool after sending - e.putBatchBuffer(buffer) + // Ownership transferred to goroutine, get a new buffer buffer = nil } case item := <-e.queue: @@ -337,8 +338,7 @@ func (e *Exporter[T]) start() { if len(buffer) == e.settings.BatchSize { e.logger.Debug("Buffer full, sending batch", zap.Int("batch_size", len(buffer))) e.prepareAndSendBatch(buffer) - // Return buffer to pool after sending - e.putBatchBuffer(buffer) + // Ownership transferred to goroutine, get a new buffer buffer = nil } case <-e.shutdownSignal: @@ -360,15 +360,13 @@ func (e *Exporter[T]) drainQueue(buffer []T) { buffer = append(buffer, item) if len(buffer) == e.settings.BatchSize { e.prepareAndSendBatch(buffer) - // Return buffer to pool and get a new one - e.putBatchBuffer(buffer) + // Ownership transferred to goroutine, get a new buffer buffer = e.getBatchBuffer() } default: if len(buffer) > 0 { e.prepareAndSendBatch(buffer) - // Return buffer to pool before returning - e.putBatchBuffer(buffer) + // Ownership transferred to goroutine } e.logger.Debug("Queue drained", zap.Int("drained_items", drainedItems)) return diff --git a/router/internal/graphqlmetrics/exporter_test.go b/router/internal/graphqlmetrics/exporter_test.go index 85545914d9..a7a6420542 100644 --- a/router/internal/graphqlmetrics/exporter_test.go +++ b/router/internal/graphqlmetrics/exporter_test.go @@ -175,6 +175,10 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) { SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: "1", }, + RequestInfo: &graphqlmetricsv1.RequestInfo{ + Error: false, + StatusCode: http.StatusOK, + }, Attributes: map[string]string{}, } @@ -246,6 +250,10 @@ func TestForceFlushSync(t *testing.T) { SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: "1", }, + RequestInfo: &graphqlmetricsv1.RequestInfo{ + Error: false, + StatusCode: http.StatusOK, + }, Attributes: map[string]string{}, } @@ -291,6 +299,10 @@ func TestForceFlushSync(t *testing.T) { SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: "1", }, + RequestInfo: &graphqlmetricsv1.RequestInfo{ + Error: false, + StatusCode: http.StatusOK, + }, Attributes: map[string]string{}, } @@ -360,6 +372,10 @@ func TestExportBatchInterval(t *testing.T) { SchemaInfo: &graphqlmetricsv1.SchemaInfo{ Version: "1", }, + RequestInfo: &graphqlmetricsv1.RequestInfo{ + Error: false, + StatusCode: http.StatusOK, + }, Attributes: map[string]string{}, } diff --git a/router/internal/graphqlmetrics/prometheus_sink.go b/router/internal/graphqlmetrics/prometheus_sink.go index 4ad3770097..47d1514592 100644 --- a/router/internal/graphqlmetrics/prometheus_sink.go +++ b/router/internal/graphqlmetrics/prometheus_sink.go @@ -82,7 +82,7 @@ func (s *PrometheusSink) Export(ctx context.Context, batch []*graphqlmetrics.Sch ctx, int64(count), []attribute.KeyValue{}, - otelmetric.WithAttributeSet(attribute.NewSet(allAttrs...)), + otelmetric.WithAttributes(allAttrs...), ) } diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 3c9689814e..266f56bfc5 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -121,7 +121,7 @@ type PrometheusSchemaFieldUsage struct { type PrometheusSchemaFieldUsageExporter struct { BatchSize int `yaml:"batch_size" envDefault:"4096" env:"BATCH_SIZE"` QueueSize int `yaml:"queue_size" envDefault:"12800" env:"QUEUE_SIZE"` - Interval time.Duration `yaml:"interval" envDefault:"1s" env:"INTERVAL"` + Interval time.Duration `yaml:"interval" envDefault:"2s" env:"INTERVAL"` ExportTimeout time.Duration `yaml:"export_timeout" envDefault:"10s" env:"EXPORT_TIMEOUT"` } diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index b83380afa9..67dee2f1bb 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1264,7 +1264,7 @@ }, "interval": { "type": "string", - "default": "1s", + "default": "2s", "duration": { "minimum": "100ms" }, diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index 2f124963dd..c5c3eab45e 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -702,7 +702,7 @@ telemetry: exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter require.Equal(t, 4096, exporter.BatchSize) require.Equal(t, 12800, exporter.QueueSize) - require.Equal(t, 10*time.Second, exporter.Interval) + require.Equal(t, 2*time.Second, exporter.Interval) require.Equal(t, 10*time.Second, exporter.ExportTimeout) }) diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index e32f0d4540..e780d2da35 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -66,7 +66,7 @@ "Exporter": { "BatchSize": 4096, "QueueSize": 12800, - "Interval": 1000000000, + "Interval": 2000000000, "ExportTimeout": 10000000000 } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 800a56a234..9a2de6c6b8 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -96,7 +96,7 @@ "Exporter": { "BatchSize": 4096, "QueueSize": 12800, - "Interval": 1000000000, + "Interval": 2000000000, "ExportTimeout": 10000000000 } } diff --git a/router/pkg/graphqlschemausage/schemausage.go b/router/pkg/graphqlschemausage/schemausage.go index a45045167b..bfd6acec87 100644 --- a/router/pkg/graphqlschemausage/schemausage.go +++ b/router/pkg/graphqlschemausage/schemausage.go @@ -27,6 +27,7 @@ type TypeFieldMetrics []*TypeFieldUsageInfo // IntoGraphQLMetrics converts the TypeFieldMetrics into a []*graphqlmetrics.TypeFieldUsageInfo func (t TypeFieldMetrics) IntoGraphQLMetrics() []*graphqlmetrics.TypeFieldUsageInfo { + // Pre-allocate slice with exact capacity metrics := make([]*graphqlmetrics.TypeFieldUsageInfo, len(t)) for i, info := range t { metrics[i] = info.IntoGraphQLMetrics() From 4933a803605f70863038017b6ff6edead447e05d Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 20 Nov 2025 14:53:03 +0000 Subject: [PATCH 11/13] chore: `PrometheusUsageInfoTrack` -> `PrometheusTrackUsageInfo` --- router/core/operation_metrics.go | 8 ++++---- router/core/router_metrics.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/router/core/operation_metrics.go b/router/core/operation_metrics.go index 4f26530c5e..1cf9683f2d 100644 --- a/router/core/operation_metrics.go +++ b/router/core/operation_metrics.go @@ -36,7 +36,7 @@ type OperationMetrics struct { routerConfigVersion string logger *zap.Logger trackUsageInfo bool - prometheusUsageInfoTrack bool + prometheusTrackUsageInfo bool } func (m *OperationMetrics) Finish(reqContext *requestContext, statusCode int, responseSize int, exportSynchronous bool) { @@ -82,7 +82,7 @@ func (m *OperationMetrics) Finish(reqContext *requestContext, statusCode int, re } // Prometheus metrics export (to local Prometheus metrics) - if m.prometheusUsageInfoTrack { + if m.prometheusTrackUsageInfo { m.routerMetrics.ExportSchemaUsageInfoPrometheus(reqContext.operation, statusCode, reqContext.error != nil, exportSynchronous) } } @@ -96,7 +96,7 @@ type OperationMetricsOptions struct { RouterMetrics RouterMetrics Logger *zap.Logger TrackUsageInfo bool - PrometheusUsageInfoTrack bool + PrometheusTrackUsageInfo bool } // newOperationMetrics creates a new OperationMetrics struct and starts the operation metrics. @@ -113,6 +113,6 @@ func newOperationMetrics(opts OperationMetricsOptions) *OperationMetrics { routerMetrics: opts.RouterMetrics, logger: opts.Logger, trackUsageInfo: opts.TrackUsageInfo, - prometheusUsageInfoTrack: opts.PrometheusUsageInfoTrack, + prometheusTrackUsageInfo: opts.PrometheusTrackUsageInfo, } } diff --git a/router/core/router_metrics.go b/router/core/router_metrics.go index 37baf3ad1c..b4aade37e7 100644 --- a/router/core/router_metrics.go +++ b/router/core/router_metrics.go @@ -62,7 +62,7 @@ func (m *routerMetrics) StartOperation(logger *zap.Logger, requestContentLength RequestContentLength: requestContentLength, RouterConfigVersion: m.routerConfigVersion, TrackUsageInfo: m.exportEnabled, - PrometheusUsageInfoTrack: m.prometheusMetricsExporter != nil, + PrometheusTrackUsageInfo: m.prometheusMetricsExporter != nil, InFlightAddOption: inFlightAddOption, SliceAttributes: sliceAttr, }) From 307df0cf4ecba58650cda72a25c9b4ecbab81661 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 20 Nov 2025 14:56:14 +0000 Subject: [PATCH 12/13] chore: rename GqlMetricsExporter -> GQLMetricsExporter --- router/core/router_metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/router/core/router_metrics.go b/router/core/router_metrics.go index b4aade37e7..5cb9022ef6 100644 --- a/router/core/router_metrics.go +++ b/router/core/router_metrics.go @@ -16,7 +16,7 @@ type RouterMetrics interface { StartOperation(logger *zap.Logger, requestContentLength int64, sliceAttr []attribute.KeyValue, inFlightAddOption otelmetric.AddOption) *OperationMetrics ExportSchemaUsageInfo(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) ExportSchemaUsageInfoPrometheus(operationContext *operationContext, statusCode int, hasError bool, exportSynchronous bool) - GqlMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter + GQLMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter PrometheusMetricsExporter() *graphqlmetrics.PrometheusMetricsExporter MetricStore() metric.Store } @@ -73,7 +73,7 @@ func (m *routerMetrics) MetricStore() metric.Store { return m.metrics } -func (m *routerMetrics) GqlMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter { +func (m *routerMetrics) GQLMetricsExporter() *graphqlmetrics.GraphQLMetricsExporter { return m.gqlMetricsExporter } From 8e24196a4c608e88dda72191bebb776558b317b6 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 20 Nov 2025 15:14:20 +0000 Subject: [PATCH 13/13] chore: modernize, move generic exporter bits out of graphqlmetrics --- router/core/graph_server.go | 5 +- router/core/router.go | 3 +- .../{graphqlmetrics => exporter}/exporter.go | 2 +- .../exporter_bench_test.go | 14 ++--- .../{graphqlmetrics => exporter}/sink.go | 2 +- .../graphqlmetrics/graphql_exporter.go | 7 +-- ...orter_test.go => graphql_exporter_test.go} | 51 ++++++++++--------- .../graphqlmetrics/prometheus_exporter.go | 7 +-- 8 files changed, 46 insertions(+), 45 deletions(-) rename router/internal/{graphqlmetrics => exporter}/exporter.go (99%) rename router/internal/{graphqlmetrics => exporter}/exporter_bench_test.go (96%) rename router/internal/{graphqlmetrics => exporter}/sink.go (97%) rename router/internal/graphqlmetrics/{exporter_test.go => graphql_exporter_test.go} (92%) diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 7ace34983f..f2aa8ec061 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -35,6 +35,7 @@ import ( "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/common" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/internal/circuit" + "github.com/wundergraph/cosmo/router/internal/exporter" "github.com/wundergraph/cosmo/router/internal/expr" "github.com/wundergraph/cosmo/router/internal/graphqlmetrics" rjwt "github.com/wundergraph/cosmo/router/internal/jwt" @@ -923,12 +924,12 @@ func (s *graphServer) buildGraphMux( // Create Prometheus metrics exporter for schema field usage if enabled if s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled { cfg := s.metricConfig.Prometheus.PromSchemaFieldUsage - settings := &graphqlmetrics.ExporterSettings{ + settings := &exporter.ExporterSettings{ BatchSize: cfg.Exporter.BatchSize, QueueSize: cfg.Exporter.QueueSize, Interval: cfg.Exporter.Interval, ExportTimeout: cfg.Exporter.ExportTimeout, - RetryOptions: graphqlmetrics.RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, // Retry is disabled for Prometheus metrics MaxRetry: 1, // Provide valid defaults even when disabled MaxDuration: time.Second * 1, diff --git a/router/core/router.go b/router/core/router.go index 8247ed3b56..07ca5945d8 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -30,6 +30,7 @@ import ( "github.com/wundergraph/cosmo/router/internal/circuit" "github.com/wundergraph/cosmo/router/internal/debug" "github.com/wundergraph/cosmo/router/internal/docker" + "github.com/wundergraph/cosmo/router/internal/exporter" "github.com/wundergraph/cosmo/router/internal/graphiql" "github.com/wundergraph/cosmo/router/internal/graphqlmetrics" "github.com/wundergraph/cosmo/router/internal/persistedoperation" @@ -828,7 +829,7 @@ func (r *Router) bootstrap(ctx context.Context) error { r.logger, client, r.graphApiToken, - graphqlmetrics.NewDefaultExporterSettings(), + exporter.NewDefaultExporterSettings(), ) if err != nil { return fmt.Errorf("failed to validate graphql metrics exporter: %w", err) diff --git a/router/internal/graphqlmetrics/exporter.go b/router/internal/exporter/exporter.go similarity index 99% rename from router/internal/graphqlmetrics/exporter.go rename to router/internal/exporter/exporter.go index 1467b052c5..1eab9ca898 100644 --- a/router/internal/graphqlmetrics/exporter.go +++ b/router/internal/exporter/exporter.go @@ -1,4 +1,4 @@ -package graphqlmetrics +package exporter import ( "context" diff --git a/router/internal/graphqlmetrics/exporter_bench_test.go b/router/internal/exporter/exporter_bench_test.go similarity index 96% rename from router/internal/graphqlmetrics/exporter_bench_test.go rename to router/internal/exporter/exporter_bench_test.go index 4583d4c18b..2fd7aca302 100644 --- a/router/internal/graphqlmetrics/exporter_bench_test.go +++ b/router/internal/exporter/exporter_bench_test.go @@ -1,4 +1,4 @@ -package graphqlmetrics +package exporter import ( "context" @@ -48,10 +48,9 @@ func BenchmarkExporterBatchBufferAllocation(b *testing.B) { } defer exporter.Shutdown(context.Background()) - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // Get a buffer from the pool buffer := exporter.getBatchBuffer() @@ -96,10 +95,9 @@ func BenchmarkExporterHighThroughput(b *testing.B) { }, } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { exporter.Record(item, false) } @@ -141,10 +139,9 @@ func BenchmarkExporterBatchCycle(b *testing.B) { }, } - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // Fill exactly one batch for j := 0; j < settings.BatchSize; j++ { exporter.Record(item, false) @@ -178,10 +175,9 @@ func BenchmarkExporterBufferGrowth(b *testing.B) { } defer exporter.Shutdown(context.Background()) - b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { // Get a buffer from the pool buffer := exporter.getBatchBuffer() diff --git a/router/internal/graphqlmetrics/sink.go b/router/internal/exporter/sink.go similarity index 97% rename from router/internal/graphqlmetrics/sink.go rename to router/internal/exporter/sink.go index 45979fb117..0d98b8fbf4 100644 --- a/router/internal/graphqlmetrics/sink.go +++ b/router/internal/exporter/sink.go @@ -1,4 +1,4 @@ -package graphqlmetrics +package exporter import ( "context" diff --git a/router/internal/graphqlmetrics/graphql_exporter.go b/router/internal/graphqlmetrics/graphql_exporter.go index 1d4ab01835..8b261aa843 100644 --- a/router/internal/graphqlmetrics/graphql_exporter.go +++ b/router/internal/graphqlmetrics/graphql_exporter.go @@ -5,13 +5,14 @@ import ( graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" + "github.com/wundergraph/cosmo/router/internal/exporter" "go.uber.org/zap" ) // GraphQLMetricsExporter wraps the generic Exporter for GraphQL metrics. // It provides a cleaner API and backward compatibility with the old interface. type GraphQLMetricsExporter struct { - exporter *Exporter[*graphqlmetrics.SchemaUsageInfo] + exporter *exporter.Exporter[*graphqlmetrics.SchemaUsageInfo] } // NewGraphQLMetricsExporter creates a new exporter specifically for GraphQL metrics. @@ -20,7 +21,7 @@ func NewGraphQLMetricsExporter( logger *zap.Logger, client graphqlmetricsv1connect.GraphQLMetricsServiceClient, apiToken string, - settings *ExporterSettings, + settings *exporter.ExporterSettings, ) (*GraphQLMetricsExporter, error) { sink := NewGraphQLMetricsSink(GraphQLMetricsSinkConfig{ Client: client, @@ -28,7 +29,7 @@ func NewGraphQLMetricsExporter( Logger: logger, }) - exporter, err := NewExporter(logger, sink, IsRetryableError, settings) + exporter, err := exporter.NewExporter(logger, sink, IsRetryableError, settings) if err != nil { return nil, err } diff --git a/router/internal/graphqlmetrics/exporter_test.go b/router/internal/graphqlmetrics/graphql_exporter_test.go similarity index 92% rename from router/internal/graphqlmetrics/exporter_test.go rename to router/internal/graphqlmetrics/graphql_exporter_test.go index a7a6420542..9339f2ffc8 100644 --- a/router/internal/graphqlmetrics/exporter_test.go +++ b/router/internal/graphqlmetrics/graphql_exporter_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" graphqlmetricsv1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" + "github.com/wundergraph/cosmo/router/internal/exporter" "go.uber.org/zap" ) @@ -53,11 +54,11 @@ func TestExportAggregationSameSchemaUsages(t *testing.T) { zap.NewNop(), c, "secret", - &ExporterSettings{ + &exporter.ExporterSettings{ BatchSize: batchSize, QueueSize: queueSize, Interval: 500 * time.Millisecond, - RetryOptions: RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, MaxDuration: 300 * time.Millisecond, Interval: 100 * time.Millisecond, @@ -69,7 +70,7 @@ func TestExportAggregationSameSchemaUsages(t *testing.T) { require.Nil(t, err) - for i := 0; i < totalItems; i++ { + for i := range totalItems { hash := fmt.Sprintf("hash-%d", i%2) @@ -113,8 +114,10 @@ func TestExportAggregationSameSchemaUsages(t *testing.T) { c.mu.Lock() defer c.mu.Unlock() - require.Equal(t, 1, len(c.publishedAggregations)) - require.Equal(t, 2, len(c.publishedAggregations[0])) + + require.Len(t, c.publishedAggregations, 1) + require.Len(t, c.publishedAggregations[0], 2) + require.Equal(t, 50, int(c.publishedAggregations[0][0].RequestCount)) require.Equal(t, 50, int(c.publishedAggregations[0][1].RequestCount)) } @@ -132,11 +135,11 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) { zap.NewNop(), c, "secret", - &ExporterSettings{ + &exporter.ExporterSettings{ BatchSize: batchSize, QueueSize: queueSize, Interval: time.Second * 5, - RetryOptions: RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, MaxDuration: 300 * time.Millisecond, Interval: 100 * time.Millisecond, @@ -148,8 +151,7 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) { require.Nil(t, err) - for i := 0; i < totalItems; i++ { - i := i + for i := range totalItems { usage := &graphqlmetricsv1.SchemaUsageInfo{ TypeFieldMetrics: []*graphqlmetricsv1.TypeFieldUsageInfo{ { @@ -204,13 +206,13 @@ func TestForceFlushSync(t *testing.T) { zap.NewNop(), c, "secret", - &ExporterSettings{ + &exporter.ExporterSettings{ BatchSize: batchSize, QueueSize: queueSize, // Intentionally set to a high value to make sure that the exporter is forced to flush immediately Interval: 5000 * time.Millisecond, ExportTimeout: 5000 * time.Millisecond, - RetryOptions: RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, MaxDuration: 300 * time.Millisecond, Interval: 100 * time.Millisecond, @@ -221,8 +223,7 @@ func TestForceFlushSync(t *testing.T) { require.Nil(t, err) - for i := 0; i < totalItems; i++ { - i := i + for i := range totalItems { usage := &graphqlmetricsv1.SchemaUsageInfo{ TypeFieldMetrics: []*graphqlmetricsv1.TypeFieldUsageInfo{ { @@ -262,8 +263,8 @@ func TestForceFlushSync(t *testing.T) { c.mu.Lock() // Synchronous mode now sends items through aggregation, so they appear as aggregations - require.Equal(t, 10, len(c.publishedAggregations)) - require.Equal(t, 1, len(c.publishedAggregations[0])) + require.Len(t, c.publishedAggregations, 10) + require.Len(t, c.publishedAggregations[0], 1) // Make sure that the exporter is still working after a forced flush @@ -271,7 +272,7 @@ func TestForceFlushSync(t *testing.T) { c.publishedAggregations = c.publishedAggregations[:0] c.mu.Unlock() - for i := 0; i < totalItems; i++ { + for i := range totalItems { usage := &graphqlmetricsv1.SchemaUsageInfo{ TypeFieldMetrics: []*graphqlmetricsv1.TypeFieldUsageInfo{ { @@ -311,8 +312,8 @@ func TestForceFlushSync(t *testing.T) { c.mu.Lock() defer c.mu.Unlock() - require.Equal(t, 10, len(c.publishedAggregations)) - require.Equal(t, 1, len(c.publishedAggregations[0])) + require.Len(t, c.publishedAggregations, 10) + require.Len(t, c.publishedAggregations[0], 1) } func TestExportBatchInterval(t *testing.T) { @@ -328,11 +329,11 @@ func TestExportBatchInterval(t *testing.T) { zap.NewNop(), c, "secret", - &ExporterSettings{ + &exporter.ExporterSettings{ BatchSize: batchSize, QueueSize: queueSize, Interval: 100 * time.Millisecond, - RetryOptions: RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, MaxDuration: 300 * time.Millisecond, Interval: 100 * time.Millisecond, @@ -344,7 +345,7 @@ func TestExportBatchInterval(t *testing.T) { require.Nil(t, err) - for i := 0; i < totalItems; i++ { + for i := range totalItems { usage := &graphqlmetricsv1.SchemaUsageInfo{ TypeFieldMetrics: []*graphqlmetricsv1.TypeFieldUsageInfo{ { @@ -386,8 +387,8 @@ func TestExportBatchInterval(t *testing.T) { defer require.Nil(t, e.Shutdown(context.Background())) - require.Equal(t, 1, len(c.publishedAggregations)) - require.Equal(t, 5, len(c.publishedAggregations[0])) + require.Len(t, c.publishedAggregations, 1) + require.Len(t, c.publishedAggregations[0], 5) } func TestExportFullQueue(t *testing.T) { @@ -404,11 +405,11 @@ func TestExportFullQueue(t *testing.T) { zap.NewNop(), c, "secret", - &ExporterSettings{ + &exporter.ExporterSettings{ BatchSize: batchSize, QueueSize: queueSize, Interval: 500 * time.Millisecond, - RetryOptions: RetryOptions{ + RetryOptions: exporter.RetryOptions{ Enabled: false, MaxDuration: 300 * time.Millisecond, Interval: 100 * time.Millisecond, diff --git a/router/internal/graphqlmetrics/prometheus_exporter.go b/router/internal/graphqlmetrics/prometheus_exporter.go index 0b1ec07047..3c57c121c3 100644 --- a/router/internal/graphqlmetrics/prometheus_exporter.go +++ b/router/internal/graphqlmetrics/prometheus_exporter.go @@ -4,6 +4,7 @@ import ( "context" graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/internal/exporter" "github.com/wundergraph/cosmo/router/pkg/metric" "go.uber.org/zap" ) @@ -11,7 +12,7 @@ import ( // PrometheusMetricsExporter wraps the generic Exporter for Prometheus metrics. // It provides a cleaner API for exporting schema field usage to Prometheus. type PrometheusMetricsExporter struct { - exporter *Exporter[*graphqlmetrics.SchemaUsageInfo] + exporter *exporter.Exporter[*graphqlmetrics.SchemaUsageInfo] } // NewPrometheusMetricsExporter creates a new exporter specifically for Prometheus metrics. @@ -20,7 +21,7 @@ func NewPrometheusMetricsExporter( logger *zap.Logger, metricStore metric.Store, includeOpSha bool, - settings *ExporterSettings, + settings *exporter.ExporterSettings, ) (*PrometheusMetricsExporter, error) { sink := NewPrometheusSink(PrometheusSinkConfig{ MetricStore: metricStore, @@ -34,7 +35,7 @@ func NewPrometheusMetricsExporter( return false // Don't retry Prometheus errors } - exporter, err := NewExporter(logger, sink, errorHandler, settings) + exporter, err := exporter.NewExporter(logger, sink, errorHandler, settings) if err != nil { return nil, err }