Skip to content

Commit bc3648b

Browse files
committed
Optimize such that you only parse the first messages headers once in the case where DSM is instrumented
1 parent 49da2bb commit bc3648b

File tree

6 files changed

+159
-99
lines changed

6 files changed

+159
-99
lines changed

src/trace/context/extractor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ export class TraceContextExtractor {
9191
if (EventValidator.isEventBridgeSQSEvent(event)) return new EventBridgeSQSEventTraceExtractor(this.tracerWrapper);
9292
if (EventValidator.isAppSyncResolverEvent(event)) return new AppSyncEventTraceExtractor(this.tracerWrapper);
9393
if (EventValidator.isSQSEvent(event)) return new SQSEventTraceExtractor(this.tracerWrapper, this.config);
94-
if (EventValidator.isKinesisStreamEvent(event)) return new KinesisEventTraceExtractor(this.tracerWrapper, this.config);
94+
if (EventValidator.isKinesisStreamEvent(event))
95+
return new KinesisEventTraceExtractor(this.tracerWrapper, this.config);
9596
if (EventValidator.isEventBridgeEvent(event)) return new EventBridgeEventTraceExtractor(this.tracerWrapper);
9697

9798
return;

src/trace/context/extractors/kinesis.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ describe("KinesisEventTraceExtractor", () => {
114114
it.each([
115115
["Records", {}, 0],
116116
["Records first entry", { Records: [] }, 0],
117-
["valid data in kinesis", { Records: [{ kinesis: { data: "{" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 0], // JSON.parse should fail
117+
["valid data in kinesis", { Records: [{ kinesis: { data: "{" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1], // JSON.parse should fail
118118
["_datadog in data", { Records: [{ kinesis: { data: "e30=" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1],
119119
])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => {
120120
const tracerWrapper = new TracerWrapper();
Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { KinesisStreamEvent } from "aws-lambda";
1+
import { KinesisStreamEvent, KinesisStreamRecord } from "aws-lambda";
22
import { logDebug } from "../../../utils";
33
import { EventTraceExtractor } from "../extractor";
44
import { TracerWrapper } from "../../tracer-wrapper";
@@ -9,14 +9,19 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor {
99
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1010

1111
extract(event: KinesisStreamEvent): SpanContextWrapper | null {
12-
// Set DSM consume checkpoints if enabled
12+
// Set DSM consume checkpoints if enabled and capture first record's headers
13+
let firstRecordHeaders: Record<string, string> | null = null;
1314
if (this.config.dataStreamsEnabled) {
14-
for (const record of event?.Records || []) {
15+
for (let i = 0; i < (event?.Records || []).length; i++) {
16+
const record = event.Records[i];
1517
try {
16-
const kinesisDataForRecord = record?.kinesis?.data;
17-
const decodedData = Buffer.from(kinesisDataForRecord, "base64").toString("ascii");
18-
const parsedBody = JSON.parse(decodedData);
19-
const headers = parsedBody?._datadog ?? null;
18+
const headers = this.getParsedRecordHeaders(record);
19+
20+
// Store first record's headers for trace context extraction
21+
if (i === 0) {
22+
firstRecordHeaders = headers;
23+
}
24+
2025
this.tracerWrapper.setConsumeCheckpoint(headers, "kinesis", record.eventSourceARN);
2126
} catch (error) {
2227
if (error instanceof Error) {
@@ -30,14 +35,16 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor {
3035
if (kinesisData === undefined) return null;
3136

3237
try {
33-
const decodedData = Buffer.from(kinesisData, "base64").toString("ascii");
34-
const parsedBody = JSON.parse(decodedData);
35-
const headers = parsedBody?._datadog;
36-
if (headers) {
37-
const traceContext = this.tracerWrapper.extract(headers);
38+
// Use already parsed headers from DSM if available, otherwise parse now
39+
if (!firstRecordHeaders) {
40+
firstRecordHeaders = this.getParsedRecordHeaders(event?.Records?.[0]);
41+
}
42+
43+
if (firstRecordHeaders) {
44+
const traceContext = this.tracerWrapper.extract(firstRecordHeaders);
3845
if (traceContext === null) return null;
3946

40-
logDebug(`Extracted trace context from Kinesis event`, { traceContext, headers });
47+
logDebug(`Extracted trace context from Kinesis event`, { traceContext, headers: firstRecordHeaders });
4148
return traceContext;
4249
}
4350
} catch (error) {
@@ -48,4 +55,23 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor {
4855

4956
return null;
5057
}
58+
59+
private getParsedRecordHeaders(record: KinesisStreamRecord | undefined): Record<string, string> | null {
60+
if (!record) {
61+
return null;
62+
}
63+
64+
try {
65+
const kinesisDataForRecord = record?.kinesis?.data;
66+
if (!kinesisDataForRecord) {
67+
return null;
68+
}
69+
70+
const decodedData = Buffer.from(kinesisDataForRecord, "base64").toString("ascii");
71+
const parsedBody = JSON.parse(decodedData);
72+
return parsedBody?._datadog ?? null;
73+
} catch (error) {
74+
return null;
75+
}
76+
}
5177
}

src/trace/context/extractors/sns-sqs.ts

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor {
1717
const record = event.Records[i];
1818
try {
1919
const headers = this.getParsedRecordHeaders(record);
20-
20+
2121
// Store first record's headers for trace context extraction
2222
if (i === 0) {
2323
firstRecordHeaders = headers;
@@ -32,50 +32,63 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor {
3232
}
3333

3434
logDebug("SNS-SQS Extractor Being Used");
35+
try {
36+
// Use already parsed headers from DSM if available, otherwise parse now
37+
if (!firstRecordHeaders) {
38+
firstRecordHeaders = this.getParsedRecordHeaders(event?.Records?.[0]);
39+
}
40+
41+
if (firstRecordHeaders) {
42+
const traceContext = extractTraceContext(firstRecordHeaders, this.tracerWrapper);
43+
if (traceContext) {
44+
return traceContext;
45+
}
46+
logDebug("Failed to extract trace context from SNS-SQS event");
47+
}
48+
49+
// Else try to extract trace context from attributes.AWSTraceHeader
50+
// (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
51+
const awsTraceHeader = event?.Records?.[0]?.attributes?.AWSTraceHeader;
52+
if (awsTraceHeader !== undefined) {
53+
return extractFromAWSTraceHeader(awsTraceHeader, "SNS-SQS");
54+
}
55+
} catch (error) {
56+
handleExtractionError(error, "SQS");
57+
}
58+
59+
return null;
60+
}
61+
62+
private getParsedRecordHeaders(record: SQSRecord | undefined): Record<string, string> | null {
63+
if (!record) {
64+
return null;
65+
}
3566
try {
3667
// Try to extract trace context from SNS wrapped in SQS
37-
const body = event?.Records?.[0]?.body;
68+
const body = record.body;
3869
if (body) {
3970
const parsedBody = JSON.parse(body);
4071
const snsMessageAttribute = parsedBody?.MessageAttributes?._datadog;
4172
if (snsMessageAttribute?.Value) {
42-
let headers;
4373
if (snsMessageAttribute.Type === "String") {
44-
headers = JSON.parse(snsMessageAttribute.Value);
74+
return JSON.parse(snsMessageAttribute.Value);
4575
} else {
4676
// Try decoding base64 values
4777
const decodedValue = Buffer.from(snsMessageAttribute.Value, "base64").toString("ascii");
48-
headers = JSON.parse(decodedValue);
78+
return JSON.parse(decodedValue);
4979
}
50-
51-
const traceContext = extractTraceContext(headers, this.tracerWrapper);
52-
if (traceContext) {
53-
return traceContext;
54-
}
55-
logDebug("Failed to extract trace context from SNS-SQS event");
5680
}
5781
}
5882

5983
// Check SQS message attributes as a fallback
60-
const sqsMessageAttribute = event?.Records?.[0]?.messageAttributes?._datadog;
84+
const sqsMessageAttribute = record.messageAttributes?._datadog;
6185
if (sqsMessageAttribute?.stringValue) {
62-
const headers = JSON.parse(sqsMessageAttribute.stringValue);
63-
const traceContext = extractTraceContext(headers, this.tracerWrapper);
64-
if (traceContext) {
65-
return traceContext;
66-
}
86+
return JSON.parse(sqsMessageAttribute.stringValue);
6787
}
6888

69-
// Else try to extract trace context from attributes.AWSTraceHeader
70-
// (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
71-
const awsTraceHeader = event?.Records?.[0]?.attributes?.AWSTraceHeader;
72-
if (awsTraceHeader !== undefined) {
73-
return extractFromAWSTraceHeader(awsTraceHeader, "SNS-SQS");
74-
}
89+
return null;
7590
} catch (error) {
76-
handleExtractionError(error, "SQS");
91+
return null;
7792
}
78-
79-
return null;
8093
}
8194
}
Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { SNSEvent } from "aws-lambda";
1+
import { SNSEvent, SNSEventRecord } from "aws-lambda";
22
import { TracerWrapper } from "../../tracer-wrapper";
33
import { logDebug } from "../../../utils";
44
import { EventTraceExtractor } from "../extractor";
@@ -11,46 +11,35 @@ export class SNSEventTraceExtractor implements EventTraceExtractor {
1111
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1212

1313
extract(event: SNSEvent): SpanContextWrapper | null {
14-
// Set DSM consume checkpoints if enabled
14+
// Set DSM consume checkpoints if enabled and capture first record's headers
15+
let firstRecordHeaders: Record<string, string> | null = null;
1516
if (this.config.dataStreamsEnabled) {
16-
for (const record of event?.Records || []) {
17+
for (let i = 0; i < (event?.Records || []).length; i++) {
18+
const record = event.Records[i];
1719
try {
18-
let headers = null;
19-
const sourceARN = record.Sns?.TopicArn;
20-
// First try to extract trace context from message attributes
21-
const messageAttribute = record.Sns?.MessageAttributes?._datadog;
22-
if (messageAttribute?.Value) {
23-
if (messageAttribute.Type === "String") {
24-
headers = JSON.parse(messageAttribute.Value);
25-
} else {
26-
// Try decoding base64 values
27-
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
28-
headers = JSON.parse(decodedValue);
29-
}
20+
const headers = this.getParsedRecordHeaders(record);
21+
22+
// Store first record's headers for trace context extraction
23+
if (i === 0) {
24+
firstRecordHeaders = headers;
3025
}
3126

3227
// Set a checkpoint for the record, even if we don't have headers
33-
this.tracerWrapper.setConsumeCheckpoint(headers, "sns", sourceARN);
28+
this.tracerWrapper.setConsumeCheckpoint(headers, "sns", record.Sns?.TopicArn);
3429
} catch (error) {
3530
handleExtractionError(error, "SNS DSM checkpoint");
3631
}
3732
}
3833
}
3934

4035
try {
41-
// First try to extract trace context from message attributes
42-
const messageAttribute = event?.Records?.[0]?.Sns?.MessageAttributes?._datadog;
43-
if (messageAttribute?.Value) {
44-
let headers;
45-
if (messageAttribute.Type === "String") {
46-
headers = JSON.parse(messageAttribute.Value);
47-
} else {
48-
// Try decoding base64 values
49-
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
50-
headers = JSON.parse(decodedValue);
51-
}
36+
// Use already parsed headers from DSM if available, otherwise parse now
37+
if (!firstRecordHeaders) {
38+
firstRecordHeaders = this.getParsedRecordHeaders(event?.Records?.[0]);
39+
}
5240

53-
const traceContext = extractTraceContext(headers, this.tracerWrapper);
41+
if (firstRecordHeaders) {
42+
const traceContext = extractTraceContext(firstRecordHeaders, this.tracerWrapper);
5443
if (traceContext) {
5544
return traceContext;
5645
}
@@ -68,4 +57,27 @@ export class SNSEventTraceExtractor implements EventTraceExtractor {
6857

6958
return null;
7059
}
60+
61+
private getParsedRecordHeaders(record: SNSEventRecord | undefined): Record<string, string> | null {
62+
if (!record) {
63+
return null;
64+
}
65+
try {
66+
// First try to extract trace context from message attributes
67+
const messageAttribute = record.Sns?.MessageAttributes?._datadog;
68+
if (messageAttribute?.Value) {
69+
if (messageAttribute.Type === "String") {
70+
return JSON.parse(messageAttribute.Value);
71+
} else {
72+
// Try decoding base64 values
73+
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
74+
return JSON.parse(decodedValue);
75+
}
76+
}
77+
78+
return null;
79+
} catch (error) {
80+
return null;
81+
}
82+
}
7183
}
Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { SQSEvent } from "aws-lambda";
1+
import { SQSEvent, SQSRecord } from "aws-lambda";
22
import { TracerWrapper } from "../../tracer-wrapper";
33
import { logDebug } from "../../../utils";
44
import { EventTraceExtractor } from "../extractor";
@@ -10,23 +10,18 @@ export class SQSEventTraceExtractor implements EventTraceExtractor {
1010
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1111

1212
extract(event: SQSEvent): SpanContextWrapper | null {
13-
// Set DSM consume checkpoints if enabled
13+
// Set DSM consume checkpoints if enabled and capture first record's headers
14+
let firstRecordHeaders: Record<string, string> | null = null;
1415
if (this.config.dataStreamsEnabled) {
15-
for (const record of event?.Records || []) {
16+
for (let i = 0; i < (event?.Records || []).length; i++) {
17+
const record = event.Records[i];
1618
try {
17-
// First get the headers from the message attributes, which makes it easy to extract trace context
18-
let headers = record.messageAttributes?._datadog?.stringValue;
19-
if (!headers) {
20-
// Then try to get headers from binary value. This happens when SNS->SQS, but SNS has raw message delivery enabled.
21-
// In this case, SNS maps any messageAttributes to the SQS messageAttributes.
22-
// We can at least get trace context from SQS, but we won't be able to create the SNS inferred span.
23-
const encodedTraceContext = record.messageAttributes?._datadog?.binaryValue;
24-
if (encodedTraceContext) {
25-
headers = Buffer.from(encodedTraceContext, "base64").toString("ascii");
26-
}
27-
}
19+
const headers = this.getParsedRecordHeaders(record);
2820

29-
headers = headers ? JSON.parse(headers) : null;
21+
// Store first record's headers for trace context extraction
22+
if (i === 0) {
23+
firstRecordHeaders = headers;
24+
}
3025

3126
// Set a checkpoint for the record, even if we don't have headers
3227
this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", record.eventSourceARN);
@@ -38,23 +33,13 @@ export class SQSEventTraceExtractor implements EventTraceExtractor {
3833

3934
logDebug("SQS Extractor Being Used");
4035
try {
41-
// First try to extract trace context from message attributes
42-
let headers = event?.Records?.[0]?.messageAttributes?._datadog?.stringValue;
43-
44-
if (!headers) {
45-
// Then try to get from binary value. This happens when SNS->SQS, but SNS has raw message delivery enabled.
46-
// In this case, SNS maps any messageAttributes to the SQS messageAttributes.
47-
// We can at least get trace context from SQS, but we won't be able to create the SNS inferred span.
48-
const encodedTraceContext = event?.Records?.[0]?.messageAttributes?._datadog?.binaryValue;
49-
if (encodedTraceContext) {
50-
headers = Buffer.from(encodedTraceContext, "base64").toString("ascii");
51-
}
36+
// Use already parsed headers from DSM if available, otherwise parse now
37+
if (!firstRecordHeaders) {
38+
firstRecordHeaders = this.getParsedRecordHeaders(event?.Records?.[0]);
5239
}
5340

54-
if (headers) {
55-
const parsedHeaders = JSON.parse(headers);
56-
57-
const traceContext = extractTraceContext(parsedHeaders, this.tracerWrapper);
41+
if (firstRecordHeaders) {
42+
const traceContext = extractTraceContext(firstRecordHeaders, this.tracerWrapper);
5843
if (traceContext) {
5944
return traceContext;
6045
}
@@ -73,4 +58,27 @@ export class SQSEventTraceExtractor implements EventTraceExtractor {
7358

7459
return null;
7560
}
61+
62+
private getParsedRecordHeaders(record: SQSRecord | undefined): Record<string, string> | null {
63+
if (!record) {
64+
return null;
65+
}
66+
try {
67+
// First get the headers from the message attributes
68+
let headers = record.messageAttributes?._datadog?.stringValue;
69+
if (!headers) {
70+
// Then try to get from binary value. This happens when SNS->SQS, but SNS has raw message delivery enabled.
71+
// In this case, SNS maps any messageAttributes to the SQS messageAttributes.
72+
// We can at least get trace context from SQS, but we won't be able to create the SNS inferred span.
73+
const encodedTraceContext = record.messageAttributes?._datadog?.binaryValue;
74+
if (encodedTraceContext) {
75+
headers = Buffer.from(encodedTraceContext, "base64").toString("ascii");
76+
}
77+
}
78+
79+
return headers ? JSON.parse(headers) : null;
80+
} catch (error) {
81+
return null;
82+
}
83+
}
7684
}

0 commit comments

Comments
 (0)