Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
f5d87c3
draft: create v04 and v1 with interface
hannahkm Sep 5, 2025
25440fd
move trace chunk to payload.go
hannahkm Sep 5, 2025
d0635e4
feat(tracer): implement msgp serialization for payloadV1 and spanListV1
darccio Sep 1, 2025
478ef22
added missing spanlistv1 type
hannahkm Sep 5, 2025
230633a
finish cherry picking dario's PR
hannahkm Sep 5, 2025
c0f2493
fix newPayload and spanlist type
hannahkm Sep 5, 2025
1e941ac
wip: encode anyvalue and keyvalue
hannahkm Sep 11, 2025
dc00fd3
move spanLinkV1 into payload_v1
hannahkm Sep 11, 2025
98d30ce
wip: encode spans
hannahkm Sep 12, 2025
0ad7dd1
wip: span event and span link
hannahkm Sep 15, 2025
326a244
wip: payload
hannahkm Sep 15, 2025
ea80c7a
wip: encode traceChunk
hannahkm Sep 16, 2025
af26f15
fix payload encodings
hannahkm Sep 17, 2025
ded707d
fix traceChunk field types
hannahkm Sep 17, 2025
5124410
broken string table implementation
hannahkm Sep 18, 2025
bb1baaf
clean up some things
hannahkm Sep 22, 2025
f071e05
streamingKey type for stringTable (oh god)
hannahkm Sep 23, 2025
03abab3
fix some types, also i don't think we need spanlistv1 anymore
hannahkm Sep 23, 2025
22e70ec
fix immediate compiler issues
hannahkm Sep 24, 2025
68ab0d8
few more fixes with payload representations
hannahkm Sep 24, 2025
23c72cf
wip: decoding functions
hannahkm Sep 29, 2025
a12502c
wip: decode trace chunk func
hannahkm Sep 29, 2025
08b1b97
couple fixes
hannahkm Sep 30, 2025
60165d3
draft test
hannahkm Sep 30, 2025
11a4ad9
wip with many questions
hannahkm Oct 7, 2025
6d10c7a
some fixes and more todos
hannahkm Oct 7, 2025
46a6cb9
fix(ddtrace/tracer): improving the ergonomics of encodeField and smal…
darccio Oct 8, 2025
170a42c
wip: finish encoding
hannahkm Oct 8, 2025
7d6c3cc
decode??
hannahkm Oct 8, 2025
5881a50
debug some issues
hannahkm Oct 8, 2025
dc26c98
fix(ddtrace/tracer): don't encode map header twice
darccio Oct 9, 2025
9030925
fix(ddtrace/tracer): use string table to decode strings
darccio Oct 9, 2025
269988e
fix(ddtrace/tracer): encode uint32 and uint64 field values
darccio Oct 9, 2025
8c7bef3
fix(ddtrace/tracer): remove redundant err checks and return the right…
darccio Oct 9, 2025
8f06c7b
fix(ddtrace/tracer): rename error variable to comply with linter
darccio Oct 9, 2025
80dbd54
fix(ddtrace/tracer): don't use receiver's bitmap when encoding attrib…
darccio Oct 9, 2025
b64c68f
early return on fail while encoding, also fix some types
hannahkm Oct 9, 2025
8908891
use string table during encoding and add more int coverage
hannahkm Oct 9, 2025
e739a34
never overwrite buffers
hannahkm Oct 9, 2025
3a6e7fa
fix(ddtrace/tracer): cache service as payload attribute; enhance tests
darccio Oct 10, 2025
f3f86ab
fix: make span link and event decode consistent with other functions
hannahkm Oct 10, 2025
91c5929
add span links and events to detailed test
hannahkm Oct 10, 2025
58d2f62
v1 protocol revamp and fixes
hannahkm Oct 10, 2025
82c805b
document functions and update api.txt
hannahkm Oct 10, 2025
1392569
Merge remote-tracking branch 'origin' into hannahkm/msgp-payload-inte…
darccio Oct 16, 2025
bbb62d2
chore: apply changes from #4010
darccio Oct 16, 2025
d226900
Merge branch 'hannahkm/msgp-payload-interface' into hannahkm/implemen…
darccio Oct 16, 2025
72fae1d
chore: go fmt
darccio Oct 16, 2025
7d4c9cf
feat(.github/workflows): add APM_TRACING_EFFICIENT_PAYLOAD scenario f…
darccio Oct 16, 2025
a989bde
replace DD_TRACE_AGENT_PROTOCOL_VERSION with DD_TRACE_V1_PAYLOAD_FORM…
hannahkm Oct 20, 2025
e2ef0d5
initialize header with 8 bytes
hannahkm Oct 20, 2025
0067018
fix: move init of header size to inside updateHeader
hannahkm Oct 20, 2025
5e1633d
fix: use proper traceURL for v1 protocol
hannahkm Oct 20, 2025
888c82c
fix: encode attributes as a list instead of map
hannahkm Oct 21, 2025
c4b126d
update api txt file
hannahkm Oct 21, 2025
d931d7e
fix: properly set traceID
hannahkm Oct 21, 2025
a8b15f2
fix: use big endian for trace id?
hannahkm Oct 21, 2025
2ab2051
fix: encode both upper and lower traceID bits
hannahkm Oct 22, 2025
092f607
fix: traceID was never getting set
hannahkm Oct 22, 2025
639f9b2
fix: empty payload should also be encoded as a map
hannahkm Oct 22, 2025
581c994
fix: non atomic updates to fields, doc string fixes, etc
hannahkm Oct 22, 2025
0487a5a
fix: initial protocol msg was sending an array
hannahkm Oct 22, 2025
8813fa5
fix: sm was not properly represented as a uint32
hannahkm Oct 22, 2025
ff24c1f
fix: change spankind to uint32 instead of a string
hannahkm Oct 22, 2025
50724d8
trigger tests
hannahkm Oct 23, 2025
883e3f2
feat: support process tags on spans
hannahkm Oct 23, 2025
bc024ce
chore: improve testing to specifically check for process tags on the …
hannahkm Oct 23, 2025
e759d0e
fix: set process tags on attributes instead of on chunks
hannahkm Oct 23, 2025
e4ae913
trigger tests
hannahkm Oct 24, 2025
da2fd94
test against my system test branch
hannahkm Oct 27, 2025
813031f
revert test
hannahkm Oct 27, 2025
6ad1afe
trigger tests
hannahkm Oct 27, 2025
7fc1f29
Merge branch 'main' into hannahkm/implement-v1-serialization
hannahkm Oct 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/system-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ jobs:
scenario: AGENT_SUPPORTING_SPAN_EVENTS
- weblog-variant: net-http
scenario: TELEMETRY_METRIC_GENERATION_DISABLED
- weblog-variant: net-http
scenario: APM_TRACING_EFFICIENT_PAYLOAD
fail-fast: false
env:
TEST_LIBRARY: golang
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/tracer/api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ type UserMonitoringConfig struct {

type UserMonitoringOption func(*UserMonitoringConfig)()

// File: payload_v1.go

// Package Functions
func DecodeAttributes([]byte, *stringTable) (map[string]anyValue, []byte, error)
func DecodeKeyValueList([]byte, *stringTable) (map[string]anyValue, []byte, error)
func DecodeSpanEvents([]byte, *stringTable) ([]spanEvent, []byte, error)
func DecodeSpanLinks([]byte, *stringTable) ([]SpanLink, []byte, error)
func DecodeSpans([]byte, *stringTable) (spanList, []byte, error)
func DecodeTraceChunks([]byte, *stringTable) ([]traceChunk, []byte, error)

// File: propagator.go

// Types
Expand Down
12 changes: 8 additions & 4 deletions ddtrace/tracer/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ type startupInfo struct {
// checkEndpoint tries to connect to the URL specified by endpoint.
// If the endpoint is not reachable, checkEndpoint returns an error
// explaining why.
func checkEndpoint(c *http.Client, endpoint string) error {
req, err := http.NewRequest("POST", endpoint, bytes.NewReader([]byte{0x90}))
func checkEndpoint(c *http.Client, endpoint string, protocol float64) error {
b := []byte{0x90} // empty array
if protocol == traceProtocolV1 {
b = []byte{0x80} // empty map
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Payload v1 is represented by a message pack map, whereas the empty payload in v0.4 was represented by an array. To prevent failures when we send empty data, we need to check for the payload version and send the correct data type.

}
req, err := http.NewRequest("POST", endpoint, bytes.NewReader(b))
if err != nil {
return fmt.Errorf("cannot create http request: %s", err)
}
Expand Down Expand Up @@ -162,8 +166,8 @@ func logStartup(t *tracer) {
info.SampleRateLimit = fmt.Sprintf("%v", limit)
}
if !t.config.logToStdout {
if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint()); err != nil {
info.AgentError = err.Error()
if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint(), t.config.traceProtocol); err != nil {
info.AgentError = fmt.Sprintf("%s", err.Error())
log.Warn("DIAGNOSTICS Unable to reach agent intake: %s", err.Error())
}
}
Expand Down
12 changes: 9 additions & 3 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,6 @@ func newConfig(opts ...StartOption) (*config, error) {

reportTelemetryOnAppStarted(telemetry.Configuration{Name: "trace_rate_limit", Value: c.traceRateLimitPerSecond, Origin: origin})

// Set the trace protocol to use.
c.traceProtocol = internal.FloatEnv("DD_TRACE_AGENT_PROTOCOL_VERSION", traceProtocolV04)

if v := env.Get("OTEL_LOGS_EXPORTER"); v != "" {
log.Warn("OTEL_LOGS_EXPORTER is not supported")
}
Expand Down Expand Up @@ -595,6 +592,15 @@ func newConfig(opts ...StartOption) (*config, error) {
if c.transport == nil {
c.transport = newHTTPTransport(c.agentURL.String(), c.httpClient)
}
// Set the trace protocol to use.
if internal.BoolEnv("DD_TRACE_V1_PAYLOAD_FORMAT_ENABLED", false) {
c.traceProtocol = traceProtocolV1
if t, ok := c.transport.(*httpTransport); ok {
t.traceURL = fmt.Sprintf("%s%s", c.agentURL.String(), tracesAPIPathV1)
}
} else {
c.traceProtocol = traceProtocolV04
}
if c.propagator == nil {
envKey := "DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH"
maxLen := internal.IntEnv(envKey, defaultMaxTagsHeaderLen)
Expand Down
212 changes: 17 additions & 195 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,8 @@
package tracer

import (
"bytes"
"encoding/binary"
"io"
"sync"
"sync/atomic"

"github.com/DataDog/dd-trace-go/v2/internal/processtags"
"github.com/tinylib/msgp/msgp"
)

// payloadStats contains the statistics of a payload.
Expand Down Expand Up @@ -52,206 +46,35 @@ type payload interface {
payloadReader
}

// unsafePayload is a wrapper on top of the msgpack encoder which allows constructing an
// encoded array by pushing its entries sequentially, one at a time. It basically
// allows us to encode as we would with a stream, except that the contents of the stream
// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
// from the msgpack array spec:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
//
// unsafePayload implements io.Reader and can be used with the decoder directly.
//
// unsafePayload is not safe for concurrent use.
//
// unsafePayload is meant to be used only once and eventually dismissed with the
// single exception of retrying failed flush attempts.
//
// ⚠️ Warning!
//
// The payload should not be reused for multiple sets of traces. Resetting the
// payload for re-use requires the transport to wait for the HTTP package to
// Close the request body before attempting to re-use it again! This requires
// additional logic to be in place. See:
//
// • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138
// • https://github.com/DataDog/dd-trace-go/pull/475
// • https://github.com/DataDog/dd-trace-go/pull/549
// • https://github.com/DataDog/dd-trace-go/pull/976
type unsafePayload struct {
// header specifies the first few bytes in the msgpack stream
// indicating the type of array (fixarray, array16 or array32)
// and the number of items contained in the stream.
header []byte

// off specifies the current read position on the header.
off int

// count specifies the number of items in the stream.
count uint32

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer

// reader is used for reading the contents of buf.
reader *bytes.Reader

// protocolVersion specifies the trace protocolVersion to use.
protocolVersion float64
}

var _ io.Reader = (*unsafePayload)(nil)

// newUnsafePayload returns a ready to use unsafe payload.
func newUnsafePayload(protocol float64) *unsafePayload {
p := &unsafePayload{
header: make([]byte, 8),
off: 8,
protocolVersion: protocol,
}
return p
}

// push pushes a new item into the stream.
func (p *unsafePayload) push(t []*Span) (stats payloadStats, err error) {
p.setTracerTags(t)
sl := spanList(t)
p.buf.Grow(sl.Msgsize())
if err := msgp.Encode(&p.buf, sl); err != nil {
return payloadStats{}, err
}
p.recordItem()
return p.stats(), nil
}

func (p *unsafePayload) setTracerTags(t []*Span) {
// set on first chunk
if atomic.LoadUint32(&p.count) != 0 {
return
}
if len(t) == 0 {
return
}
pTags := processtags.GlobalTags().String()
if pTags == "" {
return
// newPayload returns a ready to use payload.
func newPayload(protocol float64) payload {
if protocol == traceProtocolV1 {
return &safePayload{
p: newPayloadV1(),
}
}
t[0].setProcessTags(pTags)
}

// itemCount returns the number of items available in the stream.
func (p *unsafePayload) itemCount() int {
return int(atomic.LoadUint32(&p.count))
}

// size returns the payload size in bytes. After the first read the value becomes
// inaccurate by up to 8 bytes.
func (p *unsafePayload) size() int {
return p.buf.Len() + len(p.header) - p.off
}

// reset sets up the payload to be read a second time. It maintains the
// underlying byte contents of the buffer. reset should not be used in order to
// reuse the payload for another set of traces.
func (p *unsafePayload) reset() {
p.updateHeader()
if p.reader != nil {
p.reader.Seek(0, 0)
return &safePayload{
p: newPayloadV04(),
}
}

// clear empties the payload buffers.
func (p *unsafePayload) clear() {
p.buf = bytes.Buffer{}
p.reader = nil
}

// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
// arrays
msgpackArrayFix byte = 144 // up to 15 items
msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes
)

// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *unsafePayload) updateHeader() {
n := uint64(atomic.LoadUint32(&p.count))
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
p.off = 7
case n <= 1<<16-1:
binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
p.header[5] = msgpackArray16
p.off = 5
default: // n <= 1<<32-1
binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
p.header[3] = msgpackArray32
p.off = 3
}
}

// Close implements io.Closer
func (p *unsafePayload) Close() error {
return nil
}

// Read implements io.Reader. It reads from the msgpack-encoded stream.
func (p *unsafePayload) Read(b []byte) (n int, err error) {
if p.off < len(p.header) {
// reading header
n = copy(b, p.header[p.off:])
p.off += n
return n, nil
}
if p.reader == nil {
p.reader = bytes.NewReader(p.buf.Bytes())
}
return p.reader.Read(b)
}

// Write implements io.Writer. It writes data directly to the buffer.
func (p *unsafePayload) Write(data []byte) (n int, err error) {
return p.buf.Write(data)
}

// grow grows the buffer to ensure it can accommodate n more bytes.
func (p *unsafePayload) grow(n int) {
p.buf.Grow(n)
}

// recordItem records that an item was added and updates the header.
func (p *unsafePayload) recordItem() {
atomic.AddUint32(&p.count, 1)
p.updateHeader()
}

// stats returns the current stats of the payload.
func (p *unsafePayload) stats() payloadStats {
return payloadStats{
size: p.size(),
itemCount: int(atomic.LoadUint32(&p.count)),
}
}

// protocol returns the protocol version of the payload.
func (p *unsafePayload) protocol() float64 {
return p.protocolVersion
}

var _ io.Reader = (*safePayload)(nil)

// newPayload returns a ready to use thread-safe payload.
func newPayload(protocol float64) payload {
return &safePayload{
p: newUnsafePayload(protocol),
}
}
// maps
msgpackMapFix byte = 0x80 // up to 15 items
msgpackMap16 byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes
msgpackMap32 byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes
)

// safePayload provides a thread-safe wrapper around unsafePayload.
// safePayload provides a thread-safe wrapper around payload.
type safePayload struct {
mu sync.RWMutex
p *unsafePayload
p payload
}

// push pushes a new item into the stream in a thread-safe manner.
Expand All @@ -263,8 +86,7 @@ func (sp *safePayload) push(t spanList) (stats payloadStats, err error) {

// itemCount returns the number of items available in the stream in a thread-safe manner.
func (sp *safePayload) itemCount() int {
// Use direct atomic access for better performance - no mutex needed
return int(atomic.LoadUint32(&sp.p.count))
return sp.p.itemCount()
}

// size returns the payload size in bytes in a thread-safe manner.
Expand Down
Loading
Loading