
Commit 40eb8b6

JoshRosen authored and rxin committed
[SPARK-2321] Several progress API improvements / refactorings
This PR refactors / extends the status API introduced in #2696.

- Change StatusAPI from a mixin trait to a class. Before, the new status API methods were directly accessible through SparkContext, whereas now they're accessed through a `sc.statusAPI` field. As long as we were going to add these methods directly to SparkContext, the mixin trait seemed like a good idea, but this might be simpler to reason about and may avoid pitfalls that I've run into while attempting to refactor other parts of SparkContext to use mixins (see #3071, for example).
- Change the name from SparkStatusAPI to SparkStatusTracker.
- Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated with any job group.
- Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids of whatever's currently active in this SparkContext. This should simplify davies's progress bar code.

Author: Josh Rosen <[email protected]>

Closes #3197 from JoshRosen/progress-api-improvements and squashes the following commits:

30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker.
d1b08d8 [Josh Rosen] Add missing newlines
2cc7353 [Josh Rosen] Add missing file.
d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods.
a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default group
c47e294 [Josh Rosen] Remove StatusAPI mixin trait.
1 parent cbddac2 commit 40eb8b6
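
For context, a minimal sketch of how the relocated API is meant to be used after this change. Only `sc.statusTracker`, `getActiveJobIds()`, `getActiveStageIds()`, and `getJobIdsForGroup(null)` come from this commit; the app name, master, and background job are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerDemo {
  def main(args: Array[String]): Unit = {
    // App name and master are illustrative only.
    val sc = new SparkContext(new SparkConf().setAppName("status-tracker-demo").setMaster("local[2]"))

    // Run a job on a background thread so the driver thread can observe it.
    val worker = new Thread(new Runnable {
      override def run(): Unit = sc.parallelize(1 to 1000000, 8).map(_ * 2).count()
    })
    worker.start()
    Thread.sleep(500)  // give the job a moment to start

    // Progress APIs now hang off sc.statusTracker instead of SparkContext itself.
    val tracker = sc.statusTracker
    println("Active job ids:   " + tracker.getActiveJobIds().mkString(", "))
    println("Active stage ids: " + tracker.getActiveStageIds().mkString(", "))
    // Passing null returns ids of jobs not associated with any job group.
    println("Ungrouped job ids: " + tracker.getJobIdsForGroup(null).mkString(", "))

    worker.join()
    sc.stop()
  }
}
```

Keeping these methods behind a dedicated `statusTracker` field, rather than mixing them into SparkContext, keeps SparkContext's public surface smaller and avoids the mixin pitfalls mentioned in the description.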

7 files changed: +269 −172 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 67 additions & 1 deletion
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  * this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
+  val statusTracker = new SparkStatusTracker(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1001,6 +1004,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
 
+  /**
+   * Return a map from the slave to the max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+      (blockManagerId.host + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  @DeveloperApi
+  def getRDDStorageInfo: Array[RDDInfo] = {
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
+  }
+
+  /**
+   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about blocks stored in all of the slaves
+   */
+  @DeveloperApi
+  def getExecutorStorageStatus: Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   */
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return the pool associated with the given name, if one exists
+   */
+  @DeveloperApi
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+  }
+
+  /**
+   * Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
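
The storage and scheduler accessors above previously lived in the SparkStatusAPI trait and are now defined directly on SparkContext. A brief sketch of calling them, assuming an existing SparkContext `sc`; the cached RDD is illustrative only.

```scala
// Hedged sketch; `sc` is an existing SparkContext and the cached RDD is illustrative.
val data = sc.parallelize(1 to 1000).cache()
data.count()  // materialize the cache so there is something to report

// Max / remaining memory available for caching, keyed by "host:port".
sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remainingMem)) =>
  println(s"$executor: max=$maxMem bytes, remaining=$remainingMem bytes")
}

// Storage details for RDDs that are actually cached (DeveloperApi).
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': ${info.numCachedPartitions} cached partitions, ${info.memSize} bytes in memory")
}

// Current scheduling mode (FIFO or FAIR).
println(sc.getSchedulingMode)
```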

core/src/main/scala/org/apache/spark/SparkStatusAPI.scala

Lines changed: 0 additions & 142 deletions
This file was deleted.
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

/**
 * Low-level status reporting APIs for monitoring job and stage progress.
 *
 * These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
 * be prepared to handle empty / missing information. For example, a job's stage ids may be known
 * but the status API may not have any information about the details of those stages, so
 * `getStageInfo` could potentially return `None` for a valid stage id.
 *
 * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
 * will provide information for the last `spark.ui.retainedStages` stages and
 * `spark.ui.retainedJobs` jobs.
 *
 * NOTE: this class's constructor should be considered private and may be subject to change.
 */
class SparkStatusTracker private[spark] (sc: SparkContext) {

  private val jobProgressListener = sc.jobProgressListener

  /**
   * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
   * returns all known jobs that are not associated with a job group.
   *
   * The returned list may contain running, failed, and completed jobs, and may vary across
   * invocations of this method. This method does not guarantee the order of the elements in
   * its result.
   */
  def getJobIdsForGroup(jobGroup: String): Array[Int] = {
    jobProgressListener.synchronized {
      val jobData = jobProgressListener.jobIdToData.valuesIterator
      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
    }
  }

  /**
   * Returns an array containing the ids of all active stages.
   *
   * This method does not guarantee the order of the elements in its result.
   */
  def getActiveStageIds(): Array[Int] = {
    jobProgressListener.synchronized {
      jobProgressListener.activeStages.values.map(_.stageId).toArray
    }
  }

  /**
   * Returns an array containing the ids of all active jobs.
   *
   * This method does not guarantee the order of the elements in its result.
   */
  def getActiveJobIds(): Array[Int] = {
    jobProgressListener.synchronized {
      jobProgressListener.activeJobs.values.map(_.jobId).toArray
    }
  }

  /**
   * Returns job information, or `None` if the job info could not be found or was garbage collected.
   */
  def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
    jobProgressListener.synchronized {
      jobProgressListener.jobIdToData.get(jobId).map { data =>
        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
      }
    }
  }

  /**
   * Returns stage information, or `None` if the stage info could not be found or was
   * garbage collected.
   */
  def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
    jobProgressListener.synchronized {
      for (
        info <- jobProgressListener.stageIdToInfo.get(stageId);
        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
      ) yield {
        new SparkStageInfoImpl(
          stageId,
          info.attemptId,
          info.name,
          info.numTasks,
          data.numActiveTasks,
          data.numCompleteTasks,
          data.numFailedTasks)
      }
    }
  }
}
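
A minimal progress-polling sketch against the new tracker, assuming an existing SparkContext `sc`. The `SparkStageInfo` accessors used below (`name()`, `numTasks()`, `numCompletedTasks()`, `numFailedTasks()`) are assumed to mirror the fields passed to `SparkStageInfoImpl` above; the background job is illustrative only.

```scala
// Assumes an existing SparkContext `sc`.
val worker = new Thread(new Runnable {
  override def run(): Unit = sc.parallelize(1 to 1000000, 16).map(_ + 1).count()
})
worker.start()

val tracker = sc.statusTracker
while (worker.isAlive) {
  for (stageId <- tracker.getActiveStageIds();
       stage   <- tracker.getStageInfo(stageId)) {
    // Accessor names on SparkStageInfo are assumed to mirror the SparkStageInfoImpl fields above.
    println(s"Stage $stageId '${stage.name()}': " +
      s"${stage.numCompletedTasks()}/${stage.numTasks()} tasks complete, " +
      s"${stage.numFailedTasks()} failed")
  }
  Thread.sleep(200)
}
worker.join()
```

Because the tracker only offers weak consistency, the loop tolerates stages disappearing between `getActiveStageIds()` and `getStageInfo()` by going through the `Option` result.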

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 2 additions & 19 deletions
@@ -105,6 +105,8 @@ class JavaSparkContext(val sc: SparkContext)
 
   private[spark] val env = sc.env
 
+  def statusTracker = new JavaSparkStatusTracker(sc)
+
   def isLocal: java.lang.Boolean = sc.isLocal
 
   def sparkUser: String = sc.sparkUser
@@ -134,25 +136,6 @@ class JavaSparkContext(val sc: SparkContext)
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
 
-
-  /**
-   * Return a list of all known jobs in a particular job group. The returned list may contain
-   * running, failed, and completed jobs, and may vary across invocations of this method. This
-   * method does not guarantee the order of the elements in its result.
-   */
-  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
-
-  /**
-   * Returns job information, or `null` if the job info could not be found or was garbage collected.
-   */
-  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
-
-  /**
-   * Returns stage information, or `null` if the stage info could not be found or was
-   * garbage collected.
-   */
-  def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
-
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag