# [SPARK-2321] Stable pull-based progress / status API #2696
**New file: `JobExecutionStatus.java`** (+25 lines)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

public enum JobExecutionStatus {
  RUNNING,
  SUCCEEDED,
  FAILED,
  UNKNOWN
}
```
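Callers will mostly care whether a status is terminal. A small hypothetical helper (not part of the PR; the name `isTerminal` is illustrative) shows how the four states partition:

```scala
import org.apache.spark.JobExecutionStatus
import org.apache.spark.JobExecutionStatus._

// Hypothetical helper, not part of the PR: SUCCEEDED and FAILED are terminal,
// RUNNING is in flight, and UNKNOWN is treated conservatively as non-terminal.
def isTerminal(status: JobExecutionStatus): Boolean = status match {
  case SUCCEEDED | FAILED => true
  case RUNNING | UNKNOWN  => false
}
```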
**New file: `SparkJobInfo.java`** (+30 lines)

```java
/* ... Apache License header, identical to the one above ... */

package org.apache.spark;

/**
 * Exposes information about Spark Jobs.
 *
 * This interface is not designed to be implemented outside of Spark. We may add additional
 * methods which may break binary compatibility with outside implementations.
 */
public interface SparkJobInfo {
  int jobId();
  int[] stageIds();
  JobExecutionStatus status();
}
```

Review thread (on the interface javadoc):

> **Reviewer:** It seems like your goals here are: […] Instead of interface / private implementation, how about a final class?
>
> **Author:** I think that exposing interfaces helps to maximize the flexibility of our internal implementations of Java-friendly classes. Imagine that I want to have a […] The need for this kind of flexibility isn't much of a concern for this specific case, since `SparkJobInfo` and `SparkStageInfo` are simple immutable objects, but it is a problem for classes like `TaskContext`. Since other Java-friendly APIs will likely be exposed as interfaces, I was thinking of doing the same here for consistency's sake. Does this sound reasonable, or are there other approaches / considerations that I'm missing?
>
> **Reviewer:** I understand your point, but as you say, that doesn't really apply here. And a final class would avoid the scary comment. But not a big deal.
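The flexibility the author describes can be made concrete with a short hypothetical sketch (not part of the PR; `LiveJobInfo` and `lookupStatus` are illustrative names). Because callers depend only on the `SparkJobInfo` interface, Spark could later serve the same contract from an implementation that re-queries live state instead of holding an immutable snapshot:

```scala
package org.apache.spark

// Hypothetical sketch, not in the PR: an alternative SparkJobInfo backed by a
// status lookup that is re-evaluated on every call. Callers compiled against
// the interface keep working regardless of which implementation they receive.
private class LiveJobInfo(
    val jobId: Int,                          // implements int jobId()
    val stageIds: Array[Int],                // implements int[] stageIds()
    lookupStatus: Int => JobExecutionStatus)
  extends SparkJobInfo {

  // Recomputed on each access rather than captured once at construction time.
  override def status(): JobExecutionStatus = lookupStatus(jobId)
}
```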
**New file: `SparkStageInfo.java`** (+34 lines)

```java
/* ... Apache License header, identical to the one above ... */

package org.apache.spark;

/**
 * Exposes information about Spark Stages.
 *
 * This interface is not designed to be implemented outside of Spark. We may add additional
 * methods which may break binary compatibility with outside implementations.
 */
public interface SparkStageInfo {
  int stageId();
  int currentAttemptId();
  String name();
  int numTasks();
  int numActiveTasks();
  int numCompletedTasks();
  int numFailedTasks();
}
```

Review thread (on the interface body):

> **Reviewer:** Do we need to encode the attempt here?
>
> **Author:** I've updated this to include the current attempt id.
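The task counters are enough for callers to derive their own progress metric. A minimal sketch (assuming a stage whose task count may not be known yet, hence the zero guard):

```scala
import org.apache.spark.SparkStageInfo

// Minimal sketch: derive a completion fraction from the per-stage counters.
// Guards against division by zero for stages whose task count is still zero.
def stageProgress(stage: SparkStageInfo): Double =
  if (stage.numTasks() == 0) 0.0
  else stage.numCompletedTasks().toDouble / stage.numTasks()
```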
**Modified: `SparkContext.scala`**

```diff
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
```

Review comment (on the `JobProgressListener` import):

> **Reviewer:** I'd at least put a "TODO" somewhere to move this to a different package. This listener is not UI-specific anymore.

```diff
@@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
  * this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -230,10 +230,15 @@
   private[spark] val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
-  // Initialize the Spark UI, registering all associated listeners
+  private[spark] val jobProgressListener = new JobProgressListener(conf)
+  listenerBus.addListener(jobProgressListener)
+
+  // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new SparkUI(this))
+      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
+        env.securityManager, appName))
     } else {
       // For tests, do not enable the UI
       None
@@ -859,69 +864,6 @@
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
 
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
```

(The 63 removed lines in the last hunk are the seven status and introspection methods, from `getExecutorMemoryStatus` through `getSchedulingMode`; they move unchanged into the new `SparkStatusAPI` trait below. The `version` method and the `addFile` documentation around them are untouched context.)
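Note that `JobProgressListener` is now created and registered by `SparkContext` itself, before and independently of the optional UI, so the status methods work even with `spark.ui.enabled=false`. For contrast with the new pull-based API, here is a hedged sketch of the existing push-style alternative using a custom listener on the same bus (assumes the Spark 1.x `SparkListener` API; `JobEndCounter` is an illustrative name):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Push-style counterpart to the pull-based API: count finished jobs with a
// custom listener registered on the same listener bus that feeds
// JobProgressListener.
class JobEndCounter extends SparkListener {
  @volatile var jobsEnded = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobsEnded += 1
  }
}

// Registration on an existing SparkContext `sc`:
//   sc.addSparkListener(new JobEndCounter())
```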
**New file: `SparkStatusAPI.scala`** (+142 lines)

```scala
/* ... Apache License header, identical to the one above ... */

package org.apache.spark

import scala.collection.Map
import scala.collection.JavaConversions._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}


/**
 * Trait that implements Spark's status APIs. This trait is designed to be mixed into
 * SparkContext; it allows the status API code to live in its own file.
 */
private[spark] trait SparkStatusAPI { this: SparkContext =>

  /**
   * Return a map from the slave to the max memory available for caching and the remaining
   * memory available for caching.
   */
  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
    env.blockManager.master.getMemoryStatus.map { case (blockManagerId, mem) =>
      (blockManagerId.host + ":" + blockManagerId.port, mem)
    }
  }

  /**
   * :: DeveloperApi ::
   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
   * they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {
    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
    rddInfos.filter(_.isCached)
  }

  /**
   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
   * Note that this does not necessarily mean the caching or computation was successful.
   */
  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

  /**
   * :: DeveloperApi ::
   * Return information about blocks stored in all of the slaves.
   */
  @DeveloperApi
  def getExecutorStorageStatus: Array[StorageStatus] = {
    env.blockManager.master.getStorageStatus
  }

  /**
   * :: DeveloperApi ::
   * Return pools for fair scheduler.
   */
  @DeveloperApi
  def getAllPools: Seq[Schedulable] = {
    // TODO(xiajunluan): We should take nested pools into account
    taskScheduler.rootPool.schedulableQueue.toSeq
  }

  /**
   * :: DeveloperApi ::
   * Return the pool associated with the given name, if one exists.
   */
  @DeveloperApi
  def getPoolForName(pool: String): Option[Schedulable] = {
    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
  }

  /**
   * Return current scheduling mode.
   */
  def getSchedulingMode: SchedulingMode.SchedulingMode = {
    taskScheduler.schedulingMode
  }

  /**
   * Return a list of all known jobs in a particular job group. The returned list may contain
   * running, failed, and completed jobs, and may vary across invocations of this method. This
   * method does not guarantee the order of the elements in its result.
   */
  def getJobIdsForGroup(jobGroup: String): Array[Int] = {
    jobProgressListener.synchronized {
      val jobData = jobProgressListener.jobIdToData.valuesIterator
      jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
    }
  }

  /**
   * Returns job information, or `None` if the job info could not be found or was garbage
   * collected.
   */
  def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
    jobProgressListener.synchronized {
      jobProgressListener.jobIdToData.get(jobId).map { data =>
        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
      }
    }
  }

  /**
   * Returns stage information, or `None` if the stage info could not be found or was
   * garbage collected.
   */
  def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
    jobProgressListener.synchronized {
      for (
        info <- jobProgressListener.stageIdToInfo.get(stageId);
        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
      ) yield {
        new SparkStageInfoImpl(
          stageId,
          info.attemptId,
          info.name,
          info.numTasks,
          data.numActiveTasks,
          data.numCompleteTasks,
          data.numFailedTasks)
      }
    }
  }
}
```

Review comment (on the blank lines above the scaladoc):

> **Reviewer:** super nit: too many empty lines.
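Taken together, these methods support simple polling loops. A hedged usage sketch (assumes an already-constructed `SparkContext` named `sc`; the job group name and poll interval are illustrative, and jobs are assumed to be submitted asynchronously from another thread):

```scala
import org.apache.spark.JobExecutionStatus

// Illustrative polling loop against the new pull-based API.
sc.setJobGroup("demo-group", "progress API demo")  // tag subsequently submitted jobs

var finished = false
while (!finished) {
  val jobs = sc.getJobIdsForGroup("demo-group").flatMap(id => sc.getJobInfo(id))
  jobs.foreach { job =>
    // Aggregate per-stage task counters into a coarse job-level progress line.
    val stages = job.stageIds().flatMap(id => sc.getStageInfo(id))
    val completed = stages.map(_.numCompletedTasks()).sum
    val total = stages.map(_.numTasks()).sum
    println(s"Job ${job.jobId()}: $completed / $total tasks completed")
  }
  finished = jobs.nonEmpty && jobs.forall(_.status() != JobExecutionStatus.RUNNING)
  Thread.sleep(1000)  // poll once per second
}
```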
**New file: private status API implementations** (+34 lines)

```scala
/* ... Apache License header, identical to the one above ... */

package org.apache.spark

private class SparkJobInfoImpl(
    val jobId: Int,
    val stageIds: Array[Int],
    val status: JobExecutionStatus)
  extends SparkJobInfo

private class SparkStageInfoImpl(
    val stageId: Int,
    val currentAttemptId: Int,
    val name: String,
    val numTasks: Int,
    val numActiveTasks: Int,
    val numCompletedTasks: Int,
    val numFailedTasks: Int)
  extends SparkStageInfo
```
Review thread (on `JobExecutionStatus`):

> **Reviewer:** Feels like it's missing `KILLED`, although I'm not sure whether it's possible to differentiate that from `FAILED` in the backend at the moment.
>
> **Author:** I don't think that we have backend support for this, at least not at the `JobProgressListener` layer.