Skip to content

Commit dfaef99

Browse files
mhlid and deejgregor
authored
🪞 9757 - Avoid pending queue wedge if tracer flare is generated multiple times (#9840)
* Avoid pending queue wedge if tracer flare is generated multiple times

In DumpDrain, the collectTraces method replaces the 'data' field with an empty ArrayList, but at the same time, it does not also reset the 'index' field. If another dump is performed later, this leads to the get method reaching the 'return null' statement, and as the comment states, this can (and does) break the queue.

This change does a few things:

- Resets the index in collectTraces when the data field is replaced (and marks the index field as volatile). This should prevent the above situation from happening.
- In case the situation still happens, a stand-in CommandElement is returned to avoid returning null. A warning message is also logged.
- The existing "testing tracer flare dump with multiple traces" test case is expanded to exercise the problem.

Here is an example stack trace when the hang happens:

"dd-trace-monitor" #38 daemon prio=5 os_prio=31 tid=0x0000000110e6e000 nid=0x7617 runnable [0x0000000171032000]
   java.lang.Thread.State: RUNNABLE
        at org.jctools.queues.MpscBlockingConsumerArrayQueue.spinWaitForElement(MpscBlockingConsumerArrayQueue.java:634)
        at org.jctools.queues.MpscBlockingConsumerArrayQueue.parkUntilNext(MpscBlockingConsumerArrayQueue.java:566)
        at org.jctools.queues.MpscBlockingConsumerArrayQueue.take(MpscBlockingConsumerArrayQueue.java:482)
        at datadog.trace.core.PendingTraceBuffer$DelayingPendingTraceBuffer$Worker.run(PendingTraceBuffer.java:317)
        at java.lang.Thread.run(Thread.java:750)

* Use SEND_TELEMETRY for DumpDrain's index out of bounds warning log

---------

Co-authored-by: DJ Gregor <[email protected]>
1 parent 1cfa708 commit dfaef99

File tree

2 files changed

+56
-16
lines changed

2 files changed

+56
-16
lines changed

‎dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java‎

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.core;
22

3+
import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
34
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
45
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
56
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
@@ -58,6 +59,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
5859
private static final long SLEEP_TIME_MS = 100;
5960
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
6061
private static final CommandElement DUMP_ELEMENT = new CommandElement();
62+
private static final CommandElement STAND_IN_ELEMENT = new CommandElement();
6163

6264
private final MpscBlockingConsumerArrayQueue<Element> queue;
6365
private final Thread worker;
@@ -145,6 +147,7 @@ public void accept(Element pendingTrace) {
145147

146148
private static final class DumpDrain
147149
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
150+
private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class);
148151
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
149152
private static final int MAX_DUMPED_TRACES = 50;
150153

@@ -154,7 +157,7 @@ private static final class DumpDrain
154157
element -> !(element instanceof PendingTrace);
155158

156159
private volatile List<Element> data = new ArrayList<>();
157-
private int index = 0;
160+
private volatile int index = 0;
158161

159162
@Override
160163
public void accept(Element pendingTrace) {
@@ -166,13 +169,21 @@ public Element get() {
166169
if (index < data.size()) {
167170
return data.get(index++);
168171
}
169-
return null; // Should never reach here or else queue may break according to
170-
// MessagePassingQueue docs
172+
// Should never reach here or else queue may break according to
173+
// MessagePassingQueue docs if we return a null. Return a stand-in
174+
// Element instead.
175+
LOGGER.warn(
176+
SEND_TELEMETRY,
177+
"Index {} is out of bounds for data size {} in DumpDrain.get so returning filler CommandElement to prevent pending trace queue from breaking.",
178+
index,
179+
data.size());
180+
return STAND_IN_ELEMENT;
171181
}
172182

173183
public List<Element> collectTraces() {
174184
List<Element> traces = data;
175185
data = new ArrayList<>();
186+
index = 0;
176187
traces.removeIf(NOT_PENDING_TRACE);
177188
// Storing oldest traces first
178189
traces.sort(TRACE_BY_START_TIME);

‎dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy‎

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -460,36 +460,65 @@ class PendingTraceBufferTest extends DDSpecification {
460460
def parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000)
461461
def child2 = newSpanOf(parent2)
462462

463-
when:
463+
when: "first flare dump with two traces"
464464
parent1.finish()
465465
parent2.finish()
466466
buffer.start()
467-
def entries = buildAndExtractZip()
467+
def entries1 = buildAndExtractZip()
468468

469469
then:
470470
1 * dumpReporter.prepareForFlare()
471471
1 * dumpReporter.addReportToFlare(_)
472472
1 * dumpReporter.cleanupAfterFlare()
473-
entries.size() == 1
474-
def pendingTraceText = entries["pending_traces.txt"] as String
475-
(entries["pending_traces.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific
476-
477-
def parsedTraces = pendingTraceText.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
478-
parsedTraces.size() == 2
479-
parsedTraces[0]["trace_id"] == 1 //Asserting both traces exist
480-
parsedTraces[1]["trace_id"] == 2
481-
parsedTraces[0]["start"] < parsedTraces[1]["start"] //Asserting the dump has the oldest trace first
473+
entries1.size() == 1
474+
def pendingTraceText1 = entries1["pending_traces.txt"] as String
475+
pendingTraceText1.startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific
476+
477+
def parsedTraces1 = pendingTraceText1.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
478+
parsedTraces1.size() == 2
479+
parsedTraces1[0]["trace_id"] == 1 //Asserting both traces exist
480+
parsedTraces1[1]["trace_id"] == 2
481+
parsedTraces1[0]["start"] < parsedTraces1[1]["start"] //Asserting the dump has the oldest trace first
482+
483+
// New pending traces are needed here because generating the first flare takes long enough that the
484+
// earlier pending traces are flushed (within 500ms).
485+
when: "second flare dump with new pending traces"
486+
// Finish the first set of traces
487+
child1.finish()
488+
child2.finish()
489+
// Create new pending traces
490+
def trace3 = factory.create(DDTraceId.from(3))
491+
def parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000)
492+
def child3 = newSpanOf(parent3)
493+
def trace4 = factory.create(DDTraceId.from(4))
494+
def parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000)
495+
def child4 = newSpanOf(parent4)
496+
parent3.finish()
497+
parent4.finish()
498+
def entries2 = buildAndExtractZip()
482499

500+
then:
501+
1 * dumpReporter.prepareForFlare()
502+
1 * dumpReporter.addReportToFlare(_)
503+
1 * dumpReporter.cleanupAfterFlare()
504+
entries2.size() == 1
505+
def pendingTraceText2 = entries2["pending_traces.txt"] as String
506+
def parsedTraces2 = pendingTraceText2.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
507+
parsedTraces2.size() == 2
483508

484509
then:
485-
child1.finish()
486-
child2.finish()
510+
child3.finish()
511+
child4.finish()
487512

488513
then:
489514
trace1.size() == 0
490515
trace1.pendingReferenceCount == 0
491516
trace2.size() == 0
492517
trace2.pendingReferenceCount == 0
518+
trace3.size() == 0
519+
trace3.pendingReferenceCount == 0
520+
trace4.size() == 0
521+
trace4.pendingReferenceCount == 0
493522
}
494523

495524

0 commit comments

Comments (0)