Skip to content

Commit e00c8e1

Browse files
committed
Update the extraction functions to separate the setting of dsm checkpoints with the context stuff completely
1 parent 632126b commit e00c8e1

File tree

4 files changed

+201
-146
lines changed

4 files changed

+201
-146
lines changed

src/trace/context/extractors/kinesis.ts

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,43 @@ export class KinesisEventTraceExtractor implements EventTraceExtractor {
99
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1010

1111
extract(event: KinesisStreamEvent): SpanContextWrapper | null {
12-
let context: SpanContextWrapper | null = null;
13-
14-
for (const record of event?.Records || []) {
15-
try {
16-
// If we already have a context and dsm is not enabled, we can break out of the loop early
17-
if (!this.config.dataStreamsEnabled && context) {
18-
break;
12+
// Set DSM consume checkpoints if enabled
13+
if (this.config.dataStreamsEnabled) {
14+
for (const record of event?.Records || []) {
15+
try {
16+
const kinesisDataForRecord = record?.kinesis?.data;
17+
const decodedData = Buffer.from(kinesisDataForRecord, "base64").toString("ascii");
18+
const parsedBody = JSON.parse(decodedData);
19+
const headers = parsedBody?._datadog ?? null;
20+
this.tracerWrapper.setConsumeCheckpoint(headers, "kinesis", record.eventSourceARN);
21+
} catch (error) {
22+
if (error instanceof Error) {
23+
logDebug("Unable to set DSM checkpoint for Kinesis event", error);
24+
}
1925
}
26+
}
27+
}
2028

21-
const kinesisData = record?.kinesis?.data;
22-
const decodedData = Buffer.from(kinesisData, "base64").toString("ascii");
23-
const parsedBody = JSON.parse(decodedData);
24-
const headers = parsedBody?._datadog ?? null;
25-
this.tracerWrapper.setConsumeCheckpoint(headers, "kinesis", record.eventSourceARN);
29+
const kinesisData = event?.Records?.[0]?.kinesis.data;
30+
if (kinesisData === undefined) return null;
2631

27-
// if we already have a context, no need to extract again
28-
if (context) continue;
29-
if (headers) {
30-
context = this.tracerWrapper.extract(headers);
31-
if (context !== null) {
32-
logDebug(`Extracted trace context from Kinesis event`, { traceContext: context, headers });
33-
}
34-
}
35-
} catch (error) {
36-
if (error instanceof Error) {
37-
logDebug("Unable to extract trace context from Kinesis event", error);
38-
}
32+
try {
33+
const decodedData = Buffer.from(kinesisData, "base64").toString("ascii");
34+
const parsedBody = JSON.parse(decodedData);
35+
const headers = parsedBody?._datadog;
36+
if (headers) {
37+
const traceContext = this.tracerWrapper.extract(headers);
38+
if (traceContext === null) return null;
39+
40+
logDebug(`Extracted trace context from Kinesis event`, { traceContext, headers });
41+
return traceContext;
42+
}
43+
} catch (error) {
44+
if (error instanceof Error) {
45+
logDebug("Unable to extract trace context from Kinesis event", error);
3946
}
4047
}
41-
return context;
48+
49+
return null;
4250
}
4351
}

src/trace/context/extractors/sns-sqs.ts

Lines changed: 70 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,65 +10,88 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor {
1010
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1111

1212
extract(event: SQSEvent): SpanContextWrapper | null {
13-
logDebug("SNS-SQS Extractor Being Used");
14-
15-
let context: SpanContextWrapper | null = null;
16-
for (const record of event?.Records || []) {
17-
try {
18-
// If we already have a context and dsm is not enabled, we can break out of the loop early
19-
if (!this.config.dataStreamsEnabled && context) {
20-
break;
21-
}
22-
23-
let headers = null;
24-
// Try to extract trace context from SNS wrapped in SQS
25-
const body = record.body;
26-
if (body) {
27-
const parsedBody = JSON.parse(body);
28-
const snsMessageAttribute = parsedBody?.MessageAttributes?._datadog;
29-
if (snsMessageAttribute?.Value) {
30-
if (snsMessageAttribute.Type === "String") {
31-
headers = JSON.parse(snsMessageAttribute.Value);
32-
} else {
33-
// Try decoding base64 values
34-
const decodedValue = Buffer.from(snsMessageAttribute.Value, "base64").toString("ascii");
35-
headers = JSON.parse(decodedValue);
13+
// Set DSM consume checkpoints if enabled
14+
if (this.config.dataStreamsEnabled) {
15+
for (const record of event?.Records || []) {
16+
try {
17+
let headers = null;
18+
// Try to extract trace context from SNS wrapped in SQS
19+
const body = record.body;
20+
if (body) {
21+
const parsedBody = JSON.parse(body);
22+
const snsMessageAttribute = parsedBody?.MessageAttributes?._datadog;
23+
if (snsMessageAttribute?.Value) {
24+
if (snsMessageAttribute.Type === "String") {
25+
headers = JSON.parse(snsMessageAttribute.Value);
26+
} else {
27+
// Try decoding base64 values
28+
const decodedValue = Buffer.from(snsMessageAttribute.Value, "base64").toString("ascii");
29+
headers = JSON.parse(decodedValue);
30+
}
3631
}
3732
}
38-
}
3933

40-
// Check SQS message attributes as a fallback
41-
if (!headers) {
42-
const sqsMessageAttribute = record.messageAttributes?._datadog;
43-
if (sqsMessageAttribute?.stringValue) {
44-
headers = JSON.parse(sqsMessageAttribute.stringValue);
34+
// Check SQS message attributes as a fallback
35+
if (!headers) {
36+
const sqsMessageAttribute = record.messageAttributes?._datadog;
37+
if (sqsMessageAttribute?.stringValue) {
38+
headers = JSON.parse(sqsMessageAttribute.stringValue);
39+
}
4540
}
46-
}
4741

48-
// Set a checkpoint for the record, even if we don't have headers
49-
this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", record.eventSourceARN);
42+
// Set a checkpoint for the record, even if we don't have headers
43+
this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", record.eventSourceARN);
44+
} catch (error) {
45+
handleExtractionError(error, "SQS DSM checkpoint");
46+
}
47+
}
48+
}
5049

51-
// if we already have a context, no need to extract again
52-
if (context) continue;
50+
logDebug("SNS-SQS Extractor Being Used");
51+
try {
52+
// Try to extract trace context from SNS wrapped in SQS
53+
const body = event?.Records?.[0]?.body;
54+
if (body) {
55+
const parsedBody = JSON.parse(body);
56+
const snsMessageAttribute = parsedBody?.MessageAttributes?._datadog;
57+
if (snsMessageAttribute?.Value) {
58+
let headers;
59+
if (snsMessageAttribute.Type === "String") {
60+
headers = JSON.parse(snsMessageAttribute.Value);
61+
} else {
62+
// Try decoding base64 values
63+
const decodedValue = Buffer.from(snsMessageAttribute.Value, "base64").toString("ascii");
64+
headers = JSON.parse(decodedValue);
65+
}
5366

54-
// Try to extract trace context from headers
55-
if (headers) {
56-
context = extractTraceContext(headers, this.tracerWrapper);
57-
} else {
67+
const traceContext = extractTraceContext(headers, this.tracerWrapper);
68+
if (traceContext) {
69+
return traceContext;
70+
}
5871
logDebug("Failed to extract trace context from SNS-SQS event");
72+
}
73+
}
5974

60-
// Else try to extract trace context from attributes.AWSTraceHeader
61-
// (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
62-
const awsTraceHeader = record.attributes?.AWSTraceHeader;
63-
if (awsTraceHeader !== undefined) {
64-
context = extractFromAWSTraceHeader(awsTraceHeader, "SNS-SQS");
65-
}
75+
// Check SQS message attributes as a fallback
76+
const sqsMessageAttribute = event?.Records?.[0]?.messageAttributes?._datadog;
77+
if (sqsMessageAttribute?.stringValue) {
78+
const headers = JSON.parse(sqsMessageAttribute.stringValue);
79+
const traceContext = extractTraceContext(headers, this.tracerWrapper);
80+
if (traceContext) {
81+
return traceContext;
6682
}
67-
} catch (error) {
68-
handleExtractionError(error, "SQS");
6983
}
84+
85+
// Else try to extract trace context from attributes.AWSTraceHeader
86+
// (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
87+
const awsTraceHeader = event?.Records?.[0]?.attributes?.AWSTraceHeader;
88+
if (awsTraceHeader !== undefined) {
89+
return extractFromAWSTraceHeader(awsTraceHeader, "SNS-SQS");
90+
}
91+
} catch (error) {
92+
handleExtractionError(error, "SQS");
7093
}
7194

72-
return context;
95+
return null;
7396
}
7497
}

src/trace/context/extractors/sns.ts

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,51 +11,61 @@ export class SNSEventTraceExtractor implements EventTraceExtractor {
1111
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}
1212

1313
extract(event: SNSEvent): SpanContextWrapper | null {
14-
let context: SpanContextWrapper | null = null;
15-
16-
for (const record of event?.Records || []) {
17-
try {
18-
// If we already have a context and dsm is not enabled, we can break out of the loop early
19-
if (!this.config.dataStreamsEnabled && context) {
20-
break;
21-
}
22-
23-
let headers = null;
24-
const sourceARN = record.Sns?.TopicArn;
25-
// First try to extract trace context from message attributes
26-
const messageAttribute = record.Sns?.MessageAttributes?._datadog;
27-
if (messageAttribute?.Value) {
28-
if (messageAttribute.Type === "String") {
29-
headers = JSON.parse(messageAttribute.Value);
30-
} else {
31-
// Try decoding base64 values
32-
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
33-
headers = JSON.parse(decodedValue);
14+
// Set DSM consume checkpoints if enabled
15+
if (this.config.dataStreamsEnabled) {
16+
for (const record of event?.Records || []) {
17+
try {
18+
let headers = null;
19+
const sourceARN = record.Sns?.TopicArn;
20+
// First try to extract trace context from message attributes
21+
const messageAttribute = record.Sns?.MessageAttributes?._datadog;
22+
if (messageAttribute?.Value) {
23+
if (messageAttribute.Type === "String") {
24+
headers = JSON.parse(messageAttribute.Value);
25+
} else {
26+
// Try decoding base64 values
27+
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
28+
headers = JSON.parse(decodedValue);
29+
}
3430
}
35-
}
3631

37-
// Set a checkpoint for the record, even if we don't have headers
38-
this.tracerWrapper.setConsumeCheckpoint(headers, "sns", sourceARN);
32+
// Set a checkpoint for the record, even if we don't have headers
33+
this.tracerWrapper.setConsumeCheckpoint(headers, "sns", sourceARN);
34+
} catch (error) {
35+
handleExtractionError(error, "SNS DSM checkpoint");
36+
}
37+
}
38+
}
3939

40-
// if we already have a context, no need to extract again
41-
if (context) continue;
42-
// Try to extract trace context from headers
43-
if (headers) {
44-
context = extractTraceContext(headers, this.tracerWrapper);
40+
try {
41+
// First try to extract trace context from message attributes
42+
const messageAttribute = event?.Records?.[0]?.Sns?.MessageAttributes?._datadog;
43+
if (messageAttribute?.Value) {
44+
let headers;
45+
if (messageAttribute.Type === "String") {
46+
headers = JSON.parse(messageAttribute.Value);
4547
} else {
46-
logDebug("Failed to extract trace context from SNS event");
48+
// Try decoding base64 values
49+
const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii");
50+
headers = JSON.parse(decodedValue);
51+
}
4752

48-
// Then try to extract trace context from _X_AMZN_TRACE_ID header (Upstream Java apps can
49-
// pass down Datadog trace id (parent id wrong) in the env in SNS case)
50-
if (process.env[AMZN_TRACE_ID_ENV_VAR]) {
51-
context = extractFromAWSTraceHeader(process.env[AMZN_TRACE_ID_ENV_VAR], "SNS");
52-
}
53+
const traceContext = extractTraceContext(headers, this.tracerWrapper);
54+
if (traceContext) {
55+
return traceContext;
5356
}
54-
} catch (error) {
55-
handleExtractionError(error, "SNS");
57+
logDebug("Failed to extract trace context from SNS event");
58+
}
59+
60+
// Then try to extract trace context from _X_AMZN_TRACE_ID header (Upstream Java apps can
61+
// pass down Datadog trace id (parent id wrong) in the env in SNS case)
62+
if (process.env[AMZN_TRACE_ID_ENV_VAR]) {
63+
return extractFromAWSTraceHeader(process.env[AMZN_TRACE_ID_ENV_VAR], "SNS");
5664
}
65+
} catch (error) {
66+
handleExtractionError(error, "SNS");
5767
}
5868

59-
return context;
69+
return null;
6070
}
6171
}

0 commit comments

Comments
 (0)