
Commit c47e294

Remove StatusAPI mixin trait.
This makes binary compatibility easier to reason about and might avoid some pitfalls that I’ve run into while attempting to refactor other parts of SparkContext to use mixin traits (see #3071, for example). Requiring users to access status API methods through `sc.statusAPI.*` also avoids SparkContext bloat and buys us extra freedom for adding parallel higher- / lower-level APIs.
1 parent c6f4e70 commit c47e294
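
As an aside, not part of the commit itself: a minimal Scala sketch of the access pattern this change establishes. The `statusAPI` methods match those in the diffs below; the local-mode setup, app name, and job group are invented for the example.

    import org.apache.spark.{SparkConf, SparkContext}

    object StatusAPIAccessSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical local setup, only to give the sketch something to run against.
        val sc = new SparkContext(new SparkConf().setAppName("status-sketch").setMaster("local[2]"))
        sc.setJobGroup("my-job-group", "demo jobs")
        sc.parallelize(1 to 1000).count()

        // Before this commit these lookups sat directly on SparkContext (sc.getJobIdsForGroup, ...);
        // after it they are reached through the statusAPI handle instead.
        for (jobId <- sc.statusAPI.getJobIdsForGroup("my-job-group");
             info  <- sc.statusAPI.getJobInfo(jobId)) {
          println(s"Job $jobId finished with status ${info.status()}")
        }
        sc.stop()
      }
    }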

5 files changed: +95 / -102 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 67 additions & 1 deletion
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  *   this config overrides the default configs as well as system properties.
  */

-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {

   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)

+  val statusAPI = SparkStatusAPI(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1001,6 +1004,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION

+  /**
+   * Return a map from the slave to the max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+      (blockManagerId.host + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  @DeveloperApi
+  def getRDDStorageInfo: Array[RDDInfo] = {
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
+  }
+
+  /**
+   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about blocks stored in all of the slaves
+   */
+  @DeveloperApi
+  def getExecutorStorageStatus: Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   */
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return the pool associated with the given name, if one exists
+   */
+  @DeveloperApi
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+  }
+
+  /**
+   * Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
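
Of the methods shown above, only the job/stage progress lookups move behind `statusAPI`; the storage- and scheduler-related getters stay directly on SparkContext. A rough usage sketch under that assumption (the persisted RDD and local-mode setup are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object StorageStatusSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("storage-sketch").setMaster("local[2]"))

        // Persist an RDD and force computation so it shows up in the storage report.
        val cached = sc.parallelize(1 to 100000).persist(StorageLevel.MEMORY_ONLY)
        cached.count()

        // These getters remain on SparkContext itself after this commit.
        sc.getExecutorMemoryStatus.foreach { case (host, (maxMem, remainingMem)) =>
          println(s"$host: max=$maxMem bytes, remaining=$remainingMem bytes")
        }
        sc.getRDDStorageInfo.foreach { info =>
          println(s"RDD ${info.id}: ${info.memSize} bytes in memory")
        }

        sc.stop()
      }
    }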

core/src/main/scala/org/apache/spark/SparkStatusAPI.scala

Lines changed: 18 additions & 74 deletions
@@ -17,83 +17,21 @@

 package org.apache.spark

-import scala.collection.Map
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
-import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
-
 /**
- * Trait that implements Spark's status APIs. This trait is designed to be mixed into
- * SparkContext; it allows the status API code to live in its own file.
+ * Low-level status reporting APIs for monitoring job and stage progress.
+ *
+ * These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
+ * be prepared to handle empty / missing information. For example, a job's stage ids may be known
+ * but the status API may not have any information about the details of those stages, so
+ * `getStageInfo` could potentially return `None` for a valid stage id.
+ *
+ * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
+ * will provide information for the last `spark.ui.retainedStages` stages and
+ * `spark.ui.retainedJobs` jobs.
  */
-private[spark] trait SparkStatusAPI { this: SparkContext =>
-
-  /**
-   * Return a map from the slave to the max memory available for caching and the remaining
-   * memory available for caching.
-   */
-  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
-    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
-      (blockManagerId.host + ":" + blockManagerId.port, mem)
-    }
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
-   * they take, etc.
-   */
-  @DeveloperApi
-  def getRDDStorageInfo: Array[RDDInfo] = {
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
-    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
-    rddInfos.filter(_.isCached)
-  }
-
-  /**
-   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
-   * Note that this does not necessarily mean the caching or computation was successful.
-   */
-  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about blocks stored in all of the slaves
-   */
-  @DeveloperApi
-  def getExecutorStorageStatus: Array[StorageStatus] = {
-    env.blockManager.master.getStorageStatus
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return pools for fair scheduler
-   */
-  @DeveloperApi
-  def getAllPools: Seq[Schedulable] = {
-    // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.toSeq
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return the pool associated with the given name, if one exists
-   */
-  @DeveloperApi
-  def getPoolForName(pool: String): Option[Schedulable] = {
-    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
-  }
-
-  /**
-   * Return current scheduling mode
-   */
-  def getSchedulingMode: SchedulingMode.SchedulingMode = {
-    taskScheduler.schedulingMode
-  }
+class SparkStatusAPI private (sc: SparkContext) {

+  private val jobProgressListener = sc.jobProgressListener

   /**
    * Return a list of all known jobs in a particular job group. The returned list may contain
@@ -140,3 +78,9 @@ private[spark] trait SparkStatusAPI { this: SparkContext =>
     }
   }
 }
+
+private[spark] object SparkStatusAPI {
+  def apply(sc: SparkContext): SparkStatusAPI = {
+    new SparkStatusAPI(sc)
+  }
+}
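
Given the weak consistency semantics spelled out in the new scaladoc, callers should treat every lookup as optional. A defensive sketch, assuming an existing SparkContext `sc` and the Option-returning `getJobInfo` / `getStageInfo` exercised in the test changes below (the helper name is made up):

    // Hypothetical helper: report progress of a job's first stage, tolerating missing info.
    def firstStageProgress(sc: org.apache.spark.SparkContext, jobId: Int): Option[String] = {
      for {
        job     <- sc.statusAPI.getJobInfo(jobId)       // None if the job was never tracked or has been evicted
        stageId <- job.stageIds().headOption            // stage ids may be known before stage details are
        stage   <- sc.statusAPI.getStageInfo(stageId)   // may be None even for a valid stage id
      } yield s"stage $stageId: ${stage.numCompletedTasks()}/${stage.numTasks()} tasks complete"
    }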

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 2 additions & 19 deletions
@@ -105,6 +105,8 @@ class JavaSparkContext(val sc: SparkContext)

   private[spark] val env = sc.env

+  def statusAPI = JavaSparkStatusAPI(sc)
+
   def isLocal: java.lang.Boolean = sc.isLocal

   def sparkUser: String = sc.sparkUser
@@ -134,25 +136,6 @@ class JavaSparkContext(val sc: SparkContext)
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions

-
-  /**
-   * Return a list of all known jobs in a particular job group. The returned list may contain
-   * running, failed, and completed jobs, and may vary across invocations of this method. This
-   * method does not guarantee the order of the elements in its result.
-   */
-  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
-
-  /**
-   * Returns job information, or `null` if the job info could not be found or was garbage collected.
-   */
-  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
-
-  /**
-   * Returns stage information, or `null` if the stage info could not be found or was
-   * garbage collected.
-   */
-  def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
-
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
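
On the Java side the same handle is reached through JavaSparkContext's `statusAPI`, as the demo change at the bottom of this commit shows. A small Scala sketch of the null handling this implies, assuming the wrapper's `getJobInfo` keeps the null-on-miss convention of the methods removed above:

    import org.apache.spark.api.java.JavaSparkContext

    // Hypothetical helper: wrap the Java-style null return back into an Option when calling from Scala.
    def jobStatusOrUnknown(jsc: JavaSparkContext, jobId: Int): String =
      Option(jsc.statusAPI.getJobInfo(jobId))   // null when the job info was not found or was garbage collected
        .map(_.status().toString)
        .getOrElse("unknown")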

core/src/test/scala/org/apache/spark/StatusAPISuite.scala

Lines changed: 6 additions & 6 deletions
@@ -37,20 +37,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
       jobIds.head
     }
     val jobInfo = eventually(timeout(10 seconds)) {
-      sc.getJobInfo(jobId).get
+      sc.statusAPI.getJobInfo(jobId).get
     }
     jobInfo.status() should not be FAILED
     val stageIds = jobInfo.stageIds()
     stageIds.size should be(2)

     val firstStageInfo = eventually(timeout(10 seconds)) {
-      sc.getStageInfo(stageIds(0)).get
+      sc.statusAPI.getStageInfo(stageIds(0)).get
     }
     firstStageInfo.stageId() should be(stageIds(0))
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
     eventually(timeout(10 seconds)) {
-      val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
+      val updatedFirstStageInfo = sc.statusAPI.getStageInfo(stageIds(0)).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
       updatedFirstStageInfo.numFailedTasks() should be(0)
@@ -59,20 +59,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {

   test("getJobIdsForGroup()") {
     sc.setJobGroup("my-job-group", "description")
-    sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
+    sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
     val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
+      sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
     }
     val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
     val secondJobId = eventually(timeout(10 seconds)) {
       secondJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+      sc.statusAPI.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
     }
   }
 }

examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java

Lines changed: 2 additions & 2 deletions
@@ -58,8 +58,8 @@ public static void main(String[] args) throws Exception {
         continue;
       }
       int currentJobId = jobIds.get(jobIds.size() - 1);
-      SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
-      SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+      SparkJobInfo jobInfo = sc.statusAPI().getJobInfo(currentJobId);
+      SparkStageInfo stageInfo = sc.statusAPI().getStageInfo(jobInfo.stageIds()[0]);
       System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
           " active, " + stageInfo.numCompletedTasks() + " complete");
     }
