@@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler
44import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult
55import datadog.trace.core.CoreSpan
66import datadog.trace.core.DDSpan
7+ import datadog.trace.core.DDSpanContext
8+ import datadog.trace.core.PendingTrace
79import datadog.trace.core.monitor.HealthMetrics
10+ import datadog.trace.core.postprocessor.SpanPostProcessor
811import datadog.trace.test.util.DDSpecification
912import spock.util.concurrent.PollingConditions
1013
@@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
4144 FAST_LANE ,
4245 1 ,
4346 TimeUnit . NANOSECONDS ,
47+ null ,
4448 null
4549 ) // stop heartbeats from being throttled
4650
@@ -67,6 +71,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
6771 FAST_LANE ,
6872 1 ,
6973 TimeUnit . NANOSECONDS ,
74+ null ,
7075 null
7176 ) // stop heartbeats from being throttled
7277 def timeConditions = new PollingConditions (timeout : 1 , initialDelay : 1 , factor : 1.25 )
@@ -92,7 +97,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
9297 false
9398 },
9499 FAST_LANE ,
95- 100 , TimeUnit . SECONDS , null ) // prevent heartbeats from helping the flush happen
100+ 100 , TimeUnit . SECONDS , null , null ) // prevent heartbeats from helping the flush happen
96101
97102 when : " there is pending work it is completed before a flush"
98103 // processing this span will throw an exception, but it should be caught
@@ -131,7 +136,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
131136 throwingDispatcher, {
132137 false
133138 }, FAST_LANE ,
134- 100 , TimeUnit . SECONDS , null ) // prevent heartbeats from helping the flush happen
139+ 100 , TimeUnit . SECONDS , null , null ) // prevent heartbeats from helping the flush happen
135140 worker. start()
136141
137142 when : " a trace is processed but can't be passed on"
@@ -149,6 +154,58 @@ class TraceProcessingWorkerTest extends DDSpecification {
149154 priority << [SAMPLER_DROP , USER_DROP , SAMPLER_KEEP , USER_KEEP , UNSET ]
150155 }
151156
157+ def " trace should be post-processed" () {
158+ setup :
159+ AtomicInteger acceptedCount = new AtomicInteger ()
160+ PayloadDispatcherImpl countingDispatcher = Mock (PayloadDispatcherImpl )
161+ countingDispatcher. addTrace(_) >> {
162+ acceptedCount. getAndIncrement()
163+ }
164+ HealthMetrics healthMetrics = Mock (HealthMetrics )
165+
166+ // Span 1 - should be post-processed
167+ def span1 = DDSpan . create(" test" , 0 , Mock (DDSpanContext ) {
168+ isRequiresPostProcessing() >> true
169+ getTrace() >> Mock (PendingTrace ) {
170+ getCurrentTimeNano() >> 0
171+ }
172+ }, [])
173+ def processedSpan1 = false
174+
175+ // Span 2 - should NOT be post-processed
176+ def span2 = DDSpan . create(" test" , 0 , Mock (DDSpanContext ) {
177+ isRequiresPostProcessing() >> false
178+ getTrace() >> Mock (PendingTrace ) {
179+ getCurrentTimeNano() >> 0
180+ }
181+ }, [])
182+ def processedSpan2 = false
183+
184+ SpanPostProcessor spanPostProcessor = Mock (SpanPostProcessor ) {
185+ process(span1, _) >> { processedSpan1 = true }
186+ process(span2, _) >> { processedSpan2 = true }
187+ }
188+
189+ TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics,
190+ countingDispatcher, {
191+ false
192+ }, FAST_LANE , 100 , TimeUnit . SECONDS , null , spanPostProcessor)
193+ worker. start()
194+
195+ when : " traces are submitted"
196+ worker. publish(span1, SAMPLER_KEEP , [span1, span2])
197+ worker. publish(span2, SAMPLER_KEEP , [span1, span2])
198+
199+ then : " traces are passed through unless rejected on submission"
200+ conditions. eventually {
201+ assert processedSpan1 == true
202+ assert processedSpan2 == false
203+ }
204+
205+ cleanup :
206+ worker. close()
207+ }
208+
152209 def " traces should be processed" () {
153210 setup :
154211 AtomicInteger acceptedCount = new AtomicInteger ()
@@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
160217 TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics,
161218 countingDispatcher, {
162219 false
163- }, FAST_LANE , 100 , TimeUnit . SECONDS , null )
220+ }, FAST_LANE , 100 , TimeUnit . SECONDS , null , null )
164221 // prevent heartbeats from helping the flush happen
165222 worker. start()
166223
@@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
211268 TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics,
212269 countingDispatcher, {
213270 false
214- }, FAST_LANE , 100 , TimeUnit . SECONDS , null )
271+ }, FAST_LANE , 100 , TimeUnit . SECONDS , null , null )
215272 worker. start()
216273 worker. close()
217274 int queueSize = 0
@@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
248305 return false
249306 }
250307 }
251- TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics, countingDispatcher, { true }, FAST_LANE , 100 , TimeUnit . SECONDS , singleSpanSampler)
308+ TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics, countingDispatcher, { true }, FAST_LANE , 100 , TimeUnit . SECONDS , singleSpanSampler, null )
252309 worker. start()
253310
254311 when : " traces are submitted"
@@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
324381 return false
325382 }
326383 }
327- TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics, countingDispatcher, { false }, FAST_LANE , 100 , TimeUnit . SECONDS , singleSpanSampler)
384+ TraceProcessingWorker worker = new TraceProcessingWorker (10 , healthMetrics, countingDispatcher, { false }, FAST_LANE , 100 , TimeUnit . SECONDS , singleSpanSampler, null )
328385 worker. start()
329386
330387 when : " traces are submitted"
0 commit comments