21 commits
be7ada4  Extract Spark Plan product with keys for Scala 2.13 (charlesmyu, Oct 10, 2025)
9c19e11  Extract Spark Plan product values for Spark 2.12 (charlesmyu, Oct 10, 2025)
3f9c26b  Update tests for meta field in Spark SQL plans (charlesmyu, Oct 14, 2025)
94f3139  Remove unused logic to parse children, enrich product parsing to supp… (charlesmyu, Oct 15, 2025)
beb4d5f  Update tests to assert on meta values (charlesmyu, Oct 16, 2025)
750c68f  Use Abstract class for common functions (charlesmyu, Oct 16, 2025)
0279fff  Use Jackson JSON parser instead of rolling own parsing (charlesmyu, Oct 16, 2025)
50fa41a  Refactor AbstractSparkPlanUtils to only require key generation on impl (charlesmyu, Oct 17, 2025)
51835fa  Default to returning null if class not recognized, limit recursion de… (charlesmyu, Oct 20, 2025)
c68b356  Improve testing scheme for Spark32 on Scala 212 with unknown keys (charlesmyu, Oct 20, 2025)
8bf8488  Improve method & class naming, reuse ObjectMapper from listener (charlesmyu, Oct 21, 2025)
55b917b  Gate Spark Plan parsing with flag (charlesmyu, Oct 21, 2025)
047880a  Match classes by string comparison, add negative cache (charlesmyu, Oct 21, 2025)
3619b77  Add unit tests for AbstractSparkPlanSerializer (charlesmyu, Oct 23, 2025)
53918a3  Make ObjectMapper protected on AbstractDatadogSparkListener instead o… (charlesmyu, Oct 23, 2025)
6e68c69  Specify correct helper class names (charlesmyu, Oct 23, 2025)
5483ba8  Add dd.data.jobs.experimental_features.enabled FF (charlesmyu, Oct 23, 2025)
e4973fc  Remove knownMatchingTypes override from version-specific impls (charlesmyu, Oct 24, 2025)
5527ad0  Catch NullPointerException for getDeclaredMethod calls (charlesmyu, Oct 24, 2025)
1f31add  Adjust more gates to match classes using string comparison (charlesmyu, Oct 24, 2025)
18e51d5  Revert "Catch NullPointerException for getDeclaredMethod calls" (charlesmyu, Oct 24, 2025)
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

@AutoService(InstrumenterModule.class)
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanSerializer",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark212PlanSerializer"
};
}

@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark212Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark212Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -78,4 +91,26 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
Contributor:

Thinking loudly: would it make sense to hide this behind a feature flag in config? Reasons to do so:

  • we can let it bake on dd internal jobs for some time,
  • users can turn it off in case of unexpected issues.

Contributor Author:

Agreed, added! 55b917b

@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
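// Only rebuild the SparkPlanInfo when Spark produced no metadata itself and plan parsing is enabled via config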
if (planInfo.metadata().size() == 0
&& (Config.get().isDataJobsParseSparkPlanEnabled()
|| Config.get().isDataJobsExperimentalFeaturesEnabled())) {
Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
HashMap<String, String> args = new HashMap<>();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
args.$plus$plus(
JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))),
planInfo.metrics());
}
}
}
}
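A hedged usage note on the feature-flag thread above: commit 5483ba8 adds the dd.data.jobs.experimental_features.enabled flag, and assuming dd-trace-java's usual mapping of config keys to JVM system properties and environment variables (an assumption, the mapping is not shown in this diff), enabling it would look roughly like:

-Ddd.data.jobs.experimental_features.enabled=true    # JVM system property
DD_DATA_JOBS_EXPERIMENTAL_FEATURES_ENABLED=true      # equivalent environment variable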
@@ -0,0 +1,10 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark212PlanSerializer extends AbstractSparkPlanSerializer {
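// Scala 2.12's Product API does not expose productElementName, so fall back to a positional placeholder key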
@Override
public String getKey(int idx, TreeNode node) {
return String.format("_dd.unknown_key.%d", idx);
}
}
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

@AutoService(InstrumenterModule.class)
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanSerializer",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
@@ -27,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark213PlanSerializer"
};
}

@@ -40,6 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark213Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark213Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -79,4 +92,25 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
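// Only rebuild the SparkPlanInfo when Spark produced no metadata itself and plan parsing is enabled via config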
if (planInfo.metadata().size() == 0
&& (Config.get().isDataJobsParseSparkPlanEnabled()
|| Config.get().isDataJobsExperimentalFeaturesEnabled())) {
Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
HashMap.from(
JavaConverters.asScala(planUtils.extractFormattedProduct(plan)).toList()),
planInfo.metrics());
}
}
}
}
@@ -0,0 +1,10 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark213PlanSerializer extends AbstractSparkPlanSerializer {
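// Scala 2.13 exposes productElementName, so use the case-class field name as the key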
@Override
public String getKey(int idx, TreeNode node) {
return node.productElementName(idx);
}
}
@@ -66,7 +66,7 @@
*/
public abstract class AbstractDatadogSparkListener extends SparkListener {
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
protected static final ObjectMapper objectMapper = new ObjectMapper();
public static volatile AbstractDatadogSparkListener listener = null;

public static volatile boolean finishTraceOnApplicationEnd = true;
@@ -36,7 +36,8 @@ public String[] knownMatchingTypes() {
"org.apache.spark.deploy.yarn.ApplicationMaster",
"org.apache.spark.util.Utils",
"org.apache.spark.util.SparkClassUtils",
"org.apache.spark.scheduler.LiveListenerBus"
"org.apache.spark.scheduler.LiveListenerBus",
"org.apache.spark.sql.execution.SparkPlanInfo$"
};
}

@@ -0,0 +1,215 @@
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.trees.TreeNode;
import org.apache.spark.sql.execution.SparkPlan;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters;

// An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
// https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
public abstract class AbstractSparkPlanSerializer {
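// MAX_DEPTH bounds recursion into nested plan nodes, collections, and class hierarchies; MAX_LENGTH caps serialized list sizes and simpleString fields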
private final int MAX_DEPTH = 4;
private final int MAX_LENGTH = 50;
private final ObjectMapper mapper = AbstractDatadogSparkListener.objectMapper;

private final String SPARK_PKG_NAME = "org.apache.spark";

private final Set<String> SAFE_PARSE_TRAVERSE =
new HashSet<>(Arrays.asList(SPARK_PKG_NAME + ".sql.catalyst.plans.physical.Partitioning"));
private final Set<String> SAFE_PARSE_STRING =
new HashSet<>(
Arrays.asList(
SPARK_PKG_NAME + ".Partitioner", // not a product or TreeNode
SPARK_PKG_NAME
+ ".sql.catalyst.expressions.Attribute", // avoid data type added by simpleString
SPARK_PKG_NAME + ".sql.catalyst.optimizer.BuildSide", // enum (v3+)
SPARK_PKG_NAME + ".sql.catalyst.plans.JoinType", // enum
SPARK_PKG_NAME
+ ".sql.catalyst.plans.physical.BroadcastMode", // not a product or TreeNode
SPARK_PKG_NAME
+ ".sql.execution.ShufflePartitionSpec", // not a product or TreeNode (v3+)
SPARK_PKG_NAME + ".sql.execution.exchange.ShuffleOrigin" // enum (v3+)
));

// Add class here if we want to break inheritance and interface traversal early when we see
// this class. In other words, we explicitly do not match these classes or their parents
private final Set<String> NEGATIVE_CACHE =
new HashSet<>(
Arrays.asList(
"java.io.Serializable",
"java.lang.Object",
"scala.Equals",
"scala.Product",
SPARK_PKG_NAME + ".sql.catalyst.InternalRow",
SPARK_PKG_NAME + ".sql.catalyst.expressions.UnaryExpression",
SPARK_PKG_NAME + ".sql.catalyst.expressions.Unevaluable",
SPARK_PKG_NAME + ".sql.catalyst.trees.TreeNode"));

public abstract String getKey(int idx, TreeNode node);

public Map<String, String> extractFormattedProduct(SparkPlan plan) {
HashMap<String, String> result = new HashMap<>();
safeParseTreeNode(plan, 0)
.forEach(
(key, value) -> {
result.put(key, writeObjectToString(value));
});
return result;
}

protected Map<String, Object> safeParseTreeNode(TreeNode node, int depth) {
HashMap<String, Object> args = new HashMap<>();
HashMap<String, String> unparsed = new HashMap<>();

int i = 0;
for (Iterator<Object> it = JavaConverters.asJavaIterator(node.productIterator());
it.hasNext(); ) {
Object obj = it.next();

Object val = safeParseObjectToJson(obj, depth);
if (val != null) {
args.put(getKey(i, node), val);
} else {
unparsed.put(getKey(i, node), obj.getClass().getName());
}

i++;
}

if (unparsed.size() > 0) {
// For now, place what we can't parse here with the types so we're aware of them
args.put("_dd.unparsed", unparsed);
Contributor:

Would it make sense to create a separate SafeSparkPlanSerializer class? Seems like this is what this all is about.

Contributor Author:

Just to clarify, do we mean to use SafeSparkPlanSerializer to store the lists of parsed and unparsed objects to then serialize? Or for it to handle the actual serialization (e.g. parsePlanProduct) itself?

This did make me realize some of the function names are outdated, though. Updated them (extractPlanProduct -> safeParseTreeNode, parsePlanProduct -> safeParseObjectToJson).

Contributor Author:

Clarified over a quick call; renamed the class to AbstractSparkPlanSerializer as well to address this! 8bf8488

}
return args;
}

// Should only call on final values being written to `meta`
protected String writeObjectToString(Object value) {
try {
return mapper.writeValueAsString(value);
} catch (IOException e) {
return null;
}
}

protected Object safeParseObjectToJson(Object value, int depth) {
// This function MUST not arbitrarily serialize the object as we can't be sure what it is.
// A null return indicates object is unserializable, otherwise it should really only return
// valid JSON types (Array, Map, String, Boolean, Number, null)

if (value == null) {
return "null";
} else if (value instanceof String || value instanceof Boolean || value instanceof Number) {
return value;
} else if (value instanceof Option) {
return safeParseObjectToJson(((Option) value).getOrElse(() -> "none"), depth);
} else if (value instanceof QueryPlan) {
// don't duplicate child nodes
return null;
} else if (value instanceof Iterable && depth < MAX_DEPTH) {
ArrayList<Object> list = new ArrayList<>();
for (Object item : JavaConverters.asJavaIterable((Iterable) value)) {
Object res = safeParseObjectToJson(item, depth + 1);
if (list.size() < MAX_LENGTH && res != null) {
list.add(res);
}
}
return list;
} else if (instanceOf(value, SAFE_PARSE_TRAVERSE, NEGATIVE_CACHE)) {
if (value instanceof TreeNode && depth < MAX_DEPTH) {
HashMap<String, Object> inner = new HashMap<>();
inner.put(
value.getClass().getSimpleName(), safeParseTreeNode(((TreeNode) value), depth + 1));
return inner;
} else {
return value.toString();
}
} else if (instanceOf(value, SAFE_PARSE_STRING, NEGATIVE_CACHE)) {
return value.toString();
} else if (value instanceof TreeNode) {
// fallback case, leave at bottom
return getSimpleString((TreeNode) value);
}

return null;
}

private String getSimpleString(TreeNode value) {
try {
// in Spark v3+, the signature of `simpleString` includes an int parameter for `maxFields`
return TreeNode.class
.getDeclaredMethod("simpleString", new Class[] {int.class})
.invoke(value, MAX_LENGTH)
Contributor:

Please make sure this doesn't throw NullPointerException in case getDeclaredMethod returns null?

Contributor Author:

Yes, I think that's true! Based on the signature of getDeclaredMethod it looks like we should expect NoSuchMethodException in that case:

public Method getDeclaredMethod(String name, Class<?>... parameterTypes) throws NoSuchMethodException, SecurityException

I've added NullPointerException to the catch just in case, though. 5527ad0

Contributor Author:

Just kidding, the spotbugs job did not like that, so I reverted that change. I'm fairly confident based on the signature and implementation that we should only get NoSuchMethodException, and not NullPointerException. Let me know if we'd still like to do a more explicit null check. 18e51d5

.toString();
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException exception) {
try {
// Attempt the Spark v2 `simpleString` signature
return TreeNode.class.getDeclaredMethod("simpleString").invoke(value).toString();
} catch (NoSuchMethodException
| IllegalAccessException
| InvocationTargetException innerException) {
}

return null;
}
}
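For reference on the getDeclaredMethod thread above, a minimal standalone sketch (not part of this PR) of why no explicit null check is needed here: Class.getDeclaredMethod never returns null, so a missing overload surfaces as NoSuchMethodException, which the fallback above already catches.

// Hypothetical check mirroring the fallback logic in getSimpleString:
try {
  TreeNode.class.getDeclaredMethod("simpleString", int.class); // Spark v3+ overload
} catch (NoSuchMethodException e) {
  // Spark v2: only the no-arg simpleString() exists, so fall back to it
}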

// Matches a class to a set of expected classes. Returns true if the class or any
// of the interfaces or parent classes it implements matches a class in `expectedClasses`.
// Will not attempt to match any classes identified in `negativeCache` on traversal
private boolean instanceOf(Object value, Set<String> expectedClasses, Set<String> negativeCache) {
if (classOrInterfaceInstanceOf(value.getClass(), expectedClasses, negativeCache)) {
return true;
}

// Traverse up inheritance tree to check for matches
int lim = 0;
Class currClass = value.getClass();
while (currClass.getSuperclass() != null && lim < MAX_DEPTH) {
currClass = currClass.getSuperclass();
if (negativeCache.contains(currClass.getName())) {
// don't traverse known paths
break;
}
if (classOrInterfaceInstanceOf(currClass, expectedClasses, negativeCache)) {
return true;
}
lim += 1;
}

return false;
}

// Matches a class to a set of expected classes. Returns true if the class or any
// of the interfaces it implements matches a class in `expectedClasses`. Will not
// attempt to match any classes identified in `negativeCache`.
private boolean classOrInterfaceInstanceOf(
Class cls, Set<String> expectedClasses, Set<String> negativeCache) {
// Match on strings to avoid class loading errors
if (expectedClasses.contains(cls.getName())) {
return true;
}

// Check interfaces as well
for (Class interfaceClass : cls.getInterfaces()) {
if (!negativeCache.contains(interfaceClass.getName())
&& classOrInterfaceInstanceOf(interfaceClass, expectedClasses, negativeCache)) {
return true;
}
}

return false;
}
}