@@ -10,65 +10,89 @@ export class SNSSQSEventTraceExtractor implements EventTraceExtractor {
1010 constructor ( private tracerWrapper : TracerWrapper , private config : TraceConfig ) { }
1111
1212 extract ( event : SQSEvent ) : SpanContextWrapper | null {
13- logDebug ( "SNS-SQS Extractor Being Used" ) ;
14-
15- let context : SpanContextWrapper | null = null ;
16- for ( const record of event ?. Records || [ ] ) {
17- try {
18- // If we already have a context and dsm is not enabled, we can break out of the loop early
19- if ( ! this . config . dataStreamsEnabled && context ) {
20- break ;
21- }
22-
23- let headers = null ;
24- // Try to extract trace context from SNS wrapped in SQS
25- const body = record . body ;
26- if ( body ) {
27- const parsedBody = JSON . parse ( body ) ;
28- const snsMessageAttribute = parsedBody ?. MessageAttributes ?. _datadog ;
29- if ( snsMessageAttribute ?. Value ) {
30- if ( snsMessageAttribute . Type === "String" ) {
31- headers = JSON . parse ( snsMessageAttribute . Value ) ;
32- } else {
33- // Try decoding base64 values
34- const decodedValue = Buffer . from ( snsMessageAttribute . Value , "base64" ) . toString ( "ascii" ) ;
35- headers = JSON . parse ( decodedValue ) ;
13+ // Set DSM consume checkpoints if enabled
14+ if ( this . config . dataStreamsEnabled ) {
15+ for ( const record of event ?. Records || [ ] ) {
16+ try {
17+ let headers = null ;
18+ // Try to extract trace context from SNS wrapped in SQS
19+ const body = record . body ;
20+ if ( body ) {
21+ const parsedBody = JSON . parse ( body ) ;
22+ const snsMessageAttribute = parsedBody ?. MessageAttributes ?. _datadog ;
23+ if ( snsMessageAttribute ?. Value ) {
24+ if ( snsMessageAttribute . Type === "String" ) {
25+ headers = JSON . parse ( snsMessageAttribute . Value ) ;
26+ } else {
27+ // Try decoding base64 values
28+ const decodedValue = Buffer . from ( snsMessageAttribute . Value , "base64" ) . toString ( "ascii" ) ;
29+ headers = JSON . parse ( decodedValue ) ;
30+ }
3631 }
3732 }
38- }
3933
40- // Check SQS message attributes as a fallback
41- if ( ! headers ) {
42- const sqsMessageAttribute = record . messageAttributes ?. _datadog ;
43- if ( sqsMessageAttribute ?. stringValue ) {
44- headers = JSON . parse ( sqsMessageAttribute . stringValue ) ;
34+ // Check SQS message attributes as a fallback
35+ if ( ! headers ) {
36+ const sqsMessageAttribute = record . messageAttributes ?. _datadog ;
37+ if ( sqsMessageAttribute ?. stringValue ) {
38+ headers = JSON . parse ( sqsMessageAttribute . stringValue ) ;
39+ }
4540 }
46- }
4741
48- // Set a checkpoint for the record, even if we don't have headers
49- this . tracerWrapper . setConsumeCheckpoint ( headers , "sqs" , record . eventSourceARN ) ;
42+ // Set a checkpoint for the record, even if we don't have headers
43+ this . tracerWrapper . setConsumeCheckpoint ( headers , "sqs" , record . eventSourceARN ) ;
44+ } catch ( error ) {
45+ handleExtractionError ( error , "SQS DSM checkpoint" ) ;
46+ }
47+ }
48+ }
5049
51- // if we already have a context, no need to extract again
52- if ( context ) continue ;
50+ // Extract trace context (original logic - unchanged from master)
51+ logDebug ( "SNS-SQS Extractor Being Used" ) ;
52+ try {
53+ // Try to extract trace context from SNS wrapped in SQS
54+ const body = event ?. Records ?. [ 0 ] ?. body ;
55+ if ( body ) {
56+ const parsedBody = JSON . parse ( body ) ;
57+ const snsMessageAttribute = parsedBody ?. MessageAttributes ?. _datadog ;
58+ if ( snsMessageAttribute ?. Value ) {
59+ let headers ;
60+ if ( snsMessageAttribute . Type === "String" ) {
61+ headers = JSON . parse ( snsMessageAttribute . Value ) ;
62+ } else {
63+ // Try decoding base64 values
64+ const decodedValue = Buffer . from ( snsMessageAttribute . Value , "base64" ) . toString ( "ascii" ) ;
65+ headers = JSON . parse ( decodedValue ) ;
66+ }
5367
54- // Try to extract trace context from headers
55- if ( headers ) {
56- context = extractTraceContext ( headers , this . tracerWrapper ) ;
57- } else {
68+ const traceContext = extractTraceContext ( headers , this . tracerWrapper ) ;
69+ if ( traceContext ) {
70+ return traceContext ;
71+ }
5872 logDebug ( "Failed to extract trace context from SNS-SQS event" ) ;
73+ }
74+ }
5975
60- // Else try to extract trace context from attributes.AWSTraceHeader
61- // (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
62- const awsTraceHeader = record . attributes ?. AWSTraceHeader ;
63- if ( awsTraceHeader !== undefined ) {
64- context = extractFromAWSTraceHeader ( awsTraceHeader , "SNS-SQS" ) ;
65- }
76+ // Check SQS message attributes as a fallback
77+ const sqsMessageAttribute = event ?. Records ?. [ 0 ] ?. messageAttributes ?. _datadog ;
78+ if ( sqsMessageAttribute ?. stringValue ) {
79+ const headers = JSON . parse ( sqsMessageAttribute . stringValue ) ;
80+ const traceContext = extractTraceContext ( headers , this . tracerWrapper ) ;
81+ if ( traceContext ) {
82+ return traceContext ;
6683 }
67- } catch ( error ) {
68- handleExtractionError ( error , "SQS" ) ;
6984 }
85+
86+ // Else try to extract trace context from attributes.AWSTraceHeader
87+ // (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case)
88+ const awsTraceHeader = event ?. Records ?. [ 0 ] ?. attributes ?. AWSTraceHeader ;
89+ if ( awsTraceHeader !== undefined ) {
90+ return extractFromAWSTraceHeader ( awsTraceHeader , "SNS-SQS" ) ;
91+ }
92+ } catch ( error ) {
93+ handleExtractionError ( error , "SQS" ) ;
7094 }
7195
72- return context ;
96+ return null ;
7397 }
7498}
0 commit comments