Skip to content

Commit f22f337

Browse files
author
Andrew Or
committed
First working implementation of visualization with vis.js
1 parent 2184348 commit f22f337

File tree

9 files changed

+1359
-19
lines changed

9 files changed

+1359
-19
lines changed

core/src/main/resources/org/apache/spark/ui/static/viz.js

Lines changed: 1302 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,18 @@ private[spark] object UIUtils extends Logging {
167167
<script src={prependBaseUri("/static/additional-metrics.js")}></script>
168168
}
169169

170+
/** Header nodes that load the visualization script (`/static/viz.js`) under the base URI. */
def vizHeaderNodes: Seq[Node] = {
  val vizScriptUri = prependBaseUri("/static/viz.js")
  <script src={vizScriptUri}></script>
}
173+
170174
/** Returns a spark page with correctly formatted headers */
171175
def headerSparkPage(
172176
title: String,
173177
content: => Seq[Node],
174178
activeTab: SparkUITab,
175179
refreshInterval: Option[Int] = None,
176-
helpText: Option[String] = None): Seq[Node] = {
180+
helpText: Option[String] = None,
181+
showVisualization: Boolean = false): Seq[Node] = {
177182

178183
val appName = activeTab.appName
179184
val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
@@ -191,6 +196,7 @@ private[spark] object UIUtils extends Logging {
191196
<html>
192197
<head>
193198
{commonHeaderNodes}
199+
{if (showVisualization) vizHeaderNodes else Seq.empty}
194200
<title>{appName} - {title}</title>
195201
</head>
196202
<body>

core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
2727
/** Page showing list of all ongoing and recently finished stages and pools */
2828
private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
2929
private val sc = parent.sc
30-
private val listener = parent.listener
30+
private val listener = parent.progressListener
3131
private def isFairScheduler = parent.isFairScheduler
3232

3333
def render(request: HttpServletRequest): Seq[Node] = {
@@ -42,18 +42,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
4242

4343
val activeStagesTable =
4444
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
45-
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
45+
parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
4646
killEnabled = parent.killEnabled)
4747
val pendingStagesTable =
4848
new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
49-
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
49+
parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
5050
killEnabled = false)
5151
val completedStagesTable =
5252
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
53-
parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
53+
parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
5454
val failedStagesTable =
5555
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
56-
parent.listener, isFairScheduler = parent.isFairScheduler)
56+
parent.progressListener, isFairScheduler = parent.isFairScheduler)
5757

5858
// For now, pool information is only accessible in live UIs
5959
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
2626

2727
/** Stage summary grouped by executors. */
2828
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
29-
private val listener = parent.listener
29+
private val listener = parent.progressListener
3030

3131
def toNodeSeq: Seq[Node] = {
3232
listener.synchronized {

core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
2727
/** Page showing specific pool details */
2828
private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
2929
private val sc = parent.sc
30-
private val listener = parent.listener
30+
private val listener = parent.progressListener
3131

3232
def render(request: HttpServletRequest): Seq[Node] = {
3333
listener.synchronized {
@@ -40,7 +40,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
4040
case None => Seq[StageInfo]()
4141
}
4242
val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
43-
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
43+
parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
4444
killEnabled = parent.killEnabled)
4545

4646
// For now, pool information is only accessible in live UIs

core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.ui.UIUtils
2525

2626
/** Table showing list of pools */
2727
private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
28-
private val listener = parent.listener
28+
private val listener = parent.progressListener
2929

3030
def toNodeSeq: Seq[Node] = {
3131
listener.synchronized {

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,33 @@ import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
3232

3333
/** Page showing statistics and task list for a given stage */
3434
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
35-
private val listener = parent.listener
35+
private val progressListener = parent.progressListener
36+
private val vizListener = parent.vizListener
37+
38+
/**
 * Build the nodes that render the DOT visualization of the given stage.
 *
 * The DOT source is embedded as the text of the `stage-viz` div. The inline script reads it
 * back, passes it to `Viz` (expected to be provided by the viz.js header script) and replaces
 * the div's contents with the result.
 *
 * @param stageId ID of the stage to look up in the visualization listener
 * @return the container div followed by the rendering script, or an empty sequence when the
 *         listener has no DOT source for this stage
 */
private def renderViz(stageId: Int): Seq[Node] = {
  vizListener.getDotFile(stageId).map { dotSource =>
    val container = <div id="stage-viz">{dotSource}</div>
    // `textContent` yields the DOT source with every entity (including `&amp;`) already
    // decoded, so no manual un-escaping of the serialized markup is needed.
    val script =
      <script type="text/javascript">
        <xml:unparsed>
          var stageVizContainer = document.getElementById("stage-viz");
          stageVizContainer.innerHTML = Viz(stageVizContainer.textContent, "svg", "dot");
        </xml:unparsed>
      </script>
    container ++ script
  }.getOrElse(Seq.empty[Node])
}
3659

3760
def render(request: HttpServletRequest): Seq[Node] = {
38-
listener.synchronized {
61+
progressListener.synchronized {
3962
val parameterId = request.getParameter("id")
4063
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
4164

@@ -44,7 +67,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
4467

4568
val stageId = parameterId.toInt
4669
val stageAttemptId = parameterAttempt.toInt
47-
val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
70+
val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))
4871

4972
if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
5073
val content =
@@ -56,11 +79,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
5679
s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent)
5780
}
5881

82+
val viz: Seq[Node] = renderViz(stageId)
83+
5984
val stageData = stageDataOption.get
6085
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
6186

6287
val numCompleted = tasks.count(_.taskInfo.finished)
63-
val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
88+
val accumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
6489
val hasAccumulators = accumulables.size > 0
6590

6691
val summary =
@@ -432,6 +457,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
432457
if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
433458

434459
val content =
460+
viz ++
435461
summary ++
436462
showAdditionalMetrics ++
437463
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
@@ -440,7 +466,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
440466
maybeAccumulableTable ++
441467
<h4>Tasks</h4> ++ taskTable
442468

443-
UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
469+
UIUtils.headerSparkPage(
470+
"Details for Stage %d".format(stageId), content, parent, showVisualization = true)
444471
}
445472
}
446473

core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,20 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
2626
private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
2727
val sc = parent.sc
2828
val killEnabled = parent.killEnabled
29-
val listener = parent.jobProgressListener
29+
val progressListener = parent.jobProgressListener
30+
val vizListener = parent.visualizationListener
3031

3132
attachPage(new AllStagesPage(this))
3233
attachPage(new StagePage(this))
3334
attachPage(new PoolPage(this))
3435

35-
def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
36+
def isFairScheduler: Boolean = progressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
3637

3738
def handleKillRequest(request: HttpServletRequest): Unit = {
3839
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
3940
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
4041
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
41-
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
42+
if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
4243
sc.get.cancelStage(stageId)
4344
}
4445
// Do a quick pause here to give Spark time to kill the stage so it shows up as

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import org.apache.spark.storage.RDDInfo
3030
private[spark] class VisualizationListener extends SparkListener {
3131
private val graphsByStageId = new mutable.HashMap[Int, VizGraph] // stage ID -> viz graph
3232

33+
/**
 * Return the DOT description of the graph recorded for the given stage, if one exists.
 *
 * Takes this listener's lock because `graphsByStageId` is a mutable map that is updated
 * under `synchronized` in `onStageSubmitted`, and this method may be called from another thread.
 *
 * @param stageId ID of the stage whose graph is requested
 * @return the DOT source produced by `makeDotFile`, or `None` if no graph is stored for the stage
 */
def getDotFile(stageId: Int): Option[String] = synchronized {
  graphsByStageId.get(stageId).map(VisualizationListener.makeDotFile)
}
36+
3337
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
3438
val stageId = stageSubmitted.stageInfo.stageId
3539
val rddInfos = stageSubmitted.stageInfo.rddInfos
@@ -132,7 +136,7 @@ private object VisualizationListener {
132136
*/
133137
private def makeDotSubgraph(scope: VizScope, indent: String): String = {
134138
val subgraph = new StringBuilder
135-
subgraph.append(indent + "subgraph cluster" + scope.id + "{\n")
139+
subgraph.append(indent + "subgraph cluster" + scope.id + " {\n")
136140
subgraph.append(indent + " label = \"" + scope.name + "\"\n")
137141
scope.childrenNodes.foreach { node =>
138142
subgraph.append(indent + " " + makeDotNode(node) + "\n")

0 commit comments

Comments
 (0)