From c3b3d8091bbad88cd71be101f46948e2499d8051 Mon Sep 17 00:00:00 2001 From: Yewei Zhang Date: Wed, 7 Feb 2018 11:14:31 -0800 Subject: [PATCH 1/3] Add JobGroup page and move the logic out from AllJobsPage so that it can be shared. --- .../apache/spark/ui/jobs/AllJobsPage.scala | 282 ++---------------- .../apache/spark/ui/jobs/JobGroupPage.scala | 153 ++++++++++ .../org/apache/spark/ui/jobs/JobsTab.scala | 1 + .../org/apache/spark/ui/jobs/JobsTable.scala | 99 ++++++ .../org/apache/spark/ui/jobs/JobsUtils.scala | 215 +++++++++++++ .../org/apache/spark/ui/UISeleniumSuite.scala | 40 +++ 6 files changed, 525 insertions(+), 265 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index a0fd29c22ddca..ef5fd4c33dcc0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -18,272 +18,17 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder -import java.util.Date import javax.servlet.http.HttpServletRequest -import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable.HashMap import scala.xml._ -import org.apache.commons.lang3.StringEscapeUtils - -import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler._ import org.apache.spark.ui._ import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData} -import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished jobs */ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { - private val JOBS_LEGEND = -
- - Succeeded - - Failed - - Running -
.toString.filter(_ != '\n') - - private val EXECUTORS_LEGEND = -
- - Added - - Removed -
.toString.filter(_ != '\n') - - private def getLastStageNameAndDescription(job: JobUIData): (String, String) = { - val lastStageInfo = Option(job.stageIds) - .filter(_.nonEmpty) - .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)} - val lastStageData = lastStageInfo.flatMap { s => - parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId)) - } - val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") - val description = lastStageData.flatMap(_.description).getOrElse("") - (name, description) - } - - private def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = { - jobUIDatas.filter { jobUIData => - jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined - }.map { jobUIData => - val jobId = jobUIData.jobId - val status = jobUIData.status - val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData) - val displayJobDescription = - if (jobDescription.isEmpty) { - jobName - } else { - UIUtils.makeDescription(jobDescription, "", plainText = true).text - } - val submissionTime = jobUIData.submissionTime.get - val completionTimeOpt = jobUIData.completionTime - val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis()) - val classNameByStatus = status match { - case JobExecutionStatus.SUCCEEDED => "succeeded" - case JobExecutionStatus.FAILED => "failed" - case JobExecutionStatus.RUNNING => "running" - case JobExecutionStatus.UNKNOWN => "unknown" - } - - // The timeline library treats contents as HTML, so we have to escape them. We need to add - // extra layers of escaping in order to embed this in a Javascript string literal. - val escapedDesc = Utility.escape(displayJobDescription) - val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc) - val jobEventJsonAsStr = - s""" - |{ - | 'className': 'job application-timeline-object ${classNameByStatus}', - | 'group': 'jobs', - | 'start': new Date(${submissionTime}), - | 'end': new Date(${completionTime}), - | 'content': '
Completed: ${UIUtils.formatDate(new Date(completionTime))}""" - } else { - "" - } - }">' + - | '${jsEscapedDesc} (Job ${jobId})
' - |} - """.stripMargin - jobEventJsonAsStr - } - } - - private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): - Seq[String] = { - val events = ListBuffer[String]() - executorUIDatas.foreach { - case a: SparkListenerExecutorAdded => - val addedEvent = - s""" - |{ - | 'className': 'executor added', - | 'group': 'executors', - | 'start': new Date(${a.time}), - | 'content': '
Executor ${a.executorId} added
' - |} - """.stripMargin - events += addedEvent - case e: SparkListenerExecutorRemoved => - val removedEvent = - s""" - |{ - | 'className': 'executor removed', - | 'group': 'executors', - | 'start': new Date(${e.time}), - | 'content': '
Reason: ${e.reason.replace("\n", " ")}""" - } else { - "" - } - }"' + - | 'data-html="true">Executor ${e.executorId} removed
' - |} - """.stripMargin - events += removedEvent - - } - events.toSeq - } - - private def makeTimeline( - jobs: Seq[JobUIData], - executors: Seq[SparkListenerEvent], - startTime: Long): Seq[Node] = { - - val jobEventJsonAsStrSeq = makeJobEvent(jobs) - val executorEventJsonAsStrSeq = makeExecutorEvent(executors) - - val groupJsonArrayAsStr = - s""" - |[ - | { - | 'id': 'executors', - | 'content': '
Executors
${EXECUTORS_LEGEND}', - | }, - | { - | 'id': 'jobs', - | 'content': '
Jobs
${JOBS_LEGEND}', - | } - |] - """.stripMargin - - val eventArrayAsStr = - (jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]") - - - - - Event Timeline - - ++ - ++ - - } - - private def jobsTable( - request: HttpServletRequest, - tableHeaderId: String, - jobTag: String, - jobs: Seq[JobUIData], - killEnabled: Boolean): Seq[Node] = { - // stripXSS is called to remove suspicious characters used in XSS attacks - val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS)) - val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) - .map(para => para._1 + "=" + para._2(0)) - - val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) - val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id" - - // stripXSS is called first to remove suspicious characters used in XSS attacks - val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page")) - val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort")) - val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc")) - val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize")) - val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize")) - - val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1) - val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn => - UIUtils.decodeURLParameter(sortColumn) - }.getOrElse(jobIdTitle) - val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( - // New jobs should be shown above old jobs by default. - if (jobSortColumn == jobIdTitle) true else false - ) - val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) - val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) - - val page: Int = { - // If the user has changed to a larger page size, then go to page 1 in order to avoid - // IndexOutOfBoundsException. - if (jobPageSize <= jobPrevPageSize) { - jobPage - } else { - 1 - } - } - val currentTime = System.currentTimeMillis() - - try { - new JobPagedTable( - jobs, - tableHeaderId, - jobTag, - UIUtils.prependBaseUri(parent.basePath), - "jobs", // subPath - parameterOtherTable, - parent.jobProgresslistener.stageIdToInfo, - parent.jobProgresslistener.stageIdToData, - killEnabled, - currentTime, - jobIdTitle, - pageSize = jobPageSize, - sortColumn = jobSortColumn, - desc = jobSortDesc - ).table(page) - } catch { - case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => -
-

Error while rendering job table:

-
-            {Utils.exceptionString(e)}
-          
-
- } - } def render(request: HttpServletRequest): Seq[Node] = { val listener = parent.jobProgresslistener @@ -295,11 +40,14 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val failedJobs = listener.failedJobs.reverse val activeJobsTable = - jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled) + new JobsTable(request, activeJobs, "active", "activeJob", parent.basePath, + parent.jobProgresslistener, killEnabled = parent.killEnabled) val completedJobsTable = - jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false) + new JobsTable(request, completedJobs, "completed", "completedJob", parent.basePath, + parent.jobProgresslistener, killEnabled = false) val failedJobsTable = - jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false) + new JobsTable(request, failedJobs, "failed", "failedJob", parent.basePath, + parent.jobProgresslistener, killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty @@ -361,20 +109,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { var content = summary val executorListener = parent.executorListener - content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs, - executorListener.executorEvents, startTime) + content ++= JobsUtils.makeTimeline(activeJobs ++ completedJobs ++ failedJobs, + executorListener.executorEvents, startTime, parent.jobProgresslistener) if (shouldShowActiveJobs) { content ++=

Active Jobs ({activeJobs.size})

++ - activeJobsTable + activeJobsTable.toNodeSeq } if (shouldShowCompletedJobs) { content ++=

Completed Jobs ({completedJobNumStr})

++ - completedJobsTable + completedJobsTable.toNodeSeq } if (shouldShowFailedJobs) { content ++=

Failed Jobs ({failedJobs.size})

++ - failedJobsTable + failedJobsTable.toNodeSeq } val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" + @@ -615,7 +363,11 @@ private[ui] class JobPagedTable( - {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} + {job.jobId} { job.jobGroup.map { id => + + {id} + + }.getOrElse({job.jobGroup.map(id => s"($id)").getOrElse("")})} {jobTableRow.jobDescription} {killLink} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala new file mode 100644 index 0000000000000..b14d8aaed4ad3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable +import scala.xml.{Node, NodeSeq} + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.jobs.UIData.JobUIData + +/** Page showing list of jobs under a job group id */ +private[ui] class JobGroupPage(parent: JobsTab) extends WebUIPage("jobgroup") { + + def render(request: HttpServletRequest): Seq[Node] = { + val listener = parent.jobProgresslistener + listener.synchronized { + val parameterId = request.getParameter("id") + require(parameterId != null && parameterId.nonEmpty, "Missing parameter id") + + val jobGroupId = parameterId + val groupToJobsTable = listener.jobGroupToJobIds.get(jobGroupId) + if (groupToJobsTable.isEmpty) { + val content = +
+

No information to display for jobGroup + {jobGroupId} +

+
+ return UIUtils.headerSparkPage( + s"Details for JobGroup $jobGroupId", content, parent) + } + + val jobsInGroup = listener.jobIdToData + + val activeJobsInGroup = mutable.Buffer[JobUIData]() + val completedJobsInGroup = mutable.Buffer[JobUIData]() + val failedJobsInGroup = mutable.Buffer[JobUIData]() + var totalDuration = 0L + groupToJobsTable.get.foreach { jobId => + val job = jobsInGroup.get(jobId) + val duration: Option[Long] = { + job.get.submissionTime.map { start => + val end = job.get.completionTime.getOrElse(System.currentTimeMillis()) + end - start + } + } + totalDuration += duration.getOrElse(0L) + job.get.status match { + case JobExecutionStatus.RUNNING => activeJobsInGroup ++= job + case JobExecutionStatus.SUCCEEDED => completedJobsInGroup ++= job + case JobExecutionStatus.FAILED => failedJobsInGroup ++= job + case JobExecutionStatus.UNKNOWN => // not handling unknown status + } + } + + val activeJobsTable = + new JobsTable(request, + activeJobsInGroup.sortBy(_.submissionTime.getOrElse((-1L))).reverse, + "active", "activeJob", parent.basePath, listener, killEnabled = parent.killEnabled) + val completedJobsTable = + new JobsTable(request, + completedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse, + "completed", "completeJob", parent.basePath, listener, killEnabled = false) + val failedJobsTable = + new JobsTable(request, + failedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse, + "failed", "failedJob", parent.basePath, listener, killEnabled = false) + + val shouldShowActiveJobs = activeJobsInGroup.nonEmpty + val shouldShowCompletedJobs = completedJobsInGroup.nonEmpty + val shouldShowFailedJobs = failedJobsInGroup.nonEmpty + + val summary: NodeSeq = +
+ +
+ + var content = summary + val executorListener = parent.executorListener + content ++= JobsUtils.makeTimeline( + activeJobsInGroup ++ completedJobsInGroup ++ failedJobsInGroup, + executorListener.executorEvents, listener.startTime, listener) + + if (shouldShowActiveJobs) { + content ++=

Active Jobs ({activeJobsInGroup.size})

++ + activeJobsTable.toNodeSeq + } + if (shouldShowCompletedJobs) { + content ++=

Completed Jobs ({completedJobsInGroup.size})

++ + completedJobsTable.toNodeSeq + } + if (shouldShowFailedJobs) { + content ++=

Failed Jobs ({failedJobsInGroup.size})

++ + failedJobsTable.toNodeSeq + } + + val helpText = + s"""A job is triggered by an action, like count() or saveAsTextFile(). + | Click on a job to see information about the stages of tasks inside it.""".stripMargin + + UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText)) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index cc173381879a6..141067e93db30 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -37,6 +37,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + attachPage(new JobGroupPage(this)) def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala new file mode 100644 index 0000000000000..2da4baf5efc41 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import javax.servlet.http.HttpServletRequest + +import scala.collection.JavaConverters._ + +import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.jobs.UIData.JobUIData +import org.apache.spark.util.Utils + +private[ui] class JobsTable( + request: HttpServletRequest, + jobs: Seq[JobUIData], + tableHeaderId: String, + jobTag: String, + basePath: String, + progressListener: JobProgressListener, + killEnabled: Boolean) { + // stripXSS is called to remove suspicious characters used in XSS attacks + val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS)) + val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) + .map(para => para._1 + "=" + para._2(0)) + + val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) + val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id" + + // stripXSS is called first to remove suspicious characters used in XSS attacks + val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page")) + val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort")) + val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc")) + val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize")) + val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize")) + + val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1) + val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse(jobIdTitle) + val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( + // New jobs should be shown above old jobs by default. + if (jobSortColumn == jobIdTitle) true else false + ) + val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) + val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) + + val page: Int = { + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. + if (jobPageSize <= jobPrevPageSize) { + jobPage + } else { + 1 + } + } + val currentTime = System.currentTimeMillis() + + val toNodeSeq = try { + new JobPagedTable( + jobs, + tableHeaderId, + jobTag, + UIUtils.prependBaseUri(basePath), + "jobs", // subPath + parameterOtherTable, + progressListener.stageIdToInfo, + progressListener.stageIdToData, + killEnabled, + currentTime, + jobIdTitle, + pageSize = jobPageSize, + sortColumn = jobSortColumn, + desc = jobSortDesc + ).table(page) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering job table:

+
+          {Utils.exceptionString(e)}
+        
+
+ } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala new file mode 100644 index 0000000000000..db742fce07f1d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ui.jobs + +import java.util.Date + +import scala.collection.mutable.ListBuffer +import scala.xml.{Node, Unparsed, Utility} + +import org.apache.commons.lang3.StringEscapeUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.JobExecutionStatus +import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorRemoved} +import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.ui.jobs.UIData.JobUIData + +/** Utility functions for generating Jobs related XML pages with spark content. */ +private object JobsUtils extends Logging { + val JOBS_LEGEND = +
+ + Succeeded + + Failed + + Running +
.toString.filter(_ != '\n') + + val EXECUTORS_LEGEND = +
+ + Added + + Removed +
.toString.filter(_ != '\n') + + def getLastStageNameAndDescription( + job: JobUIData, progressListener: JobProgressListener): (String, String) = { + val lastStageInfo = Option(job.stageIds) + .filter(_.nonEmpty) + .flatMap { ids => progressListener.stageIdToInfo.get(ids.max)} + val lastStageData = lastStageInfo.flatMap { s => + progressListener.stageIdToData.get((s.stageId, s.attemptId)) + } + val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + val description = lastStageData.flatMap(_.description).getOrElse("") + (name, description) + } + + def makeJobEvent( + jobUIDatas: Seq[JobUIData], progressListener: JobProgressListener): Seq[String] = { + jobUIDatas.filter { jobUIData => + jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined + }.map { jobUIData => + val jobId = jobUIData.jobId + val status = jobUIData.status + val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData, progressListener) + val displayJobDescription = + if (jobDescription.isEmpty) { + jobName + } else { + UIUtils.makeDescription(jobDescription, "", plainText = true).text + } + val submissionTime = jobUIData.submissionTime.get + val completionTimeOpt = jobUIData.completionTime + val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis()) + val classNameByStatus = status match { + case JobExecutionStatus.SUCCEEDED => "succeeded" + case JobExecutionStatus.FAILED => "failed" + case JobExecutionStatus.RUNNING => "running" + case JobExecutionStatus.UNKNOWN => "unknown" + } + + // The timeline library treats contents as HTML, so we have to escape them. We need to add + // extra layers of escaping in order to embed this in a Javascript string literal. + val escapedDesc = Utility.escape(displayJobDescription) + val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc) + val jobEventJsonAsStr = + s""" + |{ + | 'className': 'job application-timeline-object ${classNameByStatus}', + | 'group': 'jobs', + | 'start': new Date(${submissionTime}), + | 'end': new Date(${completionTime}), + | 'content': '
Completed: ${UIUtils.formatDate(new Date(completionTime))}""" + } else { + "" + } + }">' + + | '${jsEscapedDesc} (Job ${jobId})
' + |} + """.stripMargin + jobEventJsonAsStr + } + } + + def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): + Seq[String] = { + val events = ListBuffer[String]() + executorUIDatas.foreach { + case a: SparkListenerExecutorAdded => + val addedEvent = + s""" + |{ + | 'className': 'executor added', + | 'group': 'executors', + | 'start': new Date(${a.time}), + | 'content': '
Executor ${a.executorId} added
' + |} + """.stripMargin + events += addedEvent + case e: SparkListenerExecutorRemoved => + val removedEvent = + s""" + |{ + | 'className': 'executor removed', + | 'group': 'executors', + | 'start': new Date(${e.time}), + | 'content': '
Reason: ${e.reason.replace("\n", " ")}""" + } else { + "" + } + }"' + + | 'data-html="true">Executor ${e.executorId} removed
' + |} + """.stripMargin + events += removedEvent + + } + events.toSeq + } + + def makeTimeline( + jobs: Seq[JobUIData], + executors: Seq[SparkListenerEvent], + startTime: Long, + progressListener: JobProgressListener): Seq[Node] = { + + val jobEventJsonAsStrSeq = makeJobEvent(jobs, progressListener) + val executorEventJsonAsStrSeq = makeExecutorEvent(executors) + + val groupJsonArrayAsStr = + s""" + |[ + | { + | 'id': 'executors', + | 'content': '
Executors
${EXECUTORS_LEGEND}', + | }, + | { + | 'id': 'jobs', + | 'content': '
Jobs
${JOBS_LEGEND}', + | } + |] + """.stripMargin + + val eventArrayAsStr = + (jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]") + + + + + Event Timeline + + ++ + ++ + + } +} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index bdd148875e38a..d28c157c2d723 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -706,6 +706,46 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + test("jobs with job group id should show correct link to job group page on all jobs page") { + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + // Once at least one job has been run in a job group, then we should display the link to + // job group page + val jobGroupId = "my-job-group" + sc.setJobGroup(jobGroupId, "my-job-group-description") + // Create an RDD that involves multiple stages: + val rdd = + sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity) + // Run it twice; this will cause the second job to have two "phantom" stages that were + // mentioned in its job start event but which were never actually executed: + rdd.count() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + goToUi(ui, "/jobs") + findAll(cssSelector("tbody tr")).foreach { row => + val links = row.underlying.findElements(By.xpath(".//a")) + links.size should be (2) + links.get(0).getText().toLowerCase should include (jobGroupId) + links.get(0).getAttribute("href") should include regex ( + s"(?=.*jobgroup)(?=.*${jobGroupId})") + links.get(1).getText().toLowerCase should include ("count") + } + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}") + println("here" + webDriver.getPageSource) + find(id("pending")) should be (None) + find(id("active")) should be (None) + find(id("failed")) should be (None) + find(id("completed")).get.text should be ("Completed Jobs (1)") + findAll(cssSelector("tbody tr a")).foreach { link => + println(link.text.toLowerCase) + link.text.toLowerCase should include ("count") + } + } + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } From a4a6dfb35cce37e22f060c681714e0828e491ef0 Mon Sep 17 00:00:00 2001 From: Yewei Zhang Date: Wed, 7 Feb 2018 11:55:19 -0800 Subject: [PATCH 2/3] The job group link will still show up as we shared the same code with the AllJobs. --- .../test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index d28c157c2d723..b3d656c41011a 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -738,9 +738,11 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B find(id("active")) should be (None) find(id("failed")) should be (None) find(id("completed")).get.text should be ("Completed Jobs (1)") - findAll(cssSelector("tbody tr a")).foreach { link => - println(link.text.toLowerCase) - link.text.toLowerCase should include ("count") + findAll(cssSelector("tbody tr")).foreach { row => + val links = row.underlying.findElements(By.xpath(".//a")) + links.size should be (2) + links.get(0).getText().toLowerCase should include (jobGroupId) + links.get(1).getText().toLowerCase should include ("count") } } } From cb47eddc0966a5cbbc25b6cae42a900ad455e804 Mon Sep 17 00:00:00 2001 From: Yewei Zhang Date: Wed, 7 Feb 2018 12:08:35 -0800 Subject: [PATCH 3/3] remove my debug statement to see the page resource of jobgroup page --- core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index b3d656c41011a..18ee6a8ef4824 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -733,7 +733,6 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B eventually(timeout(10 seconds), interval(50 milliseconds)) { goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}") - println("here" + webDriver.getPageSource) find(id("pending")) should be (None) find(id("active")) should be (None) find(id("failed")) should be (None)