
Commit 2e8a141

zsxwing authored and tdas committed
[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener
If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage.

Screenshot: ![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png)

Author: zsxwing <[email protected]>

Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits:

aca0ba6 [zsxwing] Fix the code style
718765e [zsxwing] Make generateNormalJobRow private
8073b03 [zsxwing] Merge branch 'master' into SPARK-7305
83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener

(cherry picked from commit 22ab70e)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 99897fe commit 2e8a141
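
For context, the heart of the change is that job ids whose UI data has been evicted are no longer filtered out; instead each id is carried together with an Option[JobUIData], and a placeholder row is rendered when the data is gone. The sketch below is not code from this commit; it is a minimal, self-contained Scala illustration of that dispatch pattern, using simplified hypothetical types (JobId, UIData, JobWithUIData) in place of SparkJobId, JobUIData, and SparkJobIdWithUIData, and it assumes scala-xml is on the classpath (as it is for Spark's UI code).

// Illustrative sketch only -- not part of this commit; simplified stand-in types.
import scala.xml.Node

object DroppedJobRowSketch {
  type JobId = Int
  final case class UIData(description: String, durationMs: Long)

  // Mirrors the idea of SparkJobIdWithUIData: uiData is None when the
  // listener has already dropped the job's UI information.
  final case class JobWithUIData(jobId: JobId, uiData: Option[UIData])

  def jobRow(job: JobWithUIData): Seq[Node] = job.uiData match {
    case Some(data) => normalRow(job.jobId, data) // full row while UI data is retained
    case None       => droppedRow(job.jobId)      // job id plus "-" placeholder cells
  }

  private def normalRow(jobId: JobId, data: UIData): Seq[Node] =
    <tr><td>{jobId}</td><td>{data.description}</td><td>{data.durationMs} ms</td></tr>

  private def droppedRow(jobId: JobId): Seq[Node] =
    <tr><td>{jobId}</td><td>-</td><td>-</td></tr>

  def main(args: Array[String]): Unit = {
    val jobs = Seq(
      JobWithUIData(0, Some(UIData("count at Example.scala:42", 123L))),
      JobWithUIData(1, None)) // e.g. evicted because too many jobs were retained
    jobs.flatMap(jobRow).foreach(println)
  }
}

The patch itself keeps the existing row-generation code for the normal case (renamed to generateNormalJobRow), adds generateDroppedJobRow for the placeholder case, and has generateJobRow dispatch between them, as the diff below shows.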

File tree: 1 file changed (+106, -30 lines)


streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala

Lines changed: 106 additions & 30 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
 
 import org.apache.commons.lang3.StringEscapeUtils
 

@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       <th>Error</th>
   }
 
+  private def generateJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+    if (sparkJob.jobUIData.isDefined) {
+      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+    } else {
+      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+    }
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
    */
-  def generateJobRow(
+  private def generateNormalJobRow(
       outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
-    val lastStageInfo = Option(sparkJob.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
         val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
-          <span class="description-input" title={lastStageDescription}>
-            {lastStageDescription}
-          </span>{lastStageName}
+          {outputOpDescription}
         </td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
       } else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     </tr>
   }
 
-  private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
-    val sparkjobDurations = sparkJobs.map(sparkJob => {
-      sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+  /**
+   * If a job is dropped by sparkListener due to exceeding the limitation, we only show the job id
+   * with "-" cells.
+   */
+  private def generateDroppedJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      jobId: Int): Seq[Node] = {
+    // In the first row, output op id and its information needs to be shown. In other rows, these
+    // cells will be taken up due to "rowspan".
+    // scalastyle:off
+    val prefixCells =
+      if (isFirstRow) {
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+      } else {
+        Nil
       }
-    })
+    // scalastyle:on
+
+    <tr>
+      {prefixCells}
+      <td sorttable_customkey={jobId.toString}>
+        {jobId.toString}
+      </td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
+  private def generateOutputOpIdRow(
+      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    // We don't count the durations of dropped jobs
+    val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+      map(sparkJob => {
+        sparkJob.submissionTime.map { start =>
+          val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      })
     val formattedOutputOpDuration =
-      if (sparkjobDurations.exists(_ == None)) {
-        // If any job does not finish, set "formattedOutputOpDuration" to "-"
+      if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+        // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
        "-"
      } else {
-        SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
      }
-    generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+
+    val description = generateOutputOpDescription(sparkJobs)
+
+    generateJobRow(
+      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
       sparkJobs.tail.map { sparkJob =>
-        generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+        generateJobRow(
+          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
      }.flatMap(x => x)
   }
 
+  private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    val lastStageInfo =
+      sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+        flatMap { sparkJob => // For the first job, get the latest Stage info
+          if (sparkJob.stageIds.isEmpty) {
+            None
+          } else {
+            sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+          }
+        }
+    val lastStageData = lastStageInfo.flatMap { s =>
+      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+
+    <span class="description-input" title={lastStageDescription}>
+      {lastStageDescription}
+    </span> ++ Text(lastStageName)
+  }
+
   private def failureReasonCell(failureReason: String): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
     }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
         outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          // Filter out spark Job ids that don't exist in sparkListener
-          (outputOpId, sparkJobIds.flatMap(getJobData))
+          (outputOpId,
+            sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
        }
 
      <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
200276
<tbody>
201277
{
202278
outputOpIdWithJobs.map {
203-
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
279+
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
204280
}
205281
}
206282
</tbody>
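
Usage note (not part of the commit): jobs vanish from the listener once the UI's retained-job limit is exceeded, so the placeholder "-" rows are easiest to see by lowering that limit and running more jobs than it keeps. A rough sketch, assuming the standard spark.ui.retainedJobs setting governs the listener backing this page:

// Hypothetical reproduction sketch, not from this commit: with a small retention
// limit, older Spark jobs are evicted and their BatchPage rows fall back to "-".
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("dropped-job-rows-demo")
  .set("spark.ui.retainedJobs", "10") // retain UI data for only the 10 most recent jobs
// Build a StreamingContext from this conf and run enough batches/jobs to exceed the limit,
// then open a batch page in the Streaming tab.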
