Skip to content

Commit 841d22a

Browse files
committed
chore: improve
1 parent bb6d441 commit 841d22a

File tree

10 files changed

+48
-15
lines changed

10 files changed

+48
-15
lines changed

router/demo.config.yaml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,24 @@
55

66
version: "1"
77

8+
graphql_metrics:
9+
enabled: false
10+
11+
traffic_shaping:
12+
all:
13+
retry:
14+
enabled: false
15+
16+
telemetry:
17+
metrics:
18+
otlp:
19+
enabled: false
20+
prometheus:
21+
schema_usage:
22+
enabled: true
23+
exporter:
24+
interval: 3s
25+
826
events:
927
providers:
1028
nats:
@@ -19,4 +37,4 @@ events:
1937
redis:
2038
- id: my-redis
2139
urls:
22-
- "redis://localhost:6379/2"
40+
- "redis://localhost:6379/2"

router/internal/graphqlmetrics/exporter.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,13 @@ func (e *Exporter[T]) exportBatch(batch []T) error {
228228
}
229229

230230
// prepareAndSendBatch starts a goroutine to export the batch with retry logic.
231+
// The goroutine takes ownership of the batch slice and will return it to the pool when done.
231232
func (e *Exporter[T]) prepareAndSendBatch(batch []T) {
232233
e.logger.Debug("Preparing to send batch", zap.Int("batch_size", len(batch)))
233234
e.inflightBatches.Inc()
234235
go func() {
235236
defer e.inflightBatches.Dec()
237+
defer e.putBatchBuffer(batch) // Return buffer to pool after export completes
236238
e.exportBatchWithRetry(batch)
237239
}()
238240
}
@@ -328,17 +330,15 @@ func (e *Exporter[T]) start() {
328330
e.logger.Debug("Tick: flushing buffer", zap.Int("buffer_size", len(buffer)))
329331
if len(buffer) > 0 {
330332
e.prepareAndSendBatch(buffer)
331-
// Return buffer to pool after sending
332-
e.putBatchBuffer(buffer)
333+
// Ownership transferred to goroutine, get a new buffer
333334
buffer = nil
334335
}
335336
case item := <-e.queue:
336337
buffer = append(buffer, item)
337338
if len(buffer) == e.settings.BatchSize {
338339
e.logger.Debug("Buffer full, sending batch", zap.Int("batch_size", len(buffer)))
339340
e.prepareAndSendBatch(buffer)
340-
// Return buffer to pool after sending
341-
e.putBatchBuffer(buffer)
341+
// Ownership transferred to goroutine, get a new buffer
342342
buffer = nil
343343
}
344344
case <-e.shutdownSignal:
@@ -360,15 +360,13 @@ func (e *Exporter[T]) drainQueue(buffer []T) {
360360
buffer = append(buffer, item)
361361
if len(buffer) == e.settings.BatchSize {
362362
e.prepareAndSendBatch(buffer)
363-
// Return buffer to pool and get a new one
364-
e.putBatchBuffer(buffer)
363+
// Ownership transferred to goroutine, get a new buffer
365364
buffer = e.getBatchBuffer()
366365
}
367366
default:
368367
if len(buffer) > 0 {
369368
e.prepareAndSendBatch(buffer)
370-
// Return buffer to pool before returning
371-
e.putBatchBuffer(buffer)
369+
// Ownership transferred to goroutine
372370
}
373371
e.logger.Debug("Queue drained", zap.Int("drained_items", drainedItems))
374372
return

router/internal/graphqlmetrics/exporter_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ func TestExportBatchesWithUniqueSchemaUsages(t *testing.T) {
175175
SchemaInfo: &graphqlmetricsv1.SchemaInfo{
176176
Version: "1",
177177
},
178+
RequestInfo: &graphqlmetricsv1.RequestInfo{
179+
Error: false,
180+
StatusCode: http.StatusOK,
181+
},
178182
Attributes: map[string]string{},
179183
}
180184

@@ -246,6 +250,10 @@ func TestForceFlushSync(t *testing.T) {
246250
SchemaInfo: &graphqlmetricsv1.SchemaInfo{
247251
Version: "1",
248252
},
253+
RequestInfo: &graphqlmetricsv1.RequestInfo{
254+
Error: false,
255+
StatusCode: http.StatusOK,
256+
},
249257
Attributes: map[string]string{},
250258
}
251259

@@ -291,6 +299,10 @@ func TestForceFlushSync(t *testing.T) {
291299
SchemaInfo: &graphqlmetricsv1.SchemaInfo{
292300
Version: "1",
293301
},
302+
RequestInfo: &graphqlmetricsv1.RequestInfo{
303+
Error: false,
304+
StatusCode: http.StatusOK,
305+
},
294306
Attributes: map[string]string{},
295307
}
296308

@@ -360,6 +372,10 @@ func TestExportBatchInterval(t *testing.T) {
360372
SchemaInfo: &graphqlmetricsv1.SchemaInfo{
361373
Version: "1",
362374
},
375+
RequestInfo: &graphqlmetricsv1.RequestInfo{
376+
Error: false,
377+
StatusCode: http.StatusOK,
378+
},
363379
Attributes: map[string]string{},
364380
}
365381

router/internal/graphqlmetrics/prometheus_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (s *PrometheusSink) Export(ctx context.Context, batch []*graphqlmetrics.Sch
8282
ctx,
8383
int64(count),
8484
[]attribute.KeyValue{},
85-
otelmetric.WithAttributeSet(attribute.NewSet(allAttrs...)),
85+
otelmetric.WithAttributes(allAttrs...),
8686
)
8787
}
8888

router/pkg/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ type PrometheusSchemaFieldUsage struct {
121121
type PrometheusSchemaFieldUsageExporter struct {
122122
BatchSize int `yaml:"batch_size" envDefault:"4096" env:"BATCH_SIZE"`
123123
QueueSize int `yaml:"queue_size" envDefault:"12800" env:"QUEUE_SIZE"`
124-
Interval time.Duration `yaml:"interval" envDefault:"1s" env:"INTERVAL"`
124+
Interval time.Duration `yaml:"interval" envDefault:"2s" env:"INTERVAL"`
125125
ExportTimeout time.Duration `yaml:"export_timeout" envDefault:"10s" env:"EXPORT_TIMEOUT"`
126126
}
127127

router/pkg/config/config.schema.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1264,7 +1264,7 @@
12641264
},
12651265
"interval": {
12661266
"type": "string",
1267-
"default": "1s",
1267+
"default": "2s",
12681268
"duration": {
12691269
"minimum": "100ms"
12701270
},

router/pkg/config/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ telemetry:
702702
exporter := c.Config.Telemetry.Metrics.Prometheus.SchemaFieldUsage.Exporter
703703
require.Equal(t, 4096, exporter.BatchSize)
704704
require.Equal(t, 12800, exporter.QueueSize)
705-
require.Equal(t, 10*time.Second, exporter.Interval)
705+
require.Equal(t, 2*time.Second, exporter.Interval)
706706
require.Equal(t, 10*time.Second, exporter.ExportTimeout)
707707
})
708708

router/pkg/config/testdata/config_defaults.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"Exporter": {
6767
"BatchSize": 4096,
6868
"QueueSize": 12800,
69-
"Interval": 1000000000,
69+
"Interval": 2000000000,
7070
"ExportTimeout": 10000000000
7171
}
7272
}

router/pkg/config/testdata/config_full.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
"Exporter": {
9797
"BatchSize": 4096,
9898
"QueueSize": 12800,
99-
"Interval": 1000000000,
99+
"Interval": 2000000000,
100100
"ExportTimeout": 10000000000
101101
}
102102
}

router/pkg/graphqlschemausage/schemausage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type TypeFieldMetrics []*TypeFieldUsageInfo
2727

2828
// IntoGraphQLMetrics converts the TypeFieldMetrics into a []*graphqlmetrics.TypeFieldUsageInfo
2929
func (t TypeFieldMetrics) IntoGraphQLMetrics() []*graphqlmetrics.TypeFieldUsageInfo {
30+
// Pre-allocate slice with exact capacity
3031
metrics := make([]*graphqlmetrics.TypeFieldUsageInfo, len(t))
3132
for i, info := range t {
3233
metrics[i] = info.IntoGraphQLMetrics()

0 commit comments

Comments
 (0)