Skip to content

Commit 429e9e1

Browse files
author
Andrew Or
committed
Display cached RDDs on the viz
1 parent b1f0fd1 commit 429e9e1

File tree

5 files changed

+33
-6
lines changed

5 files changed

+33
-6
lines changed

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353

5454
var VizConstants = {
5555
rddColor: "#444444",
56+
rddCachedColor: "#7DDD00",
57+
rddOperationColor: "#AADFFF",
5658
stageColor: "#FFDDEE",
57-
operationScopeColor: "#AADFFF",
5859
clusterLabelColor: "#888888",
5960
edgeColor: "#444444",
6061
edgeWidth: "1.5px",
@@ -125,6 +126,12 @@ function renderDagViz(forJob) {
125126
renderDagVizForStage(svg);
126127
}
127128

129+
// Find cached RDDs
130+
metadataContainer().selectAll(".cached-rdd").each(function(v) {
131+
var nodeId = VizConstants.nodePrefix + d3.select(this).text();
132+
graphContainer().selectAll("#" + nodeId).classed("cached", true);
133+
});
134+
128135
// Set the appropriate SVG dimensions to ensure that all elements are displayed
129136
var boundingBox = svg.node().getBBox();
130137
svg.style("width", (boundingBox.width + VizConstants.svgMarginX) + "px");
@@ -240,7 +247,7 @@ function renderDot(dot, container) {
240247
function styleDagViz(forJob) {
241248
graphContainer().selectAll("svg g.cluster rect")
242249
.style("fill", "white")
243-
.style("stroke", VizConstants.operationScopeColor)
250+
.style("stroke", VizConstants.rddOperationColor)
244251
.style("stroke-width", "4px")
245252
.style("stroke-opacity", "0.5");
246253
graphContainer().selectAll("svg g.cluster text")
@@ -279,6 +286,9 @@ function styleDagViz(forJob) {
279286
function styleDagVizForJob() {
280287
graphContainer().selectAll("svg g.node circle")
281288
.style("fill", VizConstants.rddColor);
289+
// TODO: add a legend to explain what a highlighted dot means
290+
graphContainer().selectAll("svg g.cached circle")
291+
.style("fill", VizConstants.rddCachedColor);
282292
graphContainer().selectAll("svg g#cross-stage-edges path")
283293
.style("fill", "none");
284294
}
@@ -289,6 +299,9 @@ function styleDagVizForStage() {
289299
.style("fill", "none")
290300
.style("stroke", VizConstants.rddColor)
291301
.style("stroke-width", "2px");
302+
// TODO: add a legend to explain what a highlighted RDD means
303+
graphContainer().selectAll("svg g.cached rect")
304+
.style("stroke", VizConstants.rddCachedColor);
292305
graphContainer().selectAll("svg g.node g.label text tspan")
293306
.style("fill", VizConstants.rddColor);
294307
}

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,11 @@ private[spark] object UIUtils extends Logging {
362362
<div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
363363
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
364364
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
365+
{
366+
g.rootCluster.getAllNodes.filter(_.cached).map { n =>
367+
<div class="cached-rdd">{n.id}</div>
368+
}
369+
}
365370
</div>
366371
}
367372
}

core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
179179

180180
<span class="expand-application-timeline">
181181
<span class="expand-application-timeline-arrow arrow-closed"></span>
182-
<strong>Event Timeline</strong>
182+
<strong>Event timeline</strong>
183183
</span> ++
184184
<div id="application-timeline" class="collapsed">
185185
<div class="control-panel">

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
161161

162162
<span class="expand-job-timeline">
163163
<span class="expand-job-timeline-arrow arrow-closed"></span>
164-
<strong>Event Timeline</strong>
164+
<strong>Event timeline</strong>
165165
</span> ++
166166
<div id="job-timeline" class="collapsed">
167167
<div class="control-panel">

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
2222

2323
import org.apache.spark.Logging
2424
import org.apache.spark.scheduler.StageInfo
25+
import org.apache.spark.storage.StorageLevel
2526

2627
/**
2728
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -37,7 +38,7 @@ private[ui] case class RDDOperationGraph(
3738
rootCluster: RDDOperationCluster)
3839

3940
/** A node in an RDDOperationGraph. This represents an RDD. */
40-
private[ui] case class RDDOperationNode(id: Int, name: String)
41+
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
4142

4243
/**
4344
* A directed edge connecting two nodes in an RDDOperationGraph.
@@ -61,6 +62,11 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {
6162
def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
6263
_childrenClusters += childCluster
6364
}
65+
66+
/** Return all the nodes contained in this cluster, including ones nested in other clusters. */
67+
def getAllNodes: Seq[RDDOperationNode] = {
68+
_childrenNodes ++ _childrenClusters.flatMap(_.childrenNodes)
69+
}
6470
}
6571

6672
private[ui] object RDDOperationGraph extends Logging {
@@ -90,7 +96,10 @@ private[ui] object RDDOperationGraph extends Logging {
9096
// Find nodes, edges, and operation scopes that belong to this stage
9197
stage.rddInfos.foreach { rdd =>
9298
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
93-
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))
99+
100+
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
101+
val node = nodes.getOrElseUpdate(
102+
rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
94103

95104
if (rdd.scope == null) {
96105
// This RDD has no encompassing scope, so we put it directly in the root cluster

0 commit comments

Comments
 (0)