Skip to content

Commit c3e99e1

Browse files
committed
take care of sparkconf/ol lifecycle
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent b936442 commit c3e99e1

File tree

2 files changed

+44
-7
lines changed

2 files changed

+44
-7
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,15 @@ static void setupSparkConf(SparkConf sparkConf) {
172172
"_dd.trace_id:"
173173
+ listener.applicationSpan.context().getTraceId().toString()
174174
+ ";_dd.ol_intake.emit_spans:false");
175+
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
176+
log.error("Set Spark conf: Key: " + tuple._1 + ", Value: " + tuple._2);
177+
}
175178
}
176179

177180
public void setupOpenLineage() {
178-
log.debug("Setting up OpenLineage-Datadog integration");
181+
log.error("Setting up OpenLineage-Datadog integration");
179182
if (openLineageSparkListener != null) {
183+
log.error("No init needed");
180184
setupSparkConf(openLineageSparkConf);
181185
return;
182186
}
@@ -240,6 +244,8 @@ private void initApplicationSpanIfNotInitialized() {
240244
return;
241245
}
242246

247+
log.error("Starting tracer application span.");
248+
243249
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
244250

245251
if (applicationStart != null) {
@@ -253,8 +259,6 @@ private void initApplicationSpanIfNotInitialized() {
253259
}
254260
}
255261

256-
notifyOl(x -> this.openLineageSparkListener.onApplicationStart(x), applicationStart);
257-
258262
captureApplicationParameters(builder);
259263
captureOpenlineageContextIfPresent(builder);
260264

@@ -263,6 +267,7 @@ private void initApplicationSpanIfNotInitialized() {
263267
applicationSpan.setMeasured(true);
264268
// We need to set it up after we create application span to have correlation.
265269
setupOpenLineage();
270+
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
266271
}
267272

268273
private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
@@ -774,14 +779,15 @@ public void onOtherEvent(SparkListenerEvent event) {
774779

775780
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
776781
if (isRunningOnDatabricks || isStreamingJob) {
777-
log.debug("Not emitting event when running on databricks or on streaming jobs");
782+
log.error("Not emitting event when running on databricks or on streaming jobs");
778783
return;
779784
}
785+
initApplicationSpanIfNotInitialized();
780786
if (openLineageSparkListener != null) {
781-
log.debug("Notifying with event `{}`", event.getClass().getCanonicalName());
787+
log.error("Notifying with event `{}`", event.getClass().getCanonicalName());
782788
ol.accept(event);
783789
} else {
784-
log.debug("OpenLineageSparkListener is null");
790+
log.error("OpenLineageSparkListener is null");
785791
}
786792
}
787793

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.api.Config;
1112
import net.bytebuddy.asm.Advice;
1213
import org.apache.spark.deploy.SparkSubmitArguments;
14+
import org.apache.spark.scheduler.SparkListenerInterface;
15+
import org.slf4j.LoggerFactory;
1316

1417
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1518
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
@@ -29,7 +32,8 @@ public String[] knownMatchingTypes() {
2932
"org.apache.spark.SparkContext",
3033
"org.apache.spark.deploy.SparkSubmit",
3134
"org.apache.spark.deploy.yarn.ApplicationMaster",
32-
"org.apache.spark.util.SparkClassUtils"
35+
"org.apache.spark.util.SparkClassUtils",
36+
"org.apache.spark.scheduler.LiveListenerBus"
3337
};
3438
}
3539

@@ -56,6 +60,15 @@ public void methodAdvice(MethodTransformer transformer) {
5660
.and(named("finish"))
5761
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
5862
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
63+
64+
// LiveListenerBus class is used when running in a YARN cluster
65+
transformer.applyAdvice(
66+
isMethod()
67+
.and(named("addToSharedQueue"))
68+
// .and(takesArgument(0,
69+
// named("org.apache.spark.scheduler.SparkListenerInterface")))
70+
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
71+
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
5972
}
6073

6174
public static class PrepareSubmitEnvAdvice {
@@ -101,4 +114,22 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
101114
}
102115
}
103116
}
117+
118+
public static class LiveListenerBusAdvice {
119+
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
120+
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
121+
if (listener == null || listener.getClass().getCanonicalName() == null) {
122+
return false;
123+
}
124+
if (listener
125+
.getClass()
126+
.getCanonicalName()
127+
.equals("io.openlineage.spark.agent.OpenLineageSparkListener")) {
128+
LoggerFactory.getLogger(Config.class)
129+
.debug("Detected OL listener, skipping initialization");
130+
return true;
131+
}
132+
return false;
133+
}
134+
}
104135
}

0 commit comments

Comments
 (0)