Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 17 additions & 265 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,272 +18,17 @@
package org.apache.spark.ui.jobs

import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.mutable.HashMap
import scala.xml._

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler._
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
private val JOBS_LEGEND =
<div class="legend-area"><svg width="150px" height="85px">
<rect class="succeeded-job-legend"
x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
<text x="35px" y="17px">Succeeded</text>
<rect class="failed-job-legend"
x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
<text x="35px" y="42px">Failed</text>
<rect class="running-job-legend"
x="5px" y="55px" width="20px" height="15px" rx="2px" ry="2px"></rect>
<text x="35px" y="67px">Running</text>
</svg></div>.toString.filter(_ != '\n')

private val EXECUTORS_LEGEND =
<div class="legend-area"><svg width="150px" height="55px">
<rect class="executor-added-legend"
x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
<text x="35px" y="17px">Added</text>
<rect class="executor-removed-legend"
x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
<text x="35px" y="42px">Removed</text>
</svg></div>.toString.filter(_ != '\n')

private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
val lastStageInfo = Option(job.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)}
val lastStageData = lastStageInfo.flatMap { s =>
parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId))
}
val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val description = lastStageData.flatMap(_.description).getOrElse("")
(name, description)
}

private def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = {
jobUIDatas.filter { jobUIData =>
jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined
}.map { jobUIData =>
val jobId = jobUIData.jobId
val status = jobUIData.status
val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData)
val displayJobDescription =
if (jobDescription.isEmpty) {
jobName
} else {
UIUtils.makeDescription(jobDescription, "", plainText = true).text
}
val submissionTime = jobUIData.submissionTime.get
val completionTimeOpt = jobUIData.completionTime
val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis())
val classNameByStatus = status match {
case JobExecutionStatus.SUCCEEDED => "succeeded"
case JobExecutionStatus.FAILED => "failed"
case JobExecutionStatus.RUNNING => "running"
case JobExecutionStatus.UNKNOWN => "unknown"
}

// The timeline library treats contents as HTML, so we have to escape them. We need to add
// extra layers of escaping in order to embed this in a Javascript string literal.
val escapedDesc = Utility.escape(displayJobDescription)
val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
val jobEventJsonAsStr =
s"""
|{
| 'className': 'job application-timeline-object ${classNameByStatus}',
| 'group': 'jobs',
| 'start': new Date(${submissionTime}),
| 'end': new Date(${completionTime}),
| 'content': '<div class="application-timeline-content"' +
| 'data-html="true" data-placement="top" data-toggle="tooltip"' +
| 'data-title="${jsEscapedDesc} (Job ${jobId})<br>' +
| 'Status: ${status}<br>' +
| 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
| '${
if (status != JobExecutionStatus.RUNNING) {
s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}"""
} else {
""
}
}">' +
| '${jsEscapedDesc} (Job ${jobId})</div>'
|}
""".stripMargin
jobEventJsonAsStr
}
}

private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
Seq[String] = {
val events = ListBuffer[String]()
executorUIDatas.foreach {
case a: SparkListenerExecutorAdded =>
val addedEvent =
s"""
|{
| 'className': 'executor added',
| 'group': 'executors',
| 'start': new Date(${a.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
| 'data-title="Executor ${a.executorId}<br>' +
| 'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
| 'data-html="true">Executor ${a.executorId} added</div>'
|}
""".stripMargin
events += addedEvent
case e: SparkListenerExecutorRemoved =>
val removedEvent =
s"""
|{
| 'className': 'executor removed',
| 'group': 'executors',
| 'start': new Date(${e.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
| 'data-title="Executor ${e.executorId}<br>' +
| 'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
| '${
if (e.reason != null) {
s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
} else {
""
}
}"' +
| 'data-html="true">Executor ${e.executorId} removed</div>'
|}
""".stripMargin
events += removedEvent

}
events.toSeq
}

private def makeTimeline(
jobs: Seq[JobUIData],
executors: Seq[SparkListenerEvent],
startTime: Long): Seq[Node] = {

val jobEventJsonAsStrSeq = makeJobEvent(jobs)
val executorEventJsonAsStrSeq = makeExecutorEvent(executors)

val groupJsonArrayAsStr =
s"""
|[
| {
| 'id': 'executors',
| 'content': '<div>Executors</div>${EXECUTORS_LEGEND}',
| },
| {
| 'id': 'jobs',
| 'content': '<div>Jobs</div>${JOBS_LEGEND}',
| }
|]
""".stripMargin

val eventArrayAsStr =
(jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]")

<span class="expand-application-timeline">
<span class="expand-application-timeline-arrow arrow-closed"></span>
<a data-toggle="tooltip" title={ToolTips.JOB_TIMELINE} data-placement="right">
Event Timeline
</a>
</span> ++
<div id="application-timeline" class="collapsed">
<div class="control-panel">
<div id="application-timeline-zoom-lock">
<input type="checkbox"></input>
<span>Enable zooming</span>
</div>
</div>
</div> ++
<script type="text/javascript">
{Unparsed(s"drawApplicationTimeline(${groupJsonArrayAsStr}," +
s"${eventArrayAsStr}, ${startTime}, ${UIUtils.getTimeZoneOffset()});")}
</script>
}

private def jobsTable(
request: HttpServletRequest,
tableHeaderId: String,
jobTag: String,
jobs: Seq[JobUIData],
killEnabled: Boolean): Seq[Node] = {
// stripXSS is called to remove suspicious characters used in XSS attacks
val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))

val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"

// stripXSS is called first to remove suspicious characters used in XSS attacks
val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page"))
val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort"))
val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc"))
val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize"))
val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize"))

val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
UIUtils.decodeURLParameter(sortColumn)
}.getOrElse(jobIdTitle)
val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
// New jobs should be shown above old jobs by default.
if (jobSortColumn == jobIdTitle) true else false
)
val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)

val page: Int = {
// If the user has changed to a larger page size, then go to page 1 in order to avoid
// IndexOutOfBoundsException.
if (jobPageSize <= jobPrevPageSize) {
jobPage
} else {
1
}
}
val currentTime = System.currentTimeMillis()

try {
new JobPagedTable(
jobs,
tableHeaderId,
jobTag,
UIUtils.prependBaseUri(parent.basePath),
"jobs", // subPath
parameterOtherTable,
parent.jobProgresslistener.stageIdToInfo,
parent.jobProgresslistener.stageIdToData,
killEnabled,
currentTime,
jobIdTitle,
pageSize = jobPageSize,
sortColumn = jobSortColumn,
desc = jobSortDesc
).table(page)
} catch {
case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
<div class="alert alert-error">
<p>Error while rendering job table:</p>
<pre>
{Utils.exceptionString(e)}
</pre>
</div>
}
}

def render(request: HttpServletRequest): Seq[Node] = {
val listener = parent.jobProgresslistener
Expand All @@ -295,11 +40,14 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val failedJobs = listener.failedJobs.reverse

val activeJobsTable =
jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
new JobsTable(request, activeJobs, "active", "activeJob", parent.basePath,
parent.jobProgresslistener, killEnabled = parent.killEnabled)
val completedJobsTable =
jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
new JobsTable(request, completedJobs, "completed", "completedJob", parent.basePath,
parent.jobProgresslistener, killEnabled = false)
val failedJobsTable =
jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)
new JobsTable(request, failedJobs, "failed", "failedJob", parent.basePath,
parent.jobProgresslistener, killEnabled = false)

val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
Expand Down Expand Up @@ -361,20 +109,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {

var content = summary
val executorListener = parent.executorListener
content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
executorListener.executorEvents, startTime)
content ++= JobsUtils.makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
executorListener.executorEvents, startTime, parent.jobProgresslistener)

if (shouldShowActiveJobs) {
content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
activeJobsTable
activeJobsTable.toNodeSeq
}
if (shouldShowCompletedJobs) {
content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++
completedJobsTable
completedJobsTable.toNodeSeq
}
if (shouldShowFailedJobs) {
content ++= <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++
failedJobsTable
failedJobsTable.toNodeSeq
}

val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" +
Expand Down Expand Up @@ -615,7 +363,11 @@ private[ui] class JobPagedTable(

<tr id={"job-" + job.jobId}>
<td>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
{job.jobId} { job.jobGroup.map { id =>
<a href={"%s/jobs/jobgroup?id=%s".format(basePath, id)} class="name-link">
{id}
</a>
}.getOrElse({job.jobGroup.map(id => s"($id)").getOrElse("")})}
</td>
<td>
{jobTableRow.jobDescription} {killLink}
Expand Down
Loading