
Commit 97d411f

json endpoint for one job
1 parent 0c96147 commit 97d411f

5 files changed: +85 -8 lines changed

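
The change adds a JSON view of a single job next to the existing job-list endpoint. As a minimal sketch of exercising it (not part of the commit: the API root URL is an assumption and must include whatever prefix JsonRootResource is mounted under, which this diff does not show; the relative path and the sample appId/jobId come from the HistoryServerSuite entry below):

import scala.io.Source

// Sketch only: fetch the new single-job endpoint added by this commit.
// The first argument must be the JSON API root (host, port, and the mount
// prefix of JsonRootResource are not shown in this diff, so none are assumed).
object OneJobEndpointDemo {
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: OneJobEndpointDemo <json-api-root-url>")
    // Relative path copied from the HistoryServerSuite entry added in this commit.
    val url = s"${args(0)}/applications/local-1422981780767/jobs/0"
    println(Source.fromURL(url).mkString)
  }
}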

core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala

Lines changed: 18 additions & 8 deletions
@@ -16,11 +16,13 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.util
 import java.util.Date
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
@@ -33,14 +35,9 @@ class AllJobsResource(uiRoot: UIRoot) {
       @QueryParam("status") statuses: java.util.List[JobExecutionStatus]
   ): Seq[JobData] = {
     uiRoot.withSparkUI(appId) { ui =>
-      val statusToJobs = ui.jobProgressListener.synchronized {
-        Seq(
-          JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
-          JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
-          JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
-        )
-      }
-      val adjStatuses: java.util.List[JobExecutionStatus] = {
+      val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+        AllJobsResource.getStatusToJobs(ui)
+      val adjStatuses: util.List[JobExecutionStatus] = {
         if (statuses.isEmpty) {
           java.util.Arrays.asList(JobExecutionStatus.values(): _*)
         }
@@ -61,6 +58,19 @@ class AllJobsResource(uiRoot: UIRoot) {
 }
 
 object AllJobsResource {
+
+  def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = {
+    val statusToJobs = ui.jobProgressListener.synchronized {
+      Seq(
+        JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
+        JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
+        JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
+      )
+    }
+    statusToJobs
+  }
+
+
   def convertJobData(
       job: JobUIData,
       listener: JobProgressListener,
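
The refactor above moves the status-to-jobs snapshot into the AllJobsResource companion object so that the new OneJobResource can reuse it. A small sketch of consuming the returned pairs (illustrative only; it assumes a SparkUI instance is already available and that the classes are visible as declared in this commit):

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.api.v1.AllJobsResource
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.JobUIData

object StatusToJobsExample {
  // Jobs currently in the requested status, built from the same
  // (status, jobs) pairs that jobsList and OneJobResource consume.
  def jobsWithStatus(ui: SparkUI, wanted: JobExecutionStatus): Seq[JobUIData] = {
    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
      AllJobsResource.getStatusToJobs(ui)
    statusToJobs.collect { case (`wanted`, jobs) => jobs }.flatten
  }
}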

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala

Lines changed: 6 additions & 0 deletions
@@ -46,6 +46,12 @@ class JsonRootResource extends UIRootFromServletContext {
     new AllJobsResource(uiRoot)
   }
 
+  @Path("applications/{appId}/jobs/{jobId: \\d+}")
+  def getJob(): OneJobResource = {
+    new OneJobResource(uiRoot)
+  }
+
+
   @Path("applications/{appId}/executors")
   def getExecutors(): ExecutorListResource = {
     new ExecutorListResource(uiRoot)
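
The {jobId: \\d+} template constrains the new route to all-digit job ids, so only numeric path segments are dispatched to OneJobResource; non-matching segments are typically rejected by the JAX-RS layer before reaching the resource. A standalone check against the same regex, purely for illustration and not part of the commit:

object JobIdTemplateCheck {
  // The same pattern used in the @Path template above.
  private val jobIdPattern = "\\d+".r.pattern

  def matchesJobRoute(segment: String): Boolean =
    jobIdPattern.matcher(segment).matches()

  def main(args: Array[String]): Unit = {
    println(matchesJobRoute("0"))    // true  -> routed to OneJobResource
    println(matchesJobRoute("abc"))  // false -> not matched by this route
  }
}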

core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{PathParam, GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+class OneJobResource(uiRoot: UIRoot) {
+
+  @GET
+  def jobsList(
+      @PathParam("appId") appId: String,
+      @PathParam("jobId") jobId: Int
+  ): JobData = {
+    uiRoot.withSparkUI(appId) { ui =>
+      val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+        AllJobsResource.getStatusToJobs(ui)
+      val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
+      jobOpt.map { job =>
+        AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+      }.getOrElse {
+        throw new NotFoundException("unknown job: " + jobId)
+      }
+    }
+  }
+
+}

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+{
+  "jobId" : 0,
+  "name" : "count at <console>:15",
+  "stageIds" : [ 0 ],
+  "status" : "SUCCEEDED",
+  "numTasks" : 8,
+  "numActiveTasks" : 0,
+  "numCompletedTasks" : 8,
+  "numSkippedTasks" : 8,
+  "numFailedTasks" : 0,
+  "numActiveStages" : 0,
+  "numCompletedStages" : 1,
+  "numSkippedStages" : 0,
+  "numFailedStages" : 0
+}

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -71,6 +71,7 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
     "maxDate2 app list json" -> "applications?maxDate=2015-02-03T10:42:40.000CST",
     "one app json" -> "applications/local-1422981780767",
     "job list json" -> "applications/local-1422981780767/jobs",
+    "one job json" -> "applications/local-1422981780767/jobs/0",
     "succeeded job list json" -> "applications/local-1422981780767/jobs?status=succeeded",
     "succeeded&failed job list json" -> "applications/local-1422981780767/jobs?status=succeeded&status=failed",
     "executor list json" -> "applications/local-1422981780767/executors",
