@@ -208,6 +208,7 @@ function renderDagVizForJob(svgContainer) {
     var metadata = d3.select(this);
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
+    var attemptId = metadata.attr("attemptId");
     var containerId = VizConstants.graphPrefix + stageId;
     var isSkipped = metadata.attr("skipped") == "true";
     var container;
@@ -219,7 +220,6 @@
   } else {
     // Link each graph to the corresponding stage page (TODO: handle stage attempts)
     // Use the link from the stage table so it also works for the history server
-    var attemptId = 0
     var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
       .select("a.name-link")
       .attr("href");
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -386,7 +386,8 @@ private[spark] object UIUtils extends Logging {
     graphs.map { g =>
       val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
       val skipped = g.rootCluster.name.contains("skipped").toString
-      <div class="stage-metadata" stage-id={stageId} skipped={skipped}>
+      val attemptId = RDDOperationGraph.stageIdAttemptedMap.getOrElse(stageId, "0")
+      <div class="stage-metadata" stage-id={stageId} skipped={skipped} attemptId={attemptId}>
        <div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
        { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
        { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
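
A minimal sketch (not part of this patch) of the metadata markup the updated UIUtils code emits, with illustrative values; the JavaScript above reads the new attribute via metadata.attr("attemptId") and builds a selector such as #stage-3-1 from it. Assumes the scala-xml module is on the classpath.

    import scala.xml.Elem

    val stageId = "3"
    val skipped = "false"
    val attemptId = "1"   // e.g. stageIdAttemptedMap.getOrElse("3", "0") after a retry

    // Renders roughly as: <div class="stage-metadata" stage-id="3" skipped="false" attemptId="1"/>
    val metadata: Elem =
      <div class="stage-metadata" stage-id={stageId} skipped={skipped} attemptId={attemptId}/>
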
@@ -77,6 +77,7 @@ private[ui] class RDDOperationCluster(val id: String, private var _name: String)
 private[ui] object RDDOperationGraph extends Logging {

   val STAGE_CLUSTER_PREFIX = "stage_"
+  val stageIdAttemptedMap = new mutable.HashMap[String, String]()

   /**
    * Construct a RDDOperationGraph for a given stage.
@@ -160,6 +161,10 @@ private[ui] object RDDOperationGraph extends Logging {
     RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
   }

+  def getStageInfo(stage: StageInfo): Unit = {
+    stageIdAttemptedMap.put(stage.stageId.toString, stage.attemptId.toString)
+  }
+
   /**
    * Generate the content of a dot file that describes the specified graph.
    *
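
A standalone sketch (not part of this patch) of the semantics the new stageIdAttemptedMap relies on: each put from getStageInfo overwrites the previous entry, so the map holds the latest attempt id per stage, and getOrElse falls back to attempt "0" for stages it never saw submitted (for example, skipped stages).

    import scala.collection.mutable

    val stageIdAttemptedMap = new mutable.HashMap[String, String]()
    stageIdAttemptedMap.put("3", "0")                       // stage 3, first attempt
    stageIdAttemptedMap.put("3", "1")                       // stage 3 resubmitted; latest attempt wins
    assert(stageIdAttemptedMap.getOrElse("3", "0") == "1")  // UIUtils emits attemptId="1"
    assert(stageIdAttemptedMap.getOrElse("7", "0") == "0")  // never-submitted stage falls back to "0"
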
@@ -89,6 +89,14 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
     trimJobsIfNecessary()
   }

+  /** Keep track of stages that have been submitted. */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stageId = stageSubmitted.stageInfo.stageId
+    if (stageIdToJobId.contains(stageId)) {
+      RDDOperationGraph.getStageInfo(stageSubmitted.stageInfo)
+    }
+  }
+
   /** Keep track of stages that have completed. */
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
     val stageId = stageCompleted.stageInfo.stageId
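
A minimal sketch (not part of this patch) of the listener callbacks the new override hooks into, written against the public SparkListener API; AttemptTracker and the println bodies are illustrative stand-ins for the bookkeeping in the diff above.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

    class AttemptTracker extends SparkListener {
      // Called each time a stage attempt is submitted, including retries of the same stage id.
      override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
        val info = event.stageInfo
        println(s"stage ${info.stageId} submitted as attempt ${info.attemptId}")
      }
      // Called once the stage attempt finishes.
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        println(s"stage ${event.stageInfo.stageId} completed")
      }
    }
    // sc.addSparkListener(new AttemptTracker())   // `sc` is an existing SparkContext
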