
Commit 2184348

Author: Andrew Or
Translate RDD information to dot file
It turns out that the previous scope information was insufficient for producing a valid dot file. In particular, the scope hierarchy was missing, yet it is crucial for distinguishing a parent RDD that lives in the same encompassing scope from one that lives in a completely distinct scope. Unique scope identifiers are also needed, and they simplify the code significantly. This commit additionally adds the translation logic in a UI listener that converts RDDInfos to dot files.
1 parent 5143523 commit 2184348
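
To make the fix concrete, here is an illustration (not part of the commit) of the scope strings the new scheme produces: each scope ID is the scope name plus a globally unique counter, and nesting is encoded by joining IDs with a delimiter. All values below are hypothetical.

// Hypothetical scope strings under the new scheme:
// scope ID = name + "_" + unique counter; nested scopes joined by ";"
val sameScope     = "union_0"          // RDD created directly inside the union scope
val nestedScope   = "union_0;map_1"    // RDD created in a map scope nested under union_0
val distinctScope = "filter_2"         // RDD created in an entirely separate scope

// The shared "union_0" prefix is what lets the listener place the first two RDDs
// in the same cluster of the dot file; a flat scope name could not encode this.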

File tree

3 files changed (+166 -10 lines):
  core/src/main/scala/org/apache/spark/rdd/RDD.scala
  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
  core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala

core/src/main/scala/org/apache/spark/rdd/RDD.scala
26 additions, 5 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.rdd

 import java.util.Random
+import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -140,7 +141,7 @@ abstract class RDD[T: ClassTag](
   val id: Int = sc.newRddId()

   /** A friendly name for this RDD */
-  @transient var name: String = null
+  @transient var name: String = Utils.getFormattedClassName(this)

   /** Assign a name to this RDD */
   def setName(_name: String): this.type = {
@@ -1612,22 +1613,40 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {

+  private[spark] val SCOPE_NESTING_DELIMITER = ";"
+  private[spark] val SCOPE_NAME_DELIMITER = "_"
+
+  /**
+   * A counter used to generate globally unique scope identifiers.
+   */
+  private val scopeCounter = new AtomicInteger(0)
+
+  /**
+   * Make a unique scope ID from the scope name by appending a global count,
+   * first sanitizing the name so it cannot collide with the delimiters that
+   * encode the scope hierarchy.
+   */
+  private def makeScopeId(name: String): String = {
+    name.replace(SCOPE_NESTING_DELIMITER, "-")
+      .replace(SCOPE_NAME_DELIMITER, "-") +
+      SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
+  }
+
   /**
    * Execute the given body in a scope named after the method that called this one.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val parentMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
-    withScope[T](sc, parentMethodName, allowNesting)(body)
+    val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+    withScope[T](sc, callerMethodName, allowNesting)(body)
   }

   /**
    * Execute the given body in a scope with the specified name, nested under any
    * scope already set by an enclosing caller.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
-      scope: String,
+      name: String,
       allowNesting: Boolean = false)(body: => T): T = {
     // Save the old scope to restore it later
     val scopeKey = SparkContext.RDD_SCOPE_KEY
@@ -1637,7 +1656,9 @@ object RDD {
     try {
       // Set the scope only if the higher level caller allows us to do so
       if (sc.getLocalProperty(noOverrideKey) == null) {
-        sc.setLocalProperty(scopeKey, scope)
+        val oldScopeId = Option(oldScope).map { _ + SCOPE_NESTING_DELIMITER }.getOrElse("")
+        val newScopeId = oldScopeId + makeScopeId(name)
+        sc.setLocalProperty(scopeKey, newScopeId)
       }
       // Optionally disallow the child body to override our scope
       if (!allowNesting) {
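
For intuition, here is a minimal, self-contained sketch of the same nesting scheme. The ThreadLocal stands in for SparkContext's local properties, and everything here (ScopeSketch, currentScope) is illustrative rather than Spark's actual API:

import java.util.concurrent.atomic.AtomicInteger

object ScopeSketch {
  private val NESTING_DELIMITER = ";"
  private val NAME_DELIMITER = "_"
  private val counter = new AtomicInteger(0)

  // Stand-in for SparkContext local properties (hypothetical)
  private val currentScope = new ThreadLocal[String]

  // Sanitize the name so it cannot collide with either delimiter,
  // then append a globally unique count
  private def makeScopeId(name: String): String =
    name.replace(NESTING_DELIMITER, "-").replace(NAME_DELIMITER, "-") +
      NAME_DELIMITER + counter.getAndIncrement

  // Run the body with the current scope extended by one level,
  // restoring the previous scope afterwards
  def withScope[T](name: String)(body: => T): T = {
    val oldScope = currentScope.get
    val prefix = Option(oldScope).map(_ + NESTING_DELIMITER).getOrElse("")
    currentScope.set(prefix + makeScopeId(name))
    try body finally currentScope.set(oldScope)
  }

  def main(args: Array[String]): Unit = {
    withScope("union") {
      println(currentScope.get)       // prints: union_0
      withScope("map") {
        println(currentScope.get)     // prints: union_0;map_1
      }
    }
  }
}

Unlike the real implementation, this sketch omits the allowNesting / no-override machinery and always nests.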

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
3 additions, 2 deletions

@@ -40,10 +40,11 @@ class RDDInfo(

   override def toString: String = {
     import Utils.bytesToString
+    val _scope = Option(scope).getOrElse("--")
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
-      "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s (scope: %s)").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
-      bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
+      bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize), _scope)
   }

   override def compare(that: RDDInfo): Int = {
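
For reference, the extended toString now renders roughly as follows; every value below, including the scope string, is made up for illustration:

// RDD "MapPartitionsRDD" (3) StorageLevel: <level>; CachedPartitions: 4; TotalPartitions: 4;
// MemorySize: 16.0 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B (scope: union_0;map_1)
//
// For RDDs created outside any scope, scope is null and the fallback prints "(scope: --)".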

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
137 additions, 3 deletions

@@ -17,16 +17,150 @@

 package org.apache.spark.ui.viz

+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.RDDInfo

 /**
  * A SparkListener that builds a visualization graph of the RDDs in each submitted stage.
  */
 private[spark] class VisualizationListener extends SparkListener {
+  private val graphsByStageId = new mutable.HashMap[Int, VizGraph] // stage ID -> viz graph
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stageId = stageSubmitted.stageInfo.stageId
+    val rddInfos = stageSubmitted.stageInfo.rddInfos
+
+    println(s"******** STAGE $stageId **********")
+    rddInfos.foreach(println)
+
+    val vizGraph = makeVizGraph(rddInfos)
+    graphsByStageId(stageId) = vizGraph
+
+    println(
+      s"""
+      |=====================================================
+      |${VisualizationListener.makeDotFile(vizGraph)}
+      |=====================================================
+      """.stripMargin)
+  }
+
+  /**
+   * Construct a graph of nodes, edges, and scopes from the RDDInfos of a single stage.
+   */
+  private def makeVizGraph(rddInfos: Seq[RDDInfo]): VizGraph = {
+    val edges = new mutable.HashSet[VizEdge]
+    val nodes = new mutable.HashMap[Int, VizNode]
+    val scopes = new mutable.HashMap[String, VizScope] // scope ID -> viz scope
+
+    // Entities that are not part of any scopes
+    val rootNodes = new ArrayBuffer[VizNode]
+    val rootScopes = new mutable.HashSet[VizScope]
+
+    // Populate nodes, edges, and scopes
+    rddInfos.foreach { rdd =>
+      val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))
+      edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
+
+      if (rdd.scope == null) {
+        // There is no encompassing scope, so this is a root node
+        rootNodes += node
+      } else {
+        // Attach children scopes and nodes to each scope
+        var previousScope: VizScope = null
+        val scopeIt = rdd.scope.split(RDD.SCOPE_NESTING_DELIMITER).iterator
+        while (scopeIt.hasNext) {
+          val scopeId = scopeIt.next()
+          val scope = scopes.getOrElseUpdate(scopeId, new VizScope(scopeId))
+          scope.attachChildNode(node)
+          // RDD scopes are hierarchical, with the outermost scopes ordered first.
+          // If there is no previous scope, then this must be a root scope.
+          if (previousScope == null) {
+            rootScopes += scope
+          } else {
+            // Otherwise, attach this scope to its parent
+            previousScope.attachChildScope(scope)
+          }
+          previousScope = scope
+        }
+      }
+    }

+    // Remove any edges with nodes belonging to other stages so we do not have orphaned nodes
+    edges.retain { case VizEdge(f, t) => nodes.contains(f) && nodes.contains(t) }
+
+    new VizGraph(edges.toSeq, rootNodes, rootScopes.toSeq)
+  }
+}

-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
-    synchronized {
-      stageSubmitted.stageInfo.rddInfos
+private object VisualizationListener {
+
+  /**
+   * Render the given graph as a dot file string.
+   */
+  def makeDotFile(graph: VizGraph): String = {
+    val dotFile = new StringBuilder
+    dotFile.append("digraph G {\n")
+    // Draw each root scope as a subgraph cluster, recursing into child scopes
+    graph.rootScopes.foreach { scope =>
+      dotFile.append(makeDotSubgraph(scope, "  "))
+    }
+    // Draw the nodes that do not belong to any scope
+    graph.rootNodes.foreach { node =>
+      dotFile.append("  " + makeDotNode(node) + "\n")
     }
+    // Draw all edges last, at the top level
+    graph.edges.foreach { edge =>
+      dotFile.append("  " + edge.fromId + "->" + edge.toId + "\n")
+    }
+    dotFile.append("}")
+    dotFile.toString()
+  }
+
+  /**
+   * Render the given node as a dot node statement, labeled with the RDD's name.
+   */
+  private def makeDotNode(node: VizNode): String = {
+    node.id + " [label = \"" + node.name + "\"]"
+  }
+
+  /**
+   * Render the given scope and all of its children, recursively, as a dot subgraph.
+   */
+  private def makeDotSubgraph(scope: VizScope, indent: String): String = {
+    val subgraph = new StringBuilder
+    subgraph.append(indent + "subgraph cluster" + scope.id + "{\n")
+    subgraph.append(indent + "  label = \"" + scope.name + "\"\n")
+    scope.childrenNodes.foreach { node =>
+      subgraph.append(indent + "  " + makeDotNode(node) + "\n")
+    }
+    scope.childrenScopes.foreach { cscope =>
+      subgraph.append(makeDotSubgraph(cscope, indent + "  "))
+    }
+    subgraph.append(indent + "}\n")
+    subgraph.toString()
   }
 }
+
+private case class VizNode(id: Int, name: String)
+private case class VizEdge(fromId: Int, toId: Int)
+
+private class VizScope(val id: String) {
+  private val _childrenNodes = new ArrayBuffer[VizNode]
+  private val _childrenScopes = new ArrayBuffer[VizScope]
+  val name: String = id.split(RDD.SCOPE_NAME_DELIMITER).head
+
+  def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
+  def childrenScopes: Seq[VizScope] = _childrenScopes.iterator.toSeq
+
+  def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode }
+  def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope }
+}
+
+private case class VizGraph(
+    edges: Seq[VizEdge],
+    rootNodes: Seq[VizNode],
+    rootScopes: Seq[VizScope])
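
To sanity-check the shape of the generated dot text, here is a hand-built graph and the string makeDotFile would produce for it. The IDs and names are hypothetical, and this assumes access to the file-private viz classes above:

// Build a small graph by hand: one scope-less root node, plus a "map" scope
// nested inside a "union" scope (IDs follow the name_counter convention).
val inner = new VizScope("map_1")
inner.attachChildNode(VizNode(2, "MapPartitionsRDD"))
val outer = new VizScope("union_0")
outer.attachChildNode(VizNode(1, "ParallelCollectionRDD"))
outer.attachChildScope(inner)

val graph = VizGraph(
  edges = Seq(VizEdge(1, 2)),
  rootNodes = Seq(VizNode(0, "HadoopRDD")),
  rootScopes = Seq(outer))

println(VisualizationListener.makeDotFile(graph))
// digraph G {
//   subgraph clusterunion_0{
//     label = "union"
//     1 [label = "ParallelCollectionRDD"]
//     subgraph clustermap_1{
//       label = "map"
//       2 [label = "MapPartitionsRDD"]
//     }
//   }
//   0 [label = "HadoopRDD"]
//   1->2
// }

Note that scope names are recovered by splitting the ID on SCOPE_NAME_DELIMITER, so "union_0" renders as the cluster label "union".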
