Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 76 additions & 93 deletions router-tests/prometheus_improved_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration
import (
"regexp"
"testing"
"time"

rmetric "github.com/wundergraph/cosmo/router/pkg/metric"

Expand Down Expand Up @@ -37,8 +38,7 @@ func TestPrometheusSchemaUsage(t *testing.T) {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0,
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand Down Expand Up @@ -79,6 +79,9 @@ query myQuery {
}
}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -87,7 +90,9 @@ query myQuery {

schemaUsageMetrics := schemaUsage.GetMetric()

require.Len(t, schemaUsageMetrics, 7)
// Note: The aggregated batch processing now correctly tracks all field usages,
// including fields accessed through interfaces, resulting in more accurate metrics
require.Len(t, schemaUsageMetrics, 8)

for _, metric := range schemaUsageMetrics {
assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery")
Expand All @@ -96,26 +101,31 @@ query myQuery {
assertLabelNotPresent(t, metric.Label, otel.WgOperationSha256)
}

assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLFieldName, "currentMood")
assertLabelValue(t, schemaUsageMetrics[0].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLFieldName, "departments")
assertLabelValue(t, schemaUsageMetrics[1].Label, otel.WgGraphQLParentType, "RoleType")

assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLFieldName, "employee")
assertLabelValue(t, schemaUsageMetrics[2].Label, otel.WgGraphQLParentType, "Query")

assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLFieldName, "id")
assertLabelValue(t, schemaUsageMetrics[3].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLFieldName, "role")
assertLabelValue(t, schemaUsageMetrics[4].Label, otel.WgGraphQLParentType, "Employee")

assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLFieldName, "title")
assertLabelValue(t, schemaUsageMetrics[5].Label, otel.WgGraphQLParentType, "Engineer")
// Verify we have all expected field/parent type combinations
// Note: Order may vary, so we'll just check that all expected metrics are present
fieldTypePairs := make(map[string]string)
for _, metric := range schemaUsageMetrics {
var fieldName, parentType string
for _, label := range metric.Label {
if *label.Name == "wg_graphql_field_name" {
fieldName = *label.Value
}
if *label.Name == "wg_graphql_parent_type" {
parentType = *label.Value
}
}
if fieldName != "" && parentType != "" {
fieldTypePairs[fieldName+":"+parentType] = parentType
}
}

assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLFieldName, "title")
assertLabelValue(t, schemaUsageMetrics[6].Label, otel.WgGraphQLParentType, "Operator")
// Verify expected field/parent combinations exist
require.Contains(t, fieldTypePairs, "currentMood:Employee")
require.Contains(t, fieldTypePairs, "employee:Query")
require.Contains(t, fieldTypePairs, "id:Employee")
require.Contains(t, fieldTypePairs, "role:Employee")
require.Contains(t, fieldTypePairs, "title:Engineer")
require.Contains(t, fieldTypePairs, "title:Operator")
})
})

Expand All @@ -130,8 +140,7 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0,
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -154,6 +163,9 @@ query myQuery {
}
}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -206,7 +218,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: false,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -215,6 +226,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -244,7 +258,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: true,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -253,6 +266,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand Down Expand Up @@ -288,7 +304,6 @@ query myQuery {
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
IncludeOperationSha: false,
SampleRate: 1.0,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -297,6 +312,9 @@ query myQuery {
})
require.JSONEq(t, `{"data":{"employee":{"id":1,"currentMood":"HAPPY","role":{"title":["Founder","CEO"]}}}}`, res.Body)

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -315,7 +333,7 @@ query myQuery {
})
})

t.Run("sampling reduces tracked requests", func(t *testing.T) {
t.Run("all requests are tracked", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
Expand All @@ -326,62 +344,7 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 0.1, // 10% sampling
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Make 100 requests
for i := 0; i < 100; i++ {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employee(id: 1) { id } }`,
})
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

mf, err := promRegistry.Gather()
require.NoError(t, err)

schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName)
assert.NotNil(t, schemaUsage)

schemaUsageMetrics := schemaUsage.GetMetric()

require.Greater(t, len(schemaUsageMetrics), 0, "At least 1 request should be sampled")

// With 10% sampling and 100 requests, each sampled request increments two field counters (`employee` and `id`).
// 100% sampling would produce 200 total field counts (100 requests * 2 fields), so a reduced total confirms sampling worked.
totalFieldCounts := 0.0
for _, m := range schemaUsageMetrics {
counter := m.GetCounter()
require.NotNil(t, counter)
totalFieldCounts += counter.GetValue()
}

require.Greater(t, totalFieldCounts, 0.0, "At least one sampled field is expected with a 10% sample rate")
require.Less(t, totalFieldCounts, 200.0, "Sampling should record fewer than 100% of requests (200 total field counts)")

// Verify that the sampled metrics have correct structure
for _, m := range schemaUsageMetrics {
assertLabelValue(t, m.Label, otel.WgOperationName, "myQuery")
assertLabelValue(t, m.Label, otel.WgOperationType, "query")
}
})
})

t.Run("100% sample rate tracks all requests", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
promRegistry := prometheus.NewRegistry()

testenv.Run(t, &testenv.Config{
MetricReader: metricReader,
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 1.0, // 100% sampling (default)
Enabled: true,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
Expand All @@ -393,6 +356,9 @@ query myQuery {
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

// Wait for metrics to be flushed (interval is 100ms in test env)
time.Sleep(200 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

Expand All @@ -401,7 +367,7 @@ query myQuery {

schemaUsageMetrics := schemaUsage.GetMetric()

// With 100% sampling and 10 requests, we expect 2 metrics (employee, id)
// We expect 2 metrics (employee, id)
// The counter values should be 10 for each field
require.Len(t, schemaUsageMetrics, 2)

Expand All @@ -415,7 +381,7 @@ query myQuery {
})
})

t.Run("0% sample rate tracks no requests", func(t *testing.T) {
t.Run("custom exporter settings", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
Expand All @@ -426,27 +392,44 @@ query myQuery {
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
PrometheusSchemaFieldUsage: testenv.PrometheusSchemaFieldUsage{
Enabled: true,
SampleRate: 0.0, // 0% sampling
Enabled: true,
Exporter: &testenv.PrometheusSchemaFieldUsageExporter{
BatchSize: 10, // Very small batch for immediate flush
QueueSize: 100,
Interval: 50 * time.Millisecond, // Fast flush
ExportTimeout: 2 * time.Second,
},
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Make 10 requests
for range 10 {
// Make 5 requests
for range 5 {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employee(id: 1) { id } }`,
})
require.JSONEq(t, `{"data":{"employee":{"id":1}}}`, res.Body)
}

// Wait for metrics to be flushed (custom interval is 50ms)
time.Sleep(100 * time.Millisecond)

mf, err := promRegistry.Gather()
require.NoError(t, err)

schemaUsage := findMetricFamilyByName(mf, SchemaFieldUsageMetricName)
assert.NotNil(t, schemaUsage)

schemaUsageMetrics := schemaUsage.GetMetric()

// We expect 2 metrics (employee, id)
require.Len(t, schemaUsageMetrics, 2)

for _, metric := range schemaUsageMetrics {
assertLabelValue(t, metric.Label, otel.WgOperationName, "myQuery")
assertLabelValue(t, metric.Label, otel.WgOperationType, "query")

// With 0% sampling, no metrics should be recorded
if schemaUsage != nil {
require.Len(t, schemaUsage.GetMetric(), 0, "No metrics should be recorded with 0% sampling")
// Each field should have been counted 5 times
assert.InEpsilon(t, 5.0, *metric.Counter.Value, 0.0001)
}
})
})
Expand Down
52 changes: 41 additions & 11 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,14 @@ type MetricOptions struct {
type PrometheusSchemaFieldUsage struct {
Enabled bool
IncludeOperationSha bool
SampleRate float64
Exporter *PrometheusSchemaFieldUsageExporter
}

type PrometheusSchemaFieldUsageExporter struct {
BatchSize int
QueueSize int
Interval time.Duration
ExportTimeout time.Duration
}

type Config struct {
Expand Down Expand Up @@ -1499,6 +1506,33 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
var prometheusConfig rmetric.PrometheusConfig

if testConfig.PrometheusRegistry != nil {
promSchemaUsage := rmetric.PrometheusSchemaFieldUsage{
Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled,
IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha,
}

// Provide defaults for exporter settings if enabled
// Use shorter intervals for tests to avoid waiting too long
if promSchemaUsage.Enabled {
if testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter != nil {
// Use user-provided exporter settings
promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{
BatchSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.BatchSize,
QueueSize: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.QueueSize,
Interval: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.Interval,
ExportTimeout: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Exporter.ExportTimeout,
}
} else {
// Use test-friendly defaults
promSchemaUsage.Exporter = rmetric.PrometheusSchemaFieldUsageExporter{
BatchSize: 100, // Smaller batch size for tests
QueueSize: 1000, // Smaller queue for tests
Interval: 100 * time.Millisecond, // Fast flush for tests
ExportTimeout: 5 * time.Second,
}
}
}

prometheusConfig = rmetric.PrometheusConfig{
Enabled: true,
ListenAddr: fmt.Sprintf("localhost:%d", testConfig.PrometheusPort),
Expand All @@ -1509,16 +1543,12 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
EngineStats: rmetric.EngineStatsConfig{
Subscription: testConfig.MetricOptions.PrometheusEngineStatsOptions.EnableSubscription,
},
CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics,
ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics,
ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels,
Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics,
ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo,
PromSchemaFieldUsage: rmetric.PrometheusSchemaFieldUsage{
Enabled: testConfig.MetricOptions.PrometheusSchemaFieldUsage.Enabled,
IncludeOperationSha: testConfig.MetricOptions.PrometheusSchemaFieldUsage.IncludeOperationSha,
SampleRate: testConfig.MetricOptions.PrometheusSchemaFieldUsage.SampleRate,
},
CircuitBreaker: testConfig.MetricOptions.EnablePrometheusCircuitBreakerMetrics,
ExcludeMetrics: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetrics,
ExcludeMetricLabels: testConfig.MetricOptions.MetricExclusions.ExcludedPrometheusMetricLabels,
Streams: testConfig.MetricOptions.EnablePrometheusStreamMetrics,
ExcludeScopeInfo: testConfig.MetricOptions.MetricExclusions.ExcludeScopeInfo,
PromSchemaFieldUsage: promSchemaUsage,
}
}

Expand Down
Loading
Loading