diff --git a/.github/workflows/system-tests.yml b/.github/workflows/system-tests.yml index f38b40fdf5..e220464c58 100644 --- a/.github/workflows/system-tests.yml +++ b/.github/workflows/system-tests.yml @@ -136,6 +136,8 @@ jobs: scenario: AGENT_SUPPORTING_SPAN_EVENTS - weblog-variant: net-http scenario: TELEMETRY_METRIC_GENERATION_DISABLED + - weblog-variant: net-http + scenario: APM_TRACING_EFFICIENT_PAYLOAD fail-fast: false env: TEST_LIBRARY: golang diff --git a/ddtrace/tracer/api.txt b/ddtrace/tracer/api.txt index d388901917..36893e6f5c 100644 --- a/ddtrace/tracer/api.txt +++ b/ddtrace/tracer/api.txt @@ -130,6 +130,16 @@ type UserMonitoringConfig struct { type UserMonitoringOption func(*UserMonitoringConfig)() +// File: payload_v1.go + +// Package Functions +func DecodeAttributes([]byte, *stringTable) (map[string]anyValue, []byte, error) +func DecodeKeyValueList([]byte, *stringTable) (map[string]anyValue, []byte, error) +func DecodeSpanEvents([]byte, *stringTable) ([]spanEvent, []byte, error) +func DecodeSpanLinks([]byte, *stringTable) ([]SpanLink, []byte, error) +func DecodeSpans([]byte, *stringTable) (spanList, []byte, error) +func DecodeTraceChunks([]byte, *stringTable) ([]traceChunk, []byte, error) + // File: propagator.go // Types diff --git a/ddtrace/tracer/log.go b/ddtrace/tracer/log.go index 04bdb0331d..9e4148b7f3 100644 --- a/ddtrace/tracer/log.go +++ b/ddtrace/tracer/log.go @@ -69,8 +69,12 @@ type startupInfo struct { // checkEndpoint tries to connect to the URL specified by endpoint. // If the endpoint is not reachable, checkEndpoint returns an error // explaining why. -func checkEndpoint(c *http.Client, endpoint string) error { - req, err := http.NewRequest("POST", endpoint, bytes.NewReader([]byte{0x90})) +func checkEndpoint(c *http.Client, endpoint string, protocol float64) error { + b := []byte{0x90} // empty array + if protocol == traceProtocolV1 { + b = []byte{0x80} // empty map + } + req, err := http.NewRequest("POST", endpoint, bytes.NewReader(b)) if err != nil { return fmt.Errorf("cannot create http request: %s", err) } @@ -162,8 +166,8 @@ func logStartup(t *tracer) { info.SampleRateLimit = fmt.Sprintf("%v", limit) } if !t.config.logToStdout { - if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint()); err != nil { - info.AgentError = err.Error() + if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint(), t.config.traceProtocol); err != nil { + info.AgentError = fmt.Sprintf("%s", err.Error()) log.Warn("DIAGNOSTICS Unable to reach agent intake: %s", err.Error()) } } diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 667ce1905b..d06e51c8e1 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -414,9 +414,6 @@ func newConfig(opts ...StartOption) (*config, error) { reportTelemetryOnAppStarted(telemetry.Configuration{Name: "trace_rate_limit", Value: c.traceRateLimitPerSecond, Origin: origin}) - // Set the trace protocol to use. - c.traceProtocol = internal.FloatEnv("DD_TRACE_AGENT_PROTOCOL_VERSION", traceProtocolV04) - if v := env.Get("OTEL_LOGS_EXPORTER"); v != "" { log.Warn("OTEL_LOGS_EXPORTER is not supported") } @@ -595,6 +592,15 @@ func newConfig(opts ...StartOption) (*config, error) { if c.transport == nil { c.transport = newHTTPTransport(c.agentURL.String(), c.httpClient) } + // Set the trace protocol to use. 
+ if internal.BoolEnv("DD_TRACE_V1_PAYLOAD_FORMAT_ENABLED", false) { + c.traceProtocol = traceProtocolV1 + if t, ok := c.transport.(*httpTransport); ok { + t.traceURL = fmt.Sprintf("%s%s", c.agentURL.String(), tracesAPIPathV1) + } + } else { + c.traceProtocol = traceProtocolV04 + } if c.propagator == nil { envKey := "DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH" maxLen := internal.IntEnv(envKey, defaultMaxTagsHeaderLen) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 106edb24c8..8491189dc4 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -6,14 +6,8 @@ package tracer import ( - "bytes" - "encoding/binary" "io" "sync" - "sync/atomic" - - "github.com/DataDog/dd-trace-go/v2/internal/processtags" - "github.com/tinylib/msgp/msgp" ) // payloadStats contains the statistics of a payload. @@ -52,206 +46,35 @@ type payload interface { payloadReader } -// unsafePayload is a wrapper on top of the msgpack encoder which allows constructing an -// encoded array by pushing its entries sequentially, one at a time. It basically -// allows us to encode as we would with a stream, except that the contents of the stream -// can be read as a slice by the msgpack decoder at any time. It follows the guidelines -// from the msgpack array spec: -// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family -// -// unsafePayload implements io.Reader and can be used with the decoder directly. -// -// unsafePayload is not safe for concurrent use. -// -// unsafePayload is meant to be used only once and eventually dismissed with the -// single exception of retrying failed flush attempts. -// -// ⚠️ Warning! -// -// The payload should not be reused for multiple sets of traces. Resetting the -// payload for re-use requires the transport to wait for the HTTP package to -// Close the request body before attempting to re-use it again! This requires -// additional logic to be in place. See: -// -// • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138 -// • https://github.com/DataDog/dd-trace-go/pull/475 -// • https://github.com/DataDog/dd-trace-go/pull/549 -// • https://github.com/DataDog/dd-trace-go/pull/976 -type unsafePayload struct { - // header specifies the first few bytes in the msgpack stream - // indicating the type of array (fixarray, array16 or array32) - // and the number of items contained in the stream. - header []byte - - // off specifies the current read position on the header. - off int - - // count specifies the number of items in the stream. - count uint32 - - // buf holds the sequence of msgpack-encoded items. - buf bytes.Buffer - - // reader is used for reading the contents of buf. - reader *bytes.Reader - - // protocolVersion specifies the trace protocolVersion to use. - protocolVersion float64 -} - -var _ io.Reader = (*unsafePayload)(nil) - -// newUnsafePayload returns a ready to use unsafe payload. -func newUnsafePayload(protocol float64) *unsafePayload { - p := &unsafePayload{ - header: make([]byte, 8), - off: 8, - protocolVersion: protocol, - } - return p -} - -// push pushes a new item into the stream. 
-func (p *unsafePayload) push(t []*Span) (stats payloadStats, err error) { - p.setTracerTags(t) - sl := spanList(t) - p.buf.Grow(sl.Msgsize()) - if err := msgp.Encode(&p.buf, sl); err != nil { - return payloadStats{}, err - } - p.recordItem() - return p.stats(), nil -} - -func (p *unsafePayload) setTracerTags(t []*Span) { - // set on first chunk - if atomic.LoadUint32(&p.count) != 0 { - return - } - if len(t) == 0 { - return - } - pTags := processtags.GlobalTags().String() - if pTags == "" { - return +// newPayload returns a ready to use payload. +func newPayload(protocol float64) payload { + if protocol == traceProtocolV1 { + return &safePayload{ + p: newPayloadV1(), + } } - t[0].setProcessTags(pTags) -} - -// itemCount returns the number of items available in the stream. -func (p *unsafePayload) itemCount() int { - return int(atomic.LoadUint32(&p.count)) -} - -// size returns the payload size in bytes. After the first read the value becomes -// inaccurate by up to 8 bytes. -func (p *unsafePayload) size() int { - return p.buf.Len() + len(p.header) - p.off -} - -// reset sets up the payload to be read a second time. It maintains the -// underlying byte contents of the buffer. reset should not be used in order to -// reuse the payload for another set of traces. -func (p *unsafePayload) reset() { - p.updateHeader() - if p.reader != nil { - p.reader.Seek(0, 0) + return &safePayload{ + p: newPayloadV04(), } } -// clear empties the payload buffers. -func (p *unsafePayload) clear() { - p.buf = bytes.Buffer{} - p.reader = nil -} - // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family const ( + // arrays msgpackArrayFix byte = 144 // up to 15 items msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes -) - -// updateHeader updates the payload header based on the number of items currently -// present in the stream. -func (p *unsafePayload) updateHeader() { - n := uint64(atomic.LoadUint32(&p.count)) - switch { - case n <= 15: - p.header[7] = msgpackArrayFix + byte(n) - p.off = 7 - case n <= 1<<16-1: - binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes - p.header[5] = msgpackArray16 - p.off = 5 - default: // n <= 1<<32-1 - binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes - p.header[3] = msgpackArray32 - p.off = 3 - } -} - -// Close implements io.Closer -func (p *unsafePayload) Close() error { - return nil -} - -// Read implements io.Reader. It reads from the msgpack-encoded stream. -func (p *unsafePayload) Read(b []byte) (n int, err error) { - if p.off < len(p.header) { - // reading header - n = copy(b, p.header[p.off:]) - p.off += n - return n, nil - } - if p.reader == nil { - p.reader = bytes.NewReader(p.buf.Bytes()) - } - return p.reader.Read(b) -} - -// Write implements io.Writer. It writes data directly to the buffer. -func (p *unsafePayload) Write(data []byte) (n int, err error) { - return p.buf.Write(data) -} - -// grow grows the buffer to ensure it can accommodate n more bytes. -func (p *unsafePayload) grow(n int) { - p.buf.Grow(n) -} -// recordItem records that an item was added and updates the header. -func (p *unsafePayload) recordItem() { - atomic.AddUint32(&p.count, 1) - p.updateHeader() -} - -// stats returns the current stats of the payload. 
-func (p *unsafePayload) stats() payloadStats {
-	return payloadStats{
-		size:      p.size(),
-		itemCount: int(atomic.LoadUint32(&p.count)),
-	}
-}
-
-// protocol returns the protocol version of the payload.
-func (p *unsafePayload) protocol() float64 {
-	return p.protocolVersion
-}
-
-var _ io.Reader = (*safePayload)(nil)
-
-// newPayload returns a ready to use thread-safe payload.
-func newPayload(protocol float64) payload {
-	return &safePayload{
-		p: newUnsafePayload(protocol),
-	}
-}
+	// maps
+	msgpackMapFix byte = 0x80 // up to 15 items
+	msgpackMap16  byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes
+	msgpackMap32  byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes
+)
 
-// safePayload provides a thread-safe wrapper around unsafePayload.
+// safePayload provides a thread-safe wrapper around payload.
 type safePayload struct {
 	mu sync.RWMutex
-	p  *unsafePayload
+	p  payload
 }
 
 // push pushes a new item into the stream in a thread-safe manner.
@@ -262,9 +85,9 @@ func (sp *safePayload) push(t spanList) (stats payloadStats, err error) {
 }
 
 // itemCount returns the number of items available in the stream in a thread-safe manner.
+// It does not take the mutex; it relies on the underlying payload's itemCount being safe for concurrent use.
 func (sp *safePayload) itemCount() int {
-	// Use direct atomic access for better performance - no mutex needed
-	return int(atomic.LoadUint32(&sp.p.count))
+	return sp.p.itemCount()
 }
 
 // size returns the payload size in bytes in a thread-safe manner.
diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go
index cdae69372b..3dad5dd2fa 100644
--- a/ddtrace/tracer/payload_test.go
+++ b/ddtrace/tracer/payload_test.go
@@ -9,12 +9,16 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"math"
 	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
 
+	"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
+	"github.com/DataDog/dd-trace-go/v2/internal/processtags"
+	"github.com/DataDog/dd-trace-go/v2/internal/version"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tinylib/msgp/msgp"
@@ -22,6 +26,7 @@ import (
 
 var fixedTime = now()
 
+// newSpanList creates a simple span list with n spans.
 func newSpanList(n int) spanList {
 	itoa := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"}
 	list := make([]*Span, n)
@@ -32,6 +37,23 @@ func newSpanList(n int) spanList {
 	return list
 }
 
+// newDetailedSpanList creates a list of n spans, populated with SpanLinks, SpanEvents, and other fields.
+func newDetailedSpanList(n int) spanList {
+	itoa := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"}
+	list := make([]*Span, n)
+	for i := 0; i < n; i++ {
+		list[i] = newBasicSpan("span.list." + itoa[i%5+1])
+		list[i].start = fixedTime
+		list[i].service = "golden"
+		list[i].resource = "resource." + itoa[i%5+1]
+		list[i].error = int32(i % 2)
+		list[i].SetTag("tag."+itoa[i%5+1], "value."+itoa[i%5+1])
+		list[i].spanLinks = []SpanLink{{TraceID: 1, SpanID: 1}, {TraceID: 2, SpanID: 2}}
+		list[i].spanEvents = []spanEvent{{Name: "span.event." + itoa[i%5+1]}}
+	}
+	return list
+}
+
 // TestPayloadIntegrity tests that whatever we push into the payload
 // allows us to read the same content as would have been encoded by
 // the codec.
@@ -61,9 +83,9 @@ func TestPayloadIntegrity(t *testing.T) {
 	}
 }
 
-// TestPayloadDecode ensures that whatever we push into the payload can
+// TestPayloadV04Decode ensures that whatever we push into a v0.4 payload can
 // be decoded by the codec.
-func TestPayloadDecode(t *testing.T) { +func TestPayloadV04Decode(t *testing.T) { for _, n := range []int{10, 1 << 10} { t.Run(strconv.Itoa(n), func(t *testing.T) { assert := assert.New(t) @@ -79,6 +101,147 @@ func TestPayloadDecode(t *testing.T) { } } +// TestPayloadV1Decode ensures that whatever we push into a v1 payload can +// be decoded by the codec, and that it matches the original payload. +func TestPayloadV1Decode(t *testing.T) { + for _, n := range []int{10, 1 << 10} { + t.Run("simple"+strconv.Itoa(n), func(t *testing.T) { + var ( + assert = assert.New(t) + p = newPayloadV1() + ) + p.SetContainerID("containerID") + p.SetLanguageName("go") + p.SetLanguageVersion("1.25") + p.SetTracerVersion(version.Tag) + p.SetRuntimeID(globalconfig.RuntimeID()) + p.SetEnv("test") + p.SetHostname("hostname") + p.SetAppVersion("appVersion") + + for i := 0; i < n; i++ { + _, _ = p.push(newSpanList(i%5 + 1)) + } + + encoded, err := io.ReadAll(p) + assert.NoError(err) + + got := newPayloadV1() + buf := bytes.NewBuffer(encoded) + _, err = buf.WriteTo(got) + assert.NoError(err) + + o, err := got.decodeBuffer() + assert.NoError(err) + assert.Empty(o) + assert.Equal(p.fields, got.fields) + assert.Equal(p.containerID, got.containerID) + assert.Equal(p.languageName, got.languageName) + assert.Equal(p.languageVersion, got.languageVersion) + assert.Equal(p.tracerVersion, got.tracerVersion) + assert.Equal(p.runtimeID, got.runtimeID) + assert.Equal(p.env, got.env) + assert.Equal(p.hostname, got.hostname) + assert.Equal(p.appVersion, got.appVersion) + assert.Equal(p.fields, got.fields) + }) + + t.Run("detailed"+strconv.Itoa(n), func(t *testing.T) { + var ( + assert = assert.New(t) + p = newPayloadV1() + ) + + for i := 0; i < n; i++ { + _, _ = p.push(newDetailedSpanList(i%5 + 1)) + } + encoded, err := io.ReadAll(p) + assert.NoError(err) + + got := newPayloadV1() + buf := bytes.NewBuffer(encoded) + _, err = buf.WriteTo(got) + assert.NoError(err) + + o, err := got.decodeBuffer() + assert.NoError(err) + assert.Empty(o) + assert.NotEmpty(got.attributes) + assert.Equal(p.attributes, got.attributes) + assert.Equal(got.attributes[keyProcessTags].value, processtags.GlobalTags().String()) + assert.Greater(len(got.chunks), 0) + assert.Equal(p.chunks[0].traceID, got.chunks[0].traceID) + assert.Equal(p.chunks[0].spans[0].spanID, got.chunks[0].spans[0].spanID) + assert.Equal(got.chunks[0].attributes["service"].value, "golden") + }) + } +} + +// TestPayloadV1EmbeddedStreamingStringTable tests that string values on the payload +// can be encoded and decoded correctly after using the string table. +// Tests repeated string values. +func TestPayloadV1EmbeddedStreamingStringTable(t *testing.T) { + p := newPayloadV1() + p.SetHostname("production") + p.SetEnv("production") + p.SetLanguageName("go") + + assert := assert.New(t) + encoded, err := io.ReadAll(p) + assert.NoError(err) + + got := newPayloadV1() + buf := bytes.NewBuffer(encoded) + _, err = buf.WriteTo(got) + assert.NoError(err) + + o, err := got.decodeBuffer() + assert.NoError(err) + assert.Empty(o) + assert.Equal(p.languageName, got.languageName) + assert.Equal(p.hostname, got.hostname) + assert.Equal(p.env, got.env) +} + +// TestPayloadV1UpdateHeader tests that the header of the payload is updated and grown correctly. 
+func TestPayloadV1UpdateHeader(t *testing.T) {
+	testCases := []uint32{ // Number of items
+		0,
+		15,
+		math.MaxUint16,
+		math.MaxUint32,
+	}
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("n=%d", tc), func(t *testing.T) {
+			var (
+				p = payloadV1{
+					fields: tc,
+					header: make([]byte, 8),
+				}
+				expected []byte
+			)
+			expected = msgp.AppendMapHeader(expected, tc)
+			p.updateHeader()
+			if got := p.header[p.readOff:]; !bytes.Equal(expected, got) {
+				t.Fatalf("expected %+v, got %+v", expected, got)
+			}
+		})
+	}
+}
+
+// TestEmptyPayloadV1 tests that an empty payload can be encoded and decoded correctly.
+// Notably, it should send an empty map.
+func TestEmptyPayloadV1(t *testing.T) {
+	p := newPayloadV1()
+	assert := assert.New(t)
+	encoded, err := io.ReadAll(p)
+	assert.NoError(err)
+	length, o, err := msgp.ReadMapHeaderBytes(encoded)
+	assert.NoError(err)
+	assert.Equal(uint32(0), length)
+	assert.Empty(o)
+}
+
 func assertProcessTags(t *testing.T, payload spanLists) {
 	assert := assert.New(t)
 	for i, spanList := range payload {
@@ -105,7 +268,7 @@ func BenchmarkPayloadThroughput(b *testing.B) {
 // payload is filled.
 func benchmarkPayloadThroughput(count int) func(*testing.B) {
 	return func(b *testing.B) {
-		p := newUnsafePayload(traceProtocolV04)
+		p := newPayloadV04()
 		s := newBasicSpan("X")
 		s.meta["key"] = strings.Repeat("X", 10*1024)
 		trace := make(spanList, count)
@@ -265,7 +428,7 @@ func BenchmarkPayloadPush(b *testing.B) {
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
-				p := newUnsafePayload(traceProtocolV04)
+				p := newPayloadV04()
 				_, _ = p.push(spans)
 			}
 		})
diff --git a/ddtrace/tracer/payload_v04.go b/ddtrace/tracer/payload_v04.go
new file mode 100644
index 0000000000..90b59a8d7c
--- /dev/null
+++ b/ddtrace/tracer/payload_v04.go
@@ -0,0 +1,192 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2025 Datadog, Inc.
+
+package tracer
+
+import (
+	"bytes"
+	"encoding/binary"
+	"io"
+	"sync/atomic"
+
+	"github.com/DataDog/dd-trace-go/v2/internal/processtags"
+	"github.com/tinylib/msgp/msgp"
+)
+
+// payloadV04 is a wrapper on top of the msgpack encoder which allows constructing an
+// encoded array by pushing its entries sequentially, one at a time. It basically
+// allows us to encode as we would with a stream, except that the contents of the stream
+// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
+// from the msgpack array spec:
+// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
+//
+// payloadV04 implements the payload interface and can be used with the decoder directly.
+// To create a new payload, use the newPayloadV04 function.
+//
+// payloadV04 is not safe for concurrent use.
+//
+// payloadV04 is meant to be used only once and eventually dismissed with the
+// single exception of retrying failed flush attempts.
+//
+// ⚠️ Warning!
+//
+// The payloadV04 should not be reused for multiple sets of traces. Resetting the
+// payloadV04 for re-use requires the transport to wait for the HTTP package to
+// Close the request body before attempting to re-use it again! This requires
+// additional logic to be in place.
See: +// +// • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138 +// • https://github.com/DataDog/dd-trace-go/pull/475 +// • https://github.com/DataDog/dd-trace-go/pull/549 +// • https://github.com/DataDog/dd-trace-go/pull/976 +type payloadV04 struct { + // header specifies the first few bytes in the msgpack stream + // indicating the type of array (fixarray, array16 or array32) + // and the number of items contained in the stream. + header []byte + + // off specifies the current read position on the header. + off int + + // count specifies the number of items in the stream. + count uint32 + + // buf holds the sequence of msgpack-encoded items. + buf bytes.Buffer + + // reader is used for reading the contents of buf. + reader *bytes.Reader +} + +var _ io.Reader = (*payloadV04)(nil) + +// newPayloadV04 returns a ready to use payload. +func newPayloadV04() *payloadV04 { + p := &payloadV04{ + header: make([]byte, 8), + off: 8, + } + return p +} + +// push pushes a new item into the stream. +func (p *payloadV04) push(t spanList) (stats payloadStats, err error) { + p.setTracerTags(t) + p.buf.Grow(t.Msgsize()) + if err := msgp.Encode(&p.buf, t); err != nil { + return payloadStats{}, err + } + p.recordItem() + return p.stats(), nil +} + +func (p *payloadV04) setTracerTags(t spanList) { + // set on first chunk + if atomic.LoadUint32(&p.count) != 0 { + return + } + if len(t) == 0 { + return + } + pTags := processtags.GlobalTags().String() + if pTags == "" { + return + } + t[0].setProcessTags(pTags) +} + +// itemCount returns the number of items available in the stream. +func (p *payloadV04) itemCount() int { + return int(atomic.LoadUint32(&p.count)) +} + +// size returns the payload size in bytes. After the first read the value becomes +// inaccurate by up to 8 bytes. +func (p *payloadV04) size() int { + return p.buf.Len() + len(p.header) - p.off +} + +// reset sets up the payload to be read a second time. It maintains the +// underlying byte contents of the buffer. reset should not be used in order to +// reuse the payload for another set of traces. +func (p *payloadV04) reset() { + p.updateHeader() + if p.reader != nil { + p.reader.Seek(0, 0) + } +} + +// clear empties the payload buffers. +func (p *payloadV04) clear() { + p.buf = bytes.Buffer{} + p.reader = nil +} + +// grow grows the buffer to ensure it can accommodate n more bytes. +func (p *payloadV04) grow(n int) { + p.buf.Grow(n) +} + +// recordItem records that an item was added and updates the header. +func (p *payloadV04) recordItem() { + atomic.AddUint32(&p.count, 1) + p.updateHeader() +} + +// stats returns the current stats of the payload. +func (p *payloadV04) stats() payloadStats { + return payloadStats{ + size: p.size(), + itemCount: int(atomic.LoadUint32(&p.count)), + } +} + +// protocol returns the protocol version of the payload. +func (p *payloadV04) protocol() float64 { + return traceProtocolV04 +} + +// updateHeader updates the payload header based on the number of items currently +// present in the stream. 
+func (p *payloadV04) updateHeader() { + n := uint64(atomic.LoadUint32(&p.count)) + switch { + case n <= 15: + p.header[7] = msgpackArrayFix + byte(n) + p.off = 7 + case n <= 1<<16-1: + binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes + p.header[5] = msgpackArray16 + p.off = 5 + default: // n <= 1<<32-1 + binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes + p.header[3] = msgpackArray32 + p.off = 3 + } +} + +// Close implements io.Closer +func (p *payloadV04) Close() error { + return nil +} + +// Write implements io.Writer. It writes data directly to the buffer. +func (p *payloadV04) Write(data []byte) (n int, err error) { + return p.buf.Write(data) +} + +// Read implements io.Reader. It reads from the msgpack-encoded stream. +func (p *payloadV04) Read(b []byte) (n int, err error) { + if p.off < len(p.header) { + // reading header + n = copy(b, p.header[p.off:]) + p.off += n + return n, nil + } + if p.reader == nil { + p.reader = bytes.NewReader(p.buf.Bytes()) + } + return p.reader.Read(b) +} diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go new file mode 100644 index 0000000000..adc4f89282 --- /dev/null +++ b/ddtrace/tracer/payload_v1.go @@ -0,0 +1,1424 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025 Datadog, Inc. + +package tracer + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "strconv" + "sync/atomic" + + "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" + "github.com/DataDog/dd-trace-go/v2/internal/log" + "github.com/DataDog/dd-trace-go/v2/internal/processtags" + "github.com/tinylib/msgp/msgp" +) + +// payloadV1 is a new version of a msgp payload that can be sent to the agent. +// Be aware that payloadV1 follows the same rules and constraints as payloadV04. That is: +// +// payloadV1 is not safe for concurrent use +// +// payloadV1 is meant to be used only once and eventually dismissed with the +// single exception of retrying failed flush attempts. +// +// ⚠️ Warning! +// +// The payloadV1 should not be reused for multiple sets of traces. Resetting the +// payloadV1 for re-use requires the transport to wait for the HTTP package +// Close the request body before attempting to re-use it again! +type payloadV1 struct { + // bm keeps track of which fields have been set in the payload + // bits 1-11 are used for field IDs 1-11. Bit 0 is unused. 
+	bm bitmap
+
+	// the string ID of the container where the tracer is running
+	containerID string // 2
+
+	// the string language name of the tracer
+	languageName string // 3
+
+	// the string language version of the tracer
+	languageVersion string // 4
+
+	// the string version of the tracer
+	tracerVersion string // 5
+
+	// the UUIDv4 string representation of a tracer session
+	runtimeID string // 6
+
+	// the optional `env` string tag that is set with the tracer
+	env string // 7
+
+	// the optional string hostname of where the tracer is running
+	hostname string // 8
+
+	// the optional string `version` tag for the application set in the tracer
+	appVersion string // 9
+
+	// a collection of key to value pairs common in all `chunks`
+	attributes map[string]anyValue // 10
+
+	// a list of trace `chunks`
+	chunks []traceChunk // 11
+
+	// header specifies the first few bytes in the msgpack stream
+	// indicating the type of map (fixmap, map16 or map32)
+	// and the number of items contained in the stream.
+	header []byte
+
+	// readOff specifies the current read position on the header.
+	readOff int
+
+	// writeOff specifies the current write position on the header.
+	writeOff int
+
+	// count specifies the number of items (traceChunks) in the stream.
+	count uint32
+
+	// fields specifies the number of fields in the payload.
+	fields uint32
+
+	// buf holds the sequence of msgpack-encoded items.
+	buf []byte
+
+	// reader is used for reading the contents of buf.
+	reader *bytes.Reader
+}
+
+// newPayloadV1 returns a ready to use payloadV1.
+func newPayloadV1() *payloadV1 {
+	return &payloadV1{
+		attributes: make(map[string]anyValue),
+		chunks:     make([]traceChunk, 0),
+		readOff:    0,
+		writeOff:   0,
+	}
+}
+
+// push pushes a new item (a traceChunk) into the payload.
+func (p *payloadV1) push(t spanList) (stats payloadStats, err error) {
+	// We need to hydrate the payload with everything we get from the spans.
+	// Conceptually, our `t spanList` corresponds to one `traceChunk`.
+	if !p.bm.contains(11) && len(t) > 0 {
+		p.bm.set(11)
+		atomic.AddUint32(&p.fields, 1)
+	}
+
+	// For now, we blindly set the origin, priority, and attributes values for the chunk.
+	// In the future, attributes should hold values that are shared across all chunks in the payload.
+	origin, priority, sm, traceID := "", 0, uint32(0), [16]byte{}
+	attr := make(map[string]anyValue)
+	for _, span := range t {
+		if span == nil {
+			continue
+		}
+		// If we haven't seen the service yet, we set it blindly, assuming that all the spans created by
+		// a service must share the same value.
+		if _, ok := attr["service"]; !ok {
+			attr["service"] = anyValue{valueType: StringValueType, value: span.Root().service}
+		}
+		binary.BigEndian.PutUint64(traceID[:8], span.Context().traceID.Upper())
+		binary.BigEndian.PutUint64(traceID[8:], span.Context().traceID.Lower())
+
+		if prio, ok := span.Context().SamplingPriority(); ok {
+			origin = span.Context().origin // TODO(darccio): are we sure that origin will be shared across all the spans in the chunk?
+			priority = prio                // TODO(darccio): the same goes for priority.
+ dm := span.context.trace.propagatingTag(keyDecisionMaker) + if v, err := strconv.ParseInt(dm, 10, 32); err == nil { + if v < 0 { + v = -v + } + sm = uint32(v) + } else { + log.Error("failed to convert decision maker to uint32: %s", err.Error()) + } + } + } + tc := traceChunk{ + spans: t, + priority: int32(priority), + origin: origin, + traceID: traceID[:], + samplingMechanism: uint32(sm), + attributes: attr, + } + + // Append process tags to the payload attributes + // if there are attributes available, set them in our bitmap and increment + // the number of fields. + p.setProcessTags() + if !p.bm.contains(10) && len(p.attributes) > 0 { + p.bm.set(10) + atomic.AddUint32(&p.fields, 1) + } + + p.chunks = append(p.chunks, tc) + p.recordItem() + return p.stats(), err +} + +// grows the buffer to fit n more bytes. Follows the internal Go standard +// for growing slices (https://github.com/golang/go/blob/master/src/runtime/slice.go#L289) +func (p *payloadV1) grow(n int) { + cap := cap(p.buf) + newLen := len(p.buf) + n + threshold := 256 + for { + cap += (cap + 3*threshold) >> 2 + if cap >= newLen { + break + } + } + newBuffer := make([]byte, cap) + copy(newBuffer, p.buf) + p.buf = newBuffer +} + +func (p *payloadV1) reset() { + p.updateHeader() + if p.reader != nil { + p.reader.Seek(0, 0) + } +} + +func (p *payloadV1) clear() { + p.bm = 0 + p.buf = p.buf[:0] + p.reader = nil + p.header = nil + p.readOff = 0 + atomic.StoreUint32(&p.fields, 0) + p.count = 0 +} + +// recordItem records that a new chunk was added to the payload. +func (p *payloadV1) recordItem() { + atomic.AddUint32(&p.count, 1) +} + +func (p *payloadV1) stats() payloadStats { + return payloadStats{ + size: p.size(), + itemCount: p.itemCount(), + } +} + +func (p *payloadV1) size() int { + return len(p.buf) + len(p.header) - p.readOff +} + +func (p *payloadV1) itemCount() int { + return int(atomic.LoadUint32(&p.count)) +} + +func (p *payloadV1) protocol() float64 { + return traceProtocolV1 +} + +func (p *payloadV1) updateHeader() { + if len(p.header) == 0 { + p.header = make([]byte, 8) + } + n := atomic.LoadUint32(&p.fields) + switch { + case n <= 15: + p.header[7] = msgpackMapFix + byte(n) + p.readOff = 7 + case n <= 1<<16-1: + binary.BigEndian.PutUint64(p.header, uint64(n)) // writes 2 bytes + p.header[5] = msgpackMap16 + p.readOff = 5 + default: // n <= 1<<32-1 + binary.BigEndian.PutUint64(p.header, uint64(n)) // writes 4 bytes + p.header[3] = msgpackMap32 + p.readOff = 3 + } +} + +// Set process tags onto the payload attributes +func (p *payloadV1) setProcessTags() { + if atomic.LoadUint32(&p.count) != 0 { + return + } + pTags := processtags.GlobalTags().String() + if pTags == "" { + return + } + p.attributes[keyProcessTags] = anyValue{ + valueType: StringValueType, + value: pTags, + } +} + +func (p *payloadV1) Close() error { + p.clear() + return nil +} + +func (p *payloadV1) Write(b []byte) (int, error) { + p.buf = append(p.buf, b...) + return len(b), nil +} + +// Read implements io.Reader. It reads from the msgpack-encoded stream. +func (p *payloadV1) Read(b []byte) (n int, err error) { + if len(p.header) == 0 { + p.header = make([]byte, 8) + p.updateHeader() + } + if p.readOff < len(p.header) { + // reading header + n = copy(b, p.header[p.readOff:]) + p.readOff += n + return n, nil + } + if len(p.buf) == 0 { + p.encode() + } + if p.reader == nil { + p.reader = bytes.NewReader(p.buf) + } + return p.reader.Read(b) +} + +// encode writes existing payload fields into the buffer in msgp format. 
+func (p *payloadV1) encode() { + st := newStringTable() + p.buf = encodeField(p.buf, p.bm, 2, p.containerID, st) + p.buf = encodeField(p.buf, p.bm, 3, p.languageName, st) + p.buf = encodeField(p.buf, p.bm, 4, p.languageVersion, st) + p.buf = encodeField(p.buf, p.bm, 5, p.tracerVersion, st) + p.buf = encodeField(p.buf, p.bm, 6, p.runtimeID, st) + p.buf = encodeField(p.buf, p.bm, 7, p.env, st) + p.buf = encodeField(p.buf, p.bm, 8, p.hostname, st) + p.buf = encodeField(p.buf, p.bm, 9, p.appVersion, st) + + p.encodeAttributes(p.bm, 10, p.attributes, st) + + p.encodeTraceChunks(p.bm, 11, p.chunks, st) +} + +type fieldValue interface { + bool | []byte | int32 | int64 | uint32 | uint64 | string +} + +// encodeField takes a field of any fieldValue and encodes it into the given buffer +// in msgp format. +func encodeField[F fieldValue](buf []byte, bm bitmap, fieldID uint32, a F, st *stringTable) []byte { + if !bm.contains(fieldID) { + return buf + } + buf = msgp.AppendUint32(buf, uint32(fieldID)) // msgp key + switch value := any(a).(type) { + case string: + // encode msgp value, either by pulling from string table or writing it directly + buf = st.Serialize(value, buf) + case bool: + buf = msgp.AppendBool(buf, value) + case float64: + buf = msgp.AppendFloat64(buf, value) + case int32, int64: + buf = msgp.AppendInt64(buf, handleIntValue(value)) + case uint32: + buf = msgp.AppendUint32(buf, value) + case uint64: + buf = msgp.AppendUint64(buf, value) + case []byte: + buf = msgp.AppendBytes(buf, value) + case arrayValue: + buf = msgp.AppendArrayHeader(buf, uint32(len(value))) + for _, v := range value { + buf = v.encode(buf, st) + } + } + return buf +} + +// encodeAttributes encodes an array associated with fieldID into the buffer in msgp format. +// Each attribute is encoded as three values: the key, value type, and value. +func (p *payloadV1) encodeAttributes(bm bitmap, fieldID int, kv map[string]anyValue, st *stringTable) (bool, error) { + if !bm.contains(uint32(fieldID)) { + return false, nil + } + + p.buf = msgp.AppendUint32(p.buf, uint32(fieldID)) // msgp key + p.buf = msgp.AppendArrayHeader(p.buf, uint32(len(kv)*3)) // number of item pairs in array + + for k, v := range kv { + // encode msgp key + p.buf = st.Serialize(k, p.buf) + + // encode value + p.buf = v.encode(p.buf, st) + } + return true, nil +} + +// encodeTraceChunks encodes a list of trace chunks associated with fieldID into p.buf in msgp format. 
+func (p *payloadV1) encodeTraceChunks(bm bitmap, fieldID int, tc []traceChunk, st *stringTable) (bool, error) { + if len(tc) == 0 || !bm.contains(uint32(fieldID)) { + return false, nil + } + + p.buf = msgp.AppendUint32(p.buf, uint32(fieldID)) // msgp key + p.buf = msgp.AppendArrayHeader(p.buf, uint32(len(tc))) // number of chunks + for _, chunk := range tc { + p.buf = msgp.AppendMapHeader(p.buf, 7) // number of fields in chunk + + // priority + p.buf = encodeField(p.buf, fullSetBitmap, 1, chunk.priority, st) + + // origin + p.buf = encodeField(p.buf, fullSetBitmap, 2, chunk.origin, st) + + // attributes + p.encodeAttributes(fullSetBitmap, 3, chunk.attributes, st) + + // spans + p.encodeSpans(fullSetBitmap, 4, chunk.spans, st) + + // droppedTrace + p.buf = encodeField(p.buf, fullSetBitmap, 5, chunk.droppedTrace, st) + + // traceID + p.buf = encodeField(p.buf, fullSetBitmap, 6, chunk.traceID, st) + + // samplingMechanism + p.buf = encodeField(p.buf, fullSetBitmap, 7, chunk.samplingMechanism, st) + } + + return true, nil +} + +// encodeSpans encodes a list of spans associated with fieldID into p.buf in msgp format. +func (p *payloadV1) encodeSpans(bm bitmap, fieldID int, spans spanList, st *stringTable) (bool, error) { + if len(spans) == 0 || !bm.contains(uint32(fieldID)) { + return false, nil + } + + p.buf = msgp.AppendUint32(p.buf, uint32(fieldID)) // msgp key + p.buf = msgp.AppendArrayHeader(p.buf, uint32(len(spans))) // number of spans + + for _, span := range spans { + if span == nil { + continue + } + p.buf = msgp.AppendMapHeader(p.buf, 16) // number of fields in span + + p.buf = encodeField(p.buf, fullSetBitmap, 1, span.service, st) + p.buf = encodeField(p.buf, fullSetBitmap, 2, span.name, st) + p.buf = encodeField(p.buf, fullSetBitmap, 3, span.resource, st) + p.buf = encodeField(p.buf, fullSetBitmap, 4, span.spanID, st) + p.buf = encodeField(p.buf, fullSetBitmap, 5, span.parentID, st) + p.buf = encodeField(p.buf, fullSetBitmap, 6, span.start, st) + p.buf = encodeField(p.buf, fullSetBitmap, 7, span.duration, st) + p.buf = encodeField(p.buf, fullSetBitmap, 8, span.error != 0, st) + + // span attributes combine the meta (tags), metrics and meta_struct + attr := map[string]anyValue{} + for k, v := range span.meta { + attr[k] = anyValue{ + valueType: StringValueType, + value: v, + } + } + for k, v := range span.metrics { + attr[k] = anyValue{ + valueType: FloatValueType, + value: v, + } + } + for k, v := range span.metaStruct { + av := buildAnyValue(v) + if av != nil { + attr[k] = *av + } + } + p.encodeAttributes(fullSetBitmap, 9, attr, st) + + p.buf = encodeField(p.buf, fullSetBitmap, 10, span.spanType, st) + p.encodeSpanLinks(fullSetBitmap, 11, span.spanLinks, st) + p.encodeSpanEvents(fullSetBitmap, 12, span.spanEvents, st) + + env := span.meta[ext.Environment] + p.buf = encodeField(p.buf, fullSetBitmap, 13, env, st) + + version := span.meta[ext.Version] + p.buf = encodeField(p.buf, fullSetBitmap, 14, version, st) + + component := span.meta[ext.Component] + p.buf = encodeField(p.buf, fullSetBitmap, 15, component, st) + + spanKind := span.meta[ext.SpanKind] + p.buf = encodeField(p.buf, fullSetBitmap, 16, getSpanKindValue(spanKind), st) + } + return true, nil +} + +// translate a span kind string to its uint32 value +func getSpanKindValue(sk string) uint32 { + switch sk { + case ext.SpanKindInternal: + return 1 + case ext.SpanKindServer: + return 2 + case ext.SpanKindClient: + return 3 + case ext.SpanKindProducer: + return 4 + case ext.SpanKindConsumer: + return 5 + default: + return 1 // 
default to internal + } +} + +// translate a span kind uint32 value to its string value +func getSpanKindString(sk uint32) string { + switch sk { + case 1: + return ext.SpanKindInternal + case 2: + return ext.SpanKindServer + case 3: + return ext.SpanKindClient + case 4: + return ext.SpanKindProducer + case 5: + return ext.SpanKindConsumer + default: + return ext.SpanKindInternal + } +} + +// encodeSpanLinks encodes a list of span links associated with fieldID into p.buf in msgp format. +func (p *payloadV1) encodeSpanLinks(bm bitmap, fieldID int, spanLinks []SpanLink, st *stringTable) (bool, error) { + if !bm.contains(uint32(fieldID)) { + return false, nil + } + p.buf = msgp.AppendUint32(p.buf, uint32(fieldID)) // msgp key + p.buf = msgp.AppendArrayHeader(p.buf, uint32(len(spanLinks))) // number of span links + + for _, link := range spanLinks { + p.buf = msgp.AppendMapHeader(p.buf, 5) // number of fields in span link + + p.buf = encodeField(p.buf, fullSetBitmap, 1, link.TraceID, st) + p.buf = encodeField(p.buf, fullSetBitmap, 2, link.SpanID, st) + + attr := map[string]anyValue{} + for k, v := range link.Attributes { + attr[k] = anyValue{ + valueType: StringValueType, + value: v, + } + } + p.encodeAttributes(fullSetBitmap, 3, attr, st) + + p.buf = encodeField(p.buf, fullSetBitmap, 4, link.Tracestate, st) + p.buf = encodeField(p.buf, fullSetBitmap, 5, link.Flags, st) + } + return true, nil +} + +// encodeSpanEvents encodes a list of span events associated with fieldID into p.buf in msgp format. +func (p *payloadV1) encodeSpanEvents(bm bitmap, fieldID int, spanEvents []spanEvent, st *stringTable) (bool, error) { + if !bm.contains(uint32(fieldID)) { + return false, nil + } + p.buf = msgp.AppendUint32(p.buf, uint32(fieldID)) // msgp key + p.buf = msgp.AppendArrayHeader(p.buf, uint32(len(spanEvents))) // number of span events + + for _, event := range spanEvents { + p.buf = msgp.AppendMapHeader(p.buf, 3) // number of fields in span event + + p.buf = encodeField(p.buf, fullSetBitmap, 1, event.TimeUnixNano, st) + p.buf = encodeField(p.buf, fullSetBitmap, 2, event.Name, st) + + attr := map[string]anyValue{} + for k, v := range event.Attributes { + switch v.Type { + case spanEventAttributeTypeString: + attr[k] = anyValue{ + valueType: StringValueType, + value: v.StringValue, + } + case spanEventAttributeTypeInt: + attr[k] = anyValue{ + valueType: IntValueType, + value: handleIntValue(v.IntValue), + } + case spanEventAttributeTypeDouble: + attr[k] = anyValue{ + valueType: FloatValueType, + value: v.DoubleValue, + } + case spanEventAttributeTypeBool: + attr[k] = anyValue{ + valueType: BoolValueType, + value: v.BoolValue, + } + case spanEventAttributeTypeArray: + attr[k] = anyValue{ + valueType: ArrayValueType, + value: v.ArrayValue, + } + default: + log.Warn("dropped unsupported span event attribute type %d", v.Type) + } + } + p.encodeAttributes(fullSetBitmap, 3, attr, st) + } + return true, nil +} + +// Getters for payloadV1 fields +func (p *payloadV1) GetContainerID() string { return p.containerID } +func (p *payloadV1) GetLanguageName() string { return p.languageName } +func (p *payloadV1) GetLanguageVersion() string { return p.languageVersion } +func (p *payloadV1) GetTracerVersion() string { return p.tracerVersion } +func (p *payloadV1) GetRuntimeID() string { return p.runtimeID } +func (p *payloadV1) GetEnv() string { return p.env } +func (p *payloadV1) GetHostname() string { return p.hostname } +func (p *payloadV1) GetAppVersion() string { return p.appVersion } +func (p *payloadV1) 
GetAttributes() map[string]anyValue { return p.attributes } + +func (p *payloadV1) SetContainerID(v string) { + p.containerID = v + p.bm.set(2) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetLanguageName(v string) { + p.languageName = v + p.bm.set(3) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetLanguageVersion(v string) { + p.languageVersion = v + p.bm.set(4) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetTracerVersion(v string) { + p.tracerVersion = v + p.bm.set(5) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetRuntimeID(v string) { + p.runtimeID = v + p.bm.set(6) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetEnv(v string) { + p.env = v + p.bm.set(7) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetHostname(v string) { + p.hostname = v + p.bm.set(8) + atomic.AddUint32(&p.fields, 1) +} + +func (p *payloadV1) SetAppVersion(v string) { + p.appVersion = v + p.bm.set(9) + atomic.AddUint32(&p.fields, 1) +} + +// decodeBuffer takes the buffer from the payload, decodes it, and populates the fields +// according to the msgpack-encoded byte stream. +func (p *payloadV1) decodeBuffer() ([]byte, error) { + numFields, o, err := msgp.ReadMapHeaderBytes(p.buf) + if err != nil { + return p.buf, err + } + p.buf = o + atomic.StoreUint32(&p.fields, numFields) + p.header = make([]byte, 8) + p.updateHeader() + + st := newStringTable() + fieldCount := 1 + for { + if len(o) == 0 || err != nil { + break + } + // read msgp field ID + var idx uint32 + idx, o, err = msgp.ReadUint32Bytes(o) + if err != nil { + break + } + + // handle attributes + if idx == 10 { + p.attributes, o, err = DecodeAttributes(o, st) + if err != nil { + break + } + continue + } + + // handle trace chunks + if idx == 11 { + p.chunks, o, err = DecodeTraceChunks(o, st) + if err != nil { + break + } + continue + } + + // read msgp string value + var value string + var ok bool + value, o, ok = st.Read(o) + if !ok { + err = errUnableDecodeString + break + } + + switch idx { + case 2: + p.containerID = value + case 3: + p.languageName = value + case 4: + p.languageVersion = value + case 5: + p.tracerVersion = value + case 6: + p.runtimeID = value + case 7: + p.env = value + case 8: + p.hostname = value + case 9: + p.appVersion = value + default: + err = fmt.Errorf("unexpected field ID %d", idx) + } + fieldCount++ + } + return o, err +} + +// AnyValue is a representation of the `any` value. It can take the following types: +// - uint32 +// - bool +// - float64 +// - int64 +// - uint8 +// intValue(5) - 0x405 (4 indicates this is an int AnyType, then 5 is encoded using positive fixed int format) +// stringValue(“a”) - 0x1a161 (1 indicates this is a string, then “a” is encoded using fixstr 0xa161) +// stringValue(2) - 0x102 (1 indicates this is a string, then a positive fixed int of 2 refers the 2nd index of the string table) +type anyValue struct { + valueType int + value interface{} +} + +const ( + StringValueType = iota + 1 // string or uint -- 1 + BoolValueType // boolean -- 2 + FloatValueType // float64 -- 3 + IntValueType // int64 -- 4 + BytesValueType // []uint8 -- 5 + ArrayValueType // []AnyValue -- 6 + keyValueListType // []keyValue -- 7 +) + +// buildAnyValue builds an anyValue from a given any type. 
+func buildAnyValue(v any) *anyValue { + switch v := v.(type) { + case string: + return &anyValue{valueType: StringValueType, value: v} + case bool: + return &anyValue{valueType: BoolValueType, value: v} + case float64: + return &anyValue{valueType: FloatValueType, value: v} + case int32, int64: + return &anyValue{valueType: IntValueType, value: handleIntValue(v)} + case []byte: + return &anyValue{valueType: BytesValueType, value: v} + case arrayValue: + return &anyValue{valueType: ArrayValueType, value: v} + default: + return nil + } +} + +func (a anyValue) encode(buf []byte, st *stringTable) []byte { + buf = msgp.AppendInt32(buf, int32(a.valueType)) + switch a.valueType { + case StringValueType: + s := a.value.(string) + buf = st.Serialize(s, buf) + case BoolValueType: + buf = msgp.AppendBool(buf, a.value.(bool)) + case FloatValueType: + buf = msgp.AppendFloat64(buf, a.value.(float64)) + case IntValueType: + buf = msgp.AppendInt64(buf, a.value.(int64)) + case BytesValueType: + buf = msgp.AppendBytes(buf, a.value.([]byte)) + case ArrayValueType: + buf = msgp.AppendArrayHeader(buf, uint32(len(a.value.(arrayValue)))) + for _, v := range a.value.(arrayValue) { + buf = v.encode(buf, st) + } + } + return buf +} + +// translate any int value to int64 +func handleIntValue(a any) int64 { + switch v := a.(type) { + case int64: + return v + case int32: + return int64(v) + default: + // Fallback for other integer types + return v.(int64) + } +} + +type arrayValue []anyValue + +// keeps track of which fields have been set in the payload, with a +// 1 for represented fields and 0 for unset fields. +type bitmap int32 + +// fullSetBitmap is a bitmap that represents all fields that have been set in the payload. +var fullSetBitmap bitmap = -1 + +func (b *bitmap) set(bit uint32) { + if bit >= 32 { + return + } + *b |= 1 << bit +} + +func (b bitmap) contains(bit uint32) bool { + if bit >= 32 { + return false + } + return b&(1<