Skip to content

Commit 46ebe3b

Browse files
committed
set OL if detected
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent c79bf57 commit 46ebe3b

File tree

16 files changed

+414
-23
lines changed

16 files changed

+414
-23
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@ public static void start(
250250
setSystemPropertyDefault(
251251
propertyNameToSystemPropertyName("integration.kafka.enabled"), "true");
252252

253+
if (Config.get().isDataJobsOpenLineageEnabled()) {
254+
setSystemPropertyDefault(
255+
propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true");
256+
}
257+
253258
String javaCommand = System.getProperty("sun.java.command");
254259
String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern();
255260
if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) {

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1013

1114
@AutoService(InstrumenterModule.class)
1215
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +20,7 @@ public String[] helperClassNames() {
1720
packageName + ".DatabricksParentContext",
1821
packageName + ".OpenlineageParentContext",
1922
packageName + ".DatadogSpark212Listener",
23+
packageName + ".PredeterminedTraceIdContext",
2024
packageName + ".RemoveEldestHashMap",
2125
packageName + ".SparkAggregatedTaskMetrics",
2226
packageName + ".SparkConfAllowList",
@@ -41,6 +45,38 @@ public void methodAdvice(MethodTransformer transformer) {
4145
public static class InjectListener {
4246
@Advice.OnMethodEnter(suppress = Throwable.class)
4347
public static void enter(@Advice.This SparkContext sparkContext) {
48+
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
49+
log.debug(
50+
"AbstractDatadogSparkListener classloader is: ({}) {}",
51+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
52+
AbstractDatadogSparkListener.class.getClassLoader());
53+
54+
if (Config.get().isDataJobsOpenLineageEnabled()
55+
&& AbstractDatadogSparkListener.classIsLoadable(
56+
"io.openlineage.spark.agent.OpenLineageSparkListener")
57+
&& AbstractDatadogSparkListener.classIsLoadable(
58+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
59+
if (!sparkContext.conf().contains("spark.extraListeners")) {
60+
log.debug("spark.extraListeners does not contain any listeners. Adding OpenLineage");
61+
sparkContext
62+
.conf()
63+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
64+
} else {
65+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
66+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
67+
log.debug(
68+
"spark.extraListeners does contain listeners {}. Adding OpenLineage",
69+
extraListeners);
70+
sparkContext
71+
.conf()
72+
.set(
73+
"spark.extraListeners",
74+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
75+
}
76+
}
77+
}
78+
79+
// We want to add the Datadog listener as the first listener
4480
AbstractDatadogSparkListener.listener =
4581
new DatadogSpark212Listener(
4682
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1013

1114
@AutoService(InstrumenterModule.class)
1215
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +20,7 @@ public String[] helperClassNames() {
1720
packageName + ".DatabricksParentContext",
1821
packageName + ".OpenlineageParentContext",
1922
packageName + ".DatadogSpark213Listener",
23+
packageName + ".PredeterminedTraceIdContext",
2024
packageName + ".RemoveEldestHashMap",
2125
packageName + ".SparkAggregatedTaskMetrics",
2226
packageName + ".SparkConfAllowList",
@@ -41,6 +45,39 @@ public void methodAdvice(MethodTransformer transformer) {
4145
public static class InjectListener {
4246
@Advice.OnMethodEnter(suppress = Throwable.class)
4347
public static void enter(@Advice.This SparkContext sparkContext) {
48+
// checking whether OpenLineage integration is enabled, available and that it supports tags
49+
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
50+
log.debug(
51+
"AbstractDatadogSparkListener classloader is: ({}) {}",
52+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
53+
AbstractDatadogSparkListener.class.getClassLoader());
54+
55+
if (Config.get().isDataJobsOpenLineageEnabled()
56+
&& AbstractDatadogSparkListener.classIsLoadable(
57+
"io.openlineage.spark.agent.OpenLineageSparkListener")
58+
&& AbstractDatadogSparkListener.classIsLoadable(
59+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
60+
if (!sparkContext.conf().contains("spark.extraListeners")) {
61+
log.debug("spark.extraListeners does not contain any listeners. Adding OpenLineage");
62+
sparkContext
63+
.conf()
64+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
65+
} else {
66+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
67+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
68+
log.debug(
69+
"spark.extraListeners does contain listeners {}. Adding OpenLineage",
70+
extraListeners);
71+
sparkContext
72+
.conf()
73+
.set(
74+
"spark.extraListeners",
75+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
76+
}
77+
}
78+
}
79+
80+
// We want to add the Datadog listener as the first listener
4481
AbstractDatadogSparkListener.listener =
4582
new DatadogSpark213Listener(
4683
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());

0 commit comments

Comments
 (0)