25 changes: 25 additions & 0 deletions core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

public enum JobExecutionStatus {
Contributor:
Feels like it's missing KILLED, although I'm not sure whether it's possible to differentiate that from FAILED in the backend at the moment.

Contributor Author:
I don't think that we have backend support for this, at least not at the JobProgressListener layer.

RUNNING,
SUCCEEDED,
FAILED,
UNKNOWN
}
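
For context, a minimal sketch (not part of this PR) of how a caller might treat the enum values, using a hypothetical helper that marks SUCCEEDED and FAILED as terminal states:

// Hypothetical helper, not part of this PR: decide whether a job has reached a terminal state.
def isFinished(status: JobExecutionStatus): Boolean = status match {
  case JobExecutionStatus.SUCCEEDED | JobExecutionStatus.FAILED => true
  case JobExecutionStatus.RUNNING | JobExecutionStatus.UNKNOWN => false
}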
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/spark/SparkJobInfo.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
Contributor:
It seems like your goals here are:

  • people shouldn't implement these interfaces
  • people shouldn't be able to instantiate their own objects of these types

Instead of interface / private implementation, how about a public final class with a package-private constructor? Seems like this package is the only place where you're actually instantiating these types, so that should work just as well.

Contributor Author:
I think that exposing interfaces helps to maximize the flexibility of our internal implementations of Java-friendly classes. Imagine that I want to have a private[spark] method on an object that I expose to users. If I implement this as a Scala class, my private[spark] methods and fields are public from Java users' points of view. If I implement this as a public final Java class with package-private methods, then I need to add a separate class containing "forwarder" methods in order to allow the Java package-private methods to be called from Scala subpackages.

The need for this kind of flexibility isn't much of a concern for this specific case, since SparkJobInfo and SparkStageInfo are simple immutable objects, but it is a problem for classes like TaskContext. Since other Java-friendly APIs will likely be exposed as interfaces, I was thinking of doing the same here for consistency's sake.

Does this sound reasonable or are there other approaches / considerations that I'm missing?

Contributor:
I understand your point, but as you say, that doesn't really apply here. And a final class would avoid the scary comment.

But not a big deal.

* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
int jobId();
int[] stageIds();
JobExecutionStatus status();
}
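
To illustrate the visibility point raised in the thread above: a Scala `private[spark]` member is only restricted by the Scala compiler, so in bytecode it is public and remains callable from plain Java. A minimal sketch with a hypothetical class (not part of this PR):

package org.apache.spark

// Hypothetical class, not part of this PR.
class ExampleInfo(val jobId: Int) {
  // Hidden from Scala code outside org.apache.spark, but compiled as a public
  // method in bytecode, so Java callers could still invoke it directly.
  private[spark] def internalUpdate(): Unit = ()
}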
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
Contributor:
Do we need to encode the attempt here?

Contributor Author:
I've updated this to include the current attempt id.

int stageId();
int currentAttemptId();
String name();
int numTasks();
int numActiveTasks();
int numCompletedTasks();
int numFailedTasks();
}
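
As a rough illustration of how these counters might be consumed, a hedged sketch (not part of this PR) that derives a completion fraction for a stage:

// Hypothetical helper, not part of this PR.
def stageProgress(stage: SparkStageInfo): Double =
  if (stage.numTasks == 0) 1.0
  else stage.numCompletedTasks.toDouble / stage.numTasks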
76 changes: 9 additions & 67 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
Contributor:
I'd at least put a "TODO" somewhere to move this to a different package. This listener is not UI-specific anymore.

import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
@@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
* this config overrides the default configs as well as system properties.
*/

class SparkContext(config: SparkConf) extends Logging {
class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -230,10 +230,15 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI, registering all associated listeners

private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)

// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparkUI(this))
Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
env.securityManager, appName))
} else {
// For tests, do not enable the UI
None
@@ -859,69 +864,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}

/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}

/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}

/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}

/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}

/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}

/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
142 changes: 142 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.Map
import scala.collection.JavaConversions._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}

Contributor:
super nit: too many empty lines.

/**
* Trait that implements Spark's status APIs. This trait is designed to be mixed into
* SparkContext; it allows the status API code to live in its own file.
*/
private[spark] trait SparkStatusAPI { this: SparkContext =>

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}

/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}

/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}

/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}

/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}

/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}


/**
* Return a list of all known jobs in a particular job group. The returned list may contain
* running, failed, and completed jobs, and may vary across invocations of this method. This
* method does not guarantee the order of the elements in its result.
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
jobProgressListener.synchronized {
val jobData = jobProgressListener.jobIdToData.valuesIterator
jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
}
}

/**
* Returns job information, or `None` if the job info could not be found or was garbage collected.
*/
def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
jobProgressListener.synchronized {
jobProgressListener.jobIdToData.get(jobId).map { data =>
new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
}
}
}

/**
* Returns stage information, or `None` if the stage info could not be found or was
* garbage collected.
*/
def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
jobProgressListener.synchronized {
for (
info <- jobProgressListener.stageIdToInfo.get(stageId);
data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
) yield {
new SparkStageInfoImpl(
stageId,
info.attemptId,
info.name,
info.numTasks,
data.numActiveTasks,
data.numCompleteTasks,
data.numFailedTasks)
}
}
}
}
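
For a sense of how the new status methods fit together, a hedged usage sketch (not part of this PR), assuming an existing SparkContext `sc` and the job group name "myGroup":

sc.setJobGroup("myGroup", "status API example")
sc.parallelize(1 to 1000, 10).count()

// Look up jobs by group, then drill down into per-stage task counts.
// Entries may be missing if the listener has already garbage-collected them.
for (jobId <- sc.getJobIdsForGroup("myGroup"); jobInfo <- sc.getJobInfo(jobId)) {
  println(s"Job $jobId: ${jobInfo.status}")
  for (stageId <- jobInfo.stageIds; stageInfo <- sc.getStageInfo(stageId)) {
    println(s"  Stage $stageId (${stageInfo.name}): " +
      s"${stageInfo.numCompletedTasks}/${stageInfo.numTasks} tasks complete")
  }
}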
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

private class SparkJobInfoImpl (
val jobId: Int,
val stageIds: Array[Int],
val status: JobExecutionStatus)
extends SparkJobInfo

private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
val numCompletedTasks: Int,
val numFailedTasks: Int)
extends SparkStageInfo