diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 1b0d4692d9cd0..154f1366524b5 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -208,6 +208,7 @@ function renderDagVizForJob(svgContainer) {
     var metadata = d3.select(this);
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
+    var attemptId = metadata.attr("attemptId");
     var containerId = VizConstants.graphPrefix + stageId;
     var isSkipped = metadata.attr("skipped") == "true";
     var container;
@@ -219,7 +220,6 @@ function renderDagVizForJob(svgContainer) {
     } else {
       // Link each graph to the corresponding stage page (TODO: handle stage attempts)
       // Use the link from the stage table so it also works for the history server
-      var attemptId = 0
       var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
         .select("a.name-link")
         .attr("href");
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index aa2548a55412f..65b27e660b5c8 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -386,7 +386,8 @@ private[spark] object UIUtils extends Logging {
           graphs.map { g =>
             val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
             val skipped = g.rootCluster.name.contains("skipped").toString
-            <div class="stage-metadata" stage-id={stageId} skipped={skipped}>
+            val attemptId = RDDOperationGraph.stageIdAttemptedMap.getOrElse(stageId, "0")
+            <div class="stage-metadata" stage-id={stageId} attemptId={attemptId} skipped={skipped}>
               <div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
               { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
               { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index bb6b663f1ead3..722685c49ebc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -77,6 +77,7 @@ private[ui] class RDDOperationCluster(val id: String, private var _name: String)
 private[ui] object RDDOperationGraph extends Logging {
 
   val STAGE_CLUSTER_PREFIX = "stage_"
+  val stageIdAttemptedMap = new mutable.HashMap[String, String]()
 
   /**
    * Construct a RDDOperationGraph for a given stage.
@@ -160,6 +161,10 @@ private[ui] object RDDOperationGraph extends Logging {
     RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
   }
 
+  def getStageInfo(stage: StageInfo): Unit = {
+    stageIdAttemptedMap.put(stage.stageId.toString, stage.attemptId.toString)
+  }
+
   /**
    * Generate the content of a dot file that describes the specified graph.
    *
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index bcae56e2f114c..b6a72717f88ea 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -89,6 +89,14 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
     trimJobsIfNecessary()
   }
 
+  /** Keep track of stages that have been submitted. */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stageId = stageSubmitted.stageInfo.stageId
+    if (stageIdToJobId.contains(stageId)) {
+      RDDOperationGraph.getStageInfo(stageSubmitted.stageInfo)
+    }
+  }
+
   /** Keep track of stages that have completed. */
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
     val stageId = stageCompleted.stageInfo.stageId
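
Reviewer note, not part of the patch: a minimal sketch of the lookup contract the change relies on. RDDOperationGraphListener.onStageSubmitted records the latest attempt id per stage in stageIdAttemptedMap, and UIUtils falls back to "0" for any stage never recorded, which preserves the value the JavaScript previously hard-coded. AttemptLookupSketch and the stage/attempt ids below are hypothetical stand-ins for the listener-populated state.

    import scala.collection.mutable

    object AttemptLookupSketch {
      // Stand-in for RDDOperationGraph.stageIdAttemptedMap, which the
      // listener populates from each SparkListenerStageSubmitted event.
      val stageIdAttemptedMap = new mutable.HashMap[String, String]()

      def main(args: Array[String]): Unit = {
        stageIdAttemptedMap.put("3", "1")                 // stage 3 resubmitted as attempt 1
        println(stageIdAttemptedMap.getOrElse("3", "0"))  // "1" -> JS resolves "#stage-3-1"
        println(stageIdAttemptedMap.getOrElse("7", "0"))  // "0" -> old hard-coded default
      }
    }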