From ed10348e2326bc8fb4573754826dcbb8687ef4b6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 14 Jun 2014 08:59:40 -0700 Subject: [PATCH 01/20] Expose application id to spark context. Lay down the infrastructure to plumb a backend-generated application id back to the SparkContext, and make the application ID generated for apps running in standalone and yarn mode available. --- .../scala/org/apache/spark/SparkContext.scala | 22 +++++-------------- .../spark/scheduler/SchedulerBackend.scala | 4 ++++ .../spark/scheduler/TaskScheduler.scala | 4 ++++ .../spark/scheduler/TaskSchedulerImpl.scala | 4 ++++ .../CoarseGrainedSchedulerBackend.scala | 2 +- .../cluster/SimrSchedulerBackend.scala | 2 ++ .../cluster/SparkDeploySchedulerBackend.scala | 5 +++++ .../mesos/CoarseMesosSchedulerBackend.scala | 2 ++ .../cluster/mesos/MesosSchedulerBackend.scala | 2 ++ .../spark/scheduler/local/LocalBackend.scala | 3 +++ .../spark/scheduler/DAGSchedulerSuite.scala | 1 + .../scheduler/TaskSchedulerImplSuite.scala | 1 + .../spark/deploy/yarn/ApplicationMaster.scala | 3 +++ .../cluster/YarnClientSchedulerBackend.scala | 2 ++ .../cluster/YarnClusterScheduler.scala | 18 ++++++++++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 5 ++++- 16 files changed, 58 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8052499ab7526..8986f379e2162 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -289,7 +289,7 @@ class SparkContext(config: SparkConf) extends Logging { value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } - Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => + Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES") = v } // The Mesos scheduler backend relies on this environment variable to set executor memory. @@ -1203,10 +1203,10 @@ class SparkContext(config: SparkConf) extends Logging { /** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) - * If checkSerializable is set, clean will also proactively - * check to see if f is serializable and throw a SparkException + * If checkSerializable is set, clean will also proactively + * check to see if f is serializable and throw a SparkException * if not. - * + * * @param f the closure to clean * @param checkSerializable whether or not to immediately check f for serializability * @throws SparkException if checkSerializable is set but f is not @@ -1520,7 +1520,7 @@ object SparkContext extends Logging { logWarning( "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.") } - val scheduler = try { + try { val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] @@ -1531,18 +1531,6 @@ object SparkContext extends Logging { throw new SparkException("YARN mode not available ?", e) } } - val backend = try { - val clazz = - Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend") - val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) - cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] - } catch { - case e: Exception => { - throw new SparkException("YARN mode not available ?", e) - } - } - scheduler.initialize(backend) - scheduler case "yarn-client" => val scheduler = try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index e41e0a9841691..16adcae7f7ca1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -31,4 +31,8 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException def isReady(): Boolean = true + + /** Get the application ID associated with the job, if any. */ + def applicationId(): Option[String] + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 819c35257b5a7..52ad524a59293 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -54,4 +54,8 @@ private[spark] trait TaskScheduler { // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int + + /** Get the application ID associated with the job, if any. */ + def applicationId(): Option[String] + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4b6d6da5a6e61..b027d745bb6e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -452,6 +452,9 @@ private[spark] class TaskSchedulerImpl( } } } + + override def applicationId(): Option[String] = backend.applicationId() + } @@ -496,4 +499,5 @@ private[spark] object TaskSchedulerImpl { retval.toList } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9f085eef46720..0e92f191176e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -42,7 +42,7 @@ import org.apache.spark.ui.JettyUtils * (spark.deploy.*). */ private[spark] -class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) +abstract class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index d99c76117c168..fef7d28bbabd3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -66,4 +66,6 @@ private[spark] class SimrSchedulerBackend( fs.delete(new Path(driverFilePath), false) super.stop() } + + override def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bf2dc88e29048..1c924f627455c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -34,6 +34,7 @@ private[spark] class SparkDeploySchedulerBackend( var client: AppClient = null var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ + var appId: String = _ val maxCores = conf.getOption("spark.cores.max").map(_.toInt) @@ -76,6 +77,7 @@ private[spark] class SparkDeploySchedulerBackend( override def connected(appId: String) { logInfo("Connected to Spark cluster with app ID " + appId) + this.appId = appId } override def disconnected() { @@ -108,4 +110,7 @@ private[spark] class SparkDeploySchedulerBackend( logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason.toString) } + + override def applicationId(): Option[String] = Some(appId) + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 9f45400bcf852..7a4b931dc397a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -300,4 +300,6 @@ private[spark] class CoarseMesosSchedulerBackend( logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) slaveLost(d, s) } + + override def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c717e7c621a8f..64bb6fec5223f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -333,4 +333,6 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) + + override def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 9b95ccca0443e..f14a287efedf4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -107,4 +107,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localActor ! StatusUpdate(taskId, state, serializedData) } + + override def applicationId(): Option[String] = None + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9f498d579a095..e94e1710917ab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -64,6 +64,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def applicationId() = None } /** Length of time to wait while draining listener events. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7532da88c6065..65ad6df1e8107 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -28,6 +28,7 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism() = 1 + override def applicationId() = None } class FakeTaskSetManager( diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 062f946a9fe93..59866dfde5231 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -89,6 +89,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + // setup AmIpFilter for the SparkUI - do this before we start the UI addAmIpFilter() diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 1b37c4bb13f49..88952cdda40cd 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -116,4 +116,6 @@ private[spark] class YarnClientSchedulerBackend( logInfo("Stopped") } + override def applicationId(): Option[String] = Some(appId.toString()) + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 9ee53d797c8ea..0177c7be0f273 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -24,15 +24,18 @@ import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration /** - * - * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done + * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of + * ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) + extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") def this(sc: SparkContext) = this(sc, new Configuration()) + initialize(new YarnClusterSchedulerBackend(this, sc)) + // Nothing else for now ... initialize application master : which needs a SparkContext to // determine how to allocate. // Note that only the first creation of a SparkContext influences (and ideally, there must be @@ -57,4 +60,13 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) } logInfo("YarnClusterScheduler.postStartHook done") } + +} + +private class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id") + } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 1a24ec759b546..cc31b0719624a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -92,6 +92,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, amClient.init(yarnConf) amClient.start() + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + // setup AmIpFilter for the SparkUI - do this before we start the UI addAmIpFilter() @@ -150,7 +153,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) .orElse(Option(System.getenv("LOCAL_DIRS"))) - + localDirs match { case None => throw new Exception("Yarn local dirs can't be empty") case Some(l) => l From 2fb7de48d2146db62e1c5808fe2c9c4ec449ab8a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 14 Jun 2014 09:19:08 -0700 Subject: [PATCH 02/20] Expose the application ID in the ApplicationStart event. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- .../scala/org/apache/spark/scheduler/SparkListener.scala | 4 ++-- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 4 +++- .../spark/scheduler/EventLoggingListenerSuite.scala | 3 ++- .../org/apache/spark/scheduler/ReplayListenerSuite.scala | 3 ++- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 9 ++++++++- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8986f379e2162..81812feac1ade 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1253,7 +1253,8 @@ class SparkContext(config: SparkConf) extends Logging { /** Post the application start event */ private def postApplicationStart() { - listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser)) + listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(), + startTime, sparkUser)) } /** Post the application end event */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 82163eadd56e9..9c2212148117a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -76,8 +76,8 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent @DeveloperApi -case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String) - extends SparkListenerEvent +case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long, + sparkUser: String) extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 47eb44b530379..993ee392cf2a5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -164,6 +164,7 @@ private[spark] object JsonProtocol { def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ ("App Name" -> applicationStart.appName) ~ + ("App ID" -> applicationStart.appId.getOrElse(null)) ~ ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) } @@ -462,9 +463,10 @@ private[spark] object JsonProtocol { def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = { val appName = (json \ "App Name").extract[String] + val appId = (json \ "App ID").extract[Option[String]] val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] - SparkListenerApplicationStart(appName, time, sparkUser) + SparkListenerApplicationStart(appName, appId, time, sparkUser) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 21e3db34b8b7a..27c0f7bdfd4e7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -227,7 +227,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { val conf = getLoggingConf(logDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) val listenerBus = new LiveListenerBus - val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, + 125L, "Mickey") val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d81499ac6abef..38a45c2f50705 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -84,7 +84,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { val fstream = fileSystem.create(logFilePath) val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val writer = new PrintWriter(cstream) - val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, + 125L, "Mickey") val applicationEnd = SparkListenerApplicationEnd(1000L) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 058d31453081a..8a2b824938851 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -57,7 +57,7 @@ class JsonProtocolSuite extends FunSuite { val blockManagerRemoved = SparkListenerBlockManagerRemoved( BlockManagerId("Scarce", "to be counted...", 100, 200)) val unpersistRdd = SparkListenerUnpersistRDD(12345) - val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") + val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) testEvent(stageSubmitted, stageSubmittedJsonString) @@ -144,6 +144,13 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("SparkListenerApplicationStart backwards compatibility") { + // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property. + val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user") + val oldEvent = JsonProtocol.applicationStartToJson(applicationStart) + .removeField({ _._1 == "App ID" }) + assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent)) + } /** -------------------------- * | Helper test running methods | From b3f3664cdc0f5007cd8eec315c501411b044392d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 14 Jun 2014 09:45:40 -0700 Subject: [PATCH 03/20] [yarn] Make the RM link point to the app direcly in the HS. --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6 +++++- .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 7 ++++++- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 +++- .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 4 +++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 59866dfde5231..9fce420211bed 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -372,7 +372,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) finishReq.setDiagnostics(diagnostics) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + + val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") + .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .getOrElse("") + finishReq.setTrackingUrl(trackingUrl) resourceManager.finishApplicationMaster(finishReq) } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a86ad256dfa39..3d02dac8aa9e6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -289,7 +289,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + + val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") + .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .getOrElse("") + finishReq.setTrackingUrl(trackingUrl) + resourceManager.finishApplicationMaster(finishReq) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index cc31b0719624a..9b56ec344b420 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -326,7 +326,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Unregistering ApplicationMaster with " + status) if (registered) { - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") + val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") + .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .getOrElse("") amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl) } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 5ac95f3798723..c0dca3f29144f 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -250,7 +250,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp def finishApplicationMaster(status: FinalApplicationStatus) { logInfo("Unregistering ApplicationMaster with " + status) - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") + val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") + .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .getOrElse("") amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl) } From 26b266e4e9df8ba51f65ef3e86f44224c371b9ed Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 16 Jun 2014 10:45:09 -0700 Subject: [PATCH 04/20] Use Mesos framework ID as Spark application ID. --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 6 +++++- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 7a4b931dc397a..6b928272e4c66 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -79,6 +79,8 @@ private[spark] class CoarseMesosSchedulerBackend( var nextMesosTaskId = 0 + var frameworkId: FrameworkID = null + def newMesosTaskId(): Int = { val id = nextMesosTaskId nextMesosTaskId += 1 @@ -159,6 +161,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { logInfo("Registered as framework ID " + frameworkId.getValue) + this.frameworkId = frameworkId registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() @@ -301,5 +304,6 @@ private[spark] class CoarseMesosSchedulerBackend( slaveLost(d, s) } - override def applicationId(): Option[String] = None + override def applicationId(): Option[String] = + Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 64bb6fec5223f..76428951d8980 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null + var frameworkId: FrameworkID = null + override def start() { synchronized { classLoader = Thread.currentThread.getContextClassLoader @@ -160,6 +162,7 @@ private[spark] class MesosSchedulerBackend( } finally { restoreClassLoader(oldClassLoader) } + this.frameworkId = frameworkId } def waitForRegister() { @@ -334,5 +337,6 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) - override def applicationId(): Option[String] = None + override def applicationId(): Option[String] = + Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null) } From abc46974b8bfbf08eb9544c4ab8d366ce3bc2011 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 16 Jun 2014 17:16:10 -0700 Subject: [PATCH 05/20] Make FsHistoryProvider keep a map of applications by id. This makes it more efficient to search for applications by id, since it's not necessarily related to the location of the app in the file system. Memory usage should be little worse than before, but by a constant factor (since it's mostly the extra overhead of a LinkedHashMap over an ArrayBuffer to maintain the data). --- .../history/ApplicationHistoryProvider.scala | 2 +- .../deploy/history/FsHistoryProvider.scala | 180 +++++++++++------- .../scheduler/ApplicationEventListener.scala | 2 + .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../spark/deploy/yarn/ExecutorLauncher.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../spark/deploy/yarn/ExecutorLauncher.scala | 2 +- 8 files changed, 124 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index a0e8bd403a41d..1fde6c3d3b855 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -34,7 +34,7 @@ private[spark] abstract class ApplicationHistoryProvider { * * @return List of all know applications. */ - def getListing(): Seq[ApplicationHistoryInfo] + def getListing(): Iterable[ApplicationHistoryInfo] /** * Returns the Spark UI for a specific application. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8c9ac072449f..312efff7dadaa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -45,8 +45,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTimeMs = -1L + // The modification time of the newest log detected during the last scan. This is used + // to ignore logs that are older during subsequent scans, to avoid processing data that + // is already known. + private var mostRecentLogModTime = -1L + // List of applications, in order from newest to oldest. - @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil + @volatile private var appList: mutable.Map[String, FsApplicationHistoryInfo] = mutable.Map() /** * A background thread that periodically checks for event log updates on disk. @@ -91,12 +96,30 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis logCheckingThread.start() } - override def getListing() = appList + override def getListing() = appList.values override def getAppUI(appId: String): SparkUI = { try { - val appLogDir = fs.getFileStatus(new Path(logDir, appId)) - loadAppInfo(appLogDir, true)._2 + appList.get(appId).map(info => { + val (replayBus, appListener) = createReplayBus(fs.getFileStatus( + new Path(logDir, info.logDir))) + val ui = { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + // Do not call ui.bind() to avoid creating a new server for each application + } + + replayBus.replay() + + // Note that this does not have any effect due to SPARK-2169. + ui.setAppName(s"${appListener.appName} ($appId)") + + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setUIAcls(uiAclsEnabled) + ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + ui + }).getOrElse(null) } catch { case e: FileNotFoundException => null } @@ -116,80 +139,99 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis try { val logStatus = fs.listStatus(new Path(logDir)) val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs.filter { - dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) - } - - val currentApps = Map[String, ApplicationHistoryInfo]( - appList.map(app => (app.id -> app)):_*) - // For any application that either (i) is not listed or (ii) has changed since the last time - // the listing was created (defined by the log dir's modification time), load the app's info. - // Otherwise just reuse what's already in memory. - val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) - for (dir <- logInfos) { - val curr = currentApps.getOrElse(dir.getPath().getName(), null) - if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + // Load all new logs from the log directory. Only directories that have a modification time + // later than the last known log directory will be loaded. + var newMostRecentModTime = mostRecentLogModTime + val logInfos = logDirs + .filter { dir => + if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) { + val modTime = getModificationTime(dir) + newMostRecentModTime = math.max(newMostRecentModTime, modTime) + modTime > mostRecentLogModTime + } else { + false + } + } + .map { dir => try { - newApps += loadAppInfo(dir, false)._1 + val (replayBus, appListener) = createReplayBus(dir) + replayBus.replay() + new FsApplicationHistoryInfo( + dir.getPath().getName(), + appListener.appId.getOrElse(dir.getPath().getName()), + appListener.appName, + appListener.startTime, + appListener.endTime, + getModificationTime(dir), + appListener.sparkUser) } catch { - case e: Exception => logError(s"Failed to load app info from directory $dir.") + case e: Exception => + logInfo(s"Failed to load application log data from $dir.", e) + null + } + } + .sortBy { info => -info.endTime } + + mostRecentLogModTime = newMostRecentModTime + + if (!logInfos.isEmpty) { + var newAppList = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + + // Merge the new apps with the existing ones, discarding any duplicates. The new map + // is created in descending end time order. + var currentApps = appList.values.iterator + logInfos.foreach { info => + if (info != null) { + currentApps + .takeWhile(oldInfo => oldInfo.endTime > info.endTime) + .foreach { oldInfo => + if (!newAppList.contains(oldInfo.id)) { + newAppList += (oldInfo.id -> oldInfo) + } + } + + newAppList += (info.id -> info) + } + } + + currentApps.foreach { oldInfo => + if (!newAppList.contains(oldInfo.id)) { + newAppList += (oldInfo.id -> oldInfo) } - } else { - newApps += curr } - } - appList = newApps.sortBy { info => -info.endTime } + appList = newAppList + } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } } - /** - * Parse the application's logs to find out the information we need to build the - * listing page. - * - * When creating the listing of available apps, there is no need to load the whole UI for the - * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user - * clicks on a specific application. - * - * @param logDir Directory with application's log files. - * @param renderUI Whether to create the SparkUI for the application. - * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false. - */ - private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = { - val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) - val path = logDir.getPath - val appId = path.getName + private def loadAppUI(logDir: FileStatus, appId: String): SparkUI = { + val (replayBus, appListener) = createReplayBus(logDir) + val ui: SparkUI = { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + // Do not call ui.bind() to avoid creating a new server for each application + } + + replayBus.replay() + + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setUIAcls(uiAclsEnabled) + ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + ui + } + + private def createReplayBus(logDir: FileStatus) = { + val path = logDir.getPath() + val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs) val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) val appListener = new ApplicationEventListener replayBus.addListener(appListener) - - val ui: SparkUI = if (renderUI) { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) - // Do not call ui.bind() to avoid creating a new server for each application - } else { - null - } - - replayBus.replay() - val appInfo = ApplicationHistoryInfo( - appId, - appListener.appName, - appListener.startTime, - appListener.endTime, - getModificationTime(logDir), - appListener.sparkUser) - - if (ui != null) { - val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - ui.getSecurityManager.setUIAcls(uiAclsEnabled) - ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - } - (appInfo, ui) + (replayBus, appListener) } /** Return when this directory was last modified. */ @@ -212,3 +254,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000) } + +private class FsApplicationHistoryInfo( + val logDir: String, + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String) + extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index cd5d44ad4a7e6..53fffe40250c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -25,6 +25,7 @@ package org.apache.spark.scheduler */ private[spark] class ApplicationEventListener extends SparkListener { var appName = "" + var appId: Option[String] = None var sparkUser = "" var startTime = -1L var endTime = -1L @@ -42,6 +43,7 @@ private[spark] class ApplicationEventListener extends SparkListener { override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { appName = applicationStart.appName + appId = applicationStart.appId startTime = applicationStart.time sparkUser = applicationStart.sparkUser } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index e07aa2ee3a5a2..f580583dae396 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -234,7 +234,7 @@ private[spark] object UIUtils extends Logging { def listingTable[T]( headers: Seq[String], generateDataRow: T => Seq[Node], - data: Seq[T], + data: Iterable[T], fixedWidth: Boolean = false): Seq[Node] = { var listingTableClass = TABLE_CLASS diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9fce420211bed..452f88c76938f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -374,7 +374,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, finishReq.setDiagnostics(diagnostics) val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") - .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .map(url => "%s/history/%s".format(url, appAttemptId.getApplicationId())) .getOrElse("") finishReq.setTrackingUrl(trackingUrl) resourceManager.finishApplicationMaster(finishReq) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 3d02dac8aa9e6..c4b43f415aa8d 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -291,7 +291,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp finishReq.setFinishApplicationStatus(status) val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") - .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .map(url => "%s/history/%s".format(url, appAttemptId.getApplicationId())) .getOrElse("") finishReq.setTrackingUrl(trackingUrl) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9b56ec344b420..ca87c4e01739b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -327,7 +327,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Unregistering ApplicationMaster with " + status) if (registered) { val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") - .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .map(url => "%s/history/%s".format(url, appAttemptId.getApplicationId())) .getOrElse("") amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index c0dca3f29144f..7be75af05cfe9 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -251,7 +251,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp def finishApplicationMaster(status: FinalApplicationStatus) { logInfo("Unregistering ApplicationMaster with " + status) val trackingUrl = sparkConf.getOption("spark.yarn.historyServer.address") - .map(url => "%s/by-id/%s".format(url, appAttemptId.getApplicationId())) + .map(url => "%s/history/%s".format(url, appAttemptId.getApplicationId())) .getOrElse("") amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl) } From 0afd696d2458498cb7db8f1b6f9a18a81c7686f1 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 24 Jun 2014 17:39:27 -0700 Subject: [PATCH 06/20] Wait until master responds before returning from start(). This allows the application ID set by the master to be included in the SparkListenerApplicationStart event. This should affect job scheduling because tasks can only be submitted after executors register, which will happen after the client registers with the master anyway. (This is similar to what the Mesos backend does to implement the same behavior.) --- .../cluster/SparkDeploySchedulerBackend.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 1c924f627455c..4a180d2367598 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -36,6 +36,9 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ var appId: String = _ + val registrationLock = new Object() + var registrationDone = false + val maxCores = conf.getOption("spark.cores.max").map(_.toInt) override def start() { @@ -64,6 +67,8 @@ private[spark] class SparkDeploySchedulerBackend( client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() + + waitForRegistration() } override def stop() { @@ -78,15 +83,18 @@ private[spark] class SparkDeploySchedulerBackend( override def connected(appId: String) { logInfo("Connected to Spark cluster with app ID " + appId) this.appId = appId + notifyRegistered() } override def disconnected() { + notifyRegistered() if (!stopping) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead(reason: String) { + notifyRegistered() if (!stopping) { logError("Application has been killed. Reason: " + reason) scheduler.error(reason) @@ -113,4 +121,19 @@ private[spark] class SparkDeploySchedulerBackend( override def applicationId(): Option[String] = Some(appId) + private def waitForRegistration() = { + registrationLock.synchronized { + while (!registrationDone) { + registrationLock.wait() + } + } + } + + private def notifyRegistered() = { + registrationLock.synchronized { + registrationDone = true + registrationLock.notifyAll() + } + } + } From 36dc36250a25760e732bcef88b8010203926bd1d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 30 Jun 2014 10:42:02 -0700 Subject: [PATCH 07/20] Don't use Iterator::takeWhile(). It messes up the internal iterator state so it's not usable after the call, which we need here. (Mental note: read all the scaladoc next time.) --- .../deploy/history/FsHistoryProvider.scala | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 312efff7dadaa..454c77798c34d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -180,25 +180,31 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // Merge the new apps with the existing ones, discarding any duplicates. The new map // is created in descending end time order. - var currentApps = appList.values.iterator + val currentApps = appList.values.iterator + var current = if (currentApps.hasNext) currentApps.next else null + def addOldInfo(oldInfo: FsApplicationHistoryInfo) = { + if (!newAppList.contains(oldInfo.id)) { + newAppList += (oldInfo.id -> oldInfo) + } + } + + logInfos.foreach { info => if (info != null) { - currentApps - .takeWhile(oldInfo => oldInfo.endTime > info.endTime) - .foreach { oldInfo => - if (!newAppList.contains(oldInfo.id)) { - newAppList += (oldInfo.id -> oldInfo) - } - } + while (current != null && current.endTime > info.endTime) { + addOldInfo(current) + current = if (currentApps.hasNext) currentApps.next else null + } newAppList += (info.id -> info) } } + if (current != null) { + addOldInfo(current) + } currentApps.foreach { oldInfo => - if (!newAppList.contains(oldInfo.id)) { - newAppList += (oldInfo.id -> oldInfo) - } + addOldInfo(oldInfo) } appList = newAppList From d35d86f90268d0d1746802b050611c34f0092e8e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 16 Jul 2014 09:57:19 -0700 Subject: [PATCH 08/20] Fix yarn backend after rebase. --- .../spark/scheduler/cluster/YarnClusterScheduler.scala | 8 -------- .../scheduler/cluster/YarnClusterSchedulerBackend.scala | 5 ++++- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 0177c7be0f273..f1e36c13f852d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -62,11 +62,3 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) } } - -private class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) - with Logging { - - override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id") - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index a04b08f43cc5a..e81373f96b31b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -22,7 +22,7 @@ import org.apache.spark.deploy.yarn.ApplicationMasterArguments import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.IntParam -private[spark] class YarnClusterSchedulerBackend( +private class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { @@ -37,4 +37,7 @@ private[spark] class YarnClusterSchedulerBackend( numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) totalExpectedExecutors.set(numExecutors) } + + override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id") + } From 57517b87ee23076f6cc4f17e24cdee52266adb8c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 29 Jul 2014 13:11:13 -0700 Subject: [PATCH 09/20] Review feedback. Mostly, more consistent use of Scala's Option. --- .../scala/org/apache/spark/SparkContext.scala | 2 ++ .../history/ApplicationHistoryProvider.scala | 2 +- .../deploy/history/FsHistoryProvider.scala | 26 ++++++++++-------- .../spark/deploy/history/HistoryServer.scala | 5 +--- .../scheduler/ApplicationEventListener.scala | 27 +++++++++---------- .../cluster/SparkDeploySchedulerBackend.scala | 8 +++--- .../mesos/CoarseMesosSchedulerBackend.scala | 4 +-- .../cluster/mesos/MesosSchedulerBackend.scala | 4 +-- 8 files changed, 40 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 54dd311c4649b..b3201551a5cf0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1255,6 +1255,8 @@ class SparkContext(config: SparkConf) extends Logging { /** Post the application start event */ private def postApplicationStart() { + // Note: this code assumes that the task scheduler has been initialized and has contacted + // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(), startTime, sparkUser)) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 1fde6c3d3b855..c366a90c1adf7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -42,7 +42,7 @@ private[spark] abstract class ApplicationHistoryProvider { * @param appId The application ID. * @return The application's UI, or null if application is not found. */ - def getAppUI(appId: String): SparkUI + def getAppUI(appId: String): Option[SparkUI] /** * Called when the server is shutting down. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 94ebfc29c375e..77432a8ec669c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -31,6 +31,8 @@ import org.apache.spark.util.Utils private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider with Logging { + private val NOT_STARTED = "" + // Interval between each check for event log updates private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", conf.getInt("spark.history.updateInterval", 10)) * 1000 @@ -98,7 +100,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis override def getListing() = appList.values - override def getAppUI(appId: String): SparkUI = { + override def getAppUI(appId: String): Option[SparkUI] = { try { appList.get(appId).map(info => { val (replayBus, appListener) = createReplayBus(fs.getFileStatus( @@ -114,15 +116,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis replayBus.replay() // Note that this does not have any effect due to SPARK-2169. - ui.setAppName(s"${appListener.appName} ($appId)") + ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)") val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setUIAcls(uiAclsEnabled) - ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED), + appListener.viewAcls.getOrElse("")) ui - }).getOrElse(null) + }) } catch { - case e: FileNotFoundException => null + case e: FileNotFoundException => None } } @@ -161,18 +164,18 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis new FsApplicationHistoryInfo( dir.getPath().getName(), appListener.appId.getOrElse(dir.getPath().getName()), - appListener.appName, - appListener.startTime, - appListener.endTime, + appListener.appName.getOrElse(NOT_STARTED), + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), getModificationTime(dir), - appListener.sparkUser) + appListener.sparkUser.getOrElse(NOT_STARTED)) } catch { case e: Exception => logInfo(s"Failed to load application log data from $dir.", e) null } } - .sortBy { info => -info.endTime } + .sortBy { info => if (info != null) -info.endTime else -1 } mostRecentLogModTime = newMostRecentModTime @@ -228,7 +231,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setUIAcls(uiAclsEnabled) - ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED), + appListener.viewAcls.getOrElse("")) ui } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index cacb9da8c947b..3a9b752bac2a7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -52,10 +52,7 @@ class HistoryServer( private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { - val ui = provider.getAppUI(key) - if (ui == null) { - throw new NoSuchElementException() - } + val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException()) attachSparkUI(ui) ui } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index 53fffe40250c1..45c36b84dd775 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -24,39 +24,38 @@ package org.apache.spark.scheduler * from multiple applications are seen, the behavior is unspecified. */ private[spark] class ApplicationEventListener extends SparkListener { - var appName = "" + var appName: Option[String] = None var appId: Option[String] = None - var sparkUser = "" - var startTime = -1L - var endTime = -1L - var viewAcls = "" + var sparkUser: Option[String] = None + var startTime: Option[Long] = None + var endTime: Option[Long] = None + var viewAcls: Option[String] = None var enableViewAcls = false - def applicationStarted = startTime != -1 + def applicationStarted = startTime.isDefined - def applicationCompleted = endTime != -1 + def applicationCompleted = endTime.isDefined def applicationDuration: Long = { - val difference = endTime - startTime - if (applicationStarted && applicationCompleted && difference > 0) difference else -1L + if (applicationStarted && applicationCompleted) endTime.get - startTime.get else -1 } override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = applicationStart.appName + appName = Some(applicationStart.appName) appId = applicationStart.appId - startTime = applicationStart.time - sparkUser = applicationStart.sparkUser + startTime = Some(applicationStart.time) + sparkUser = Some(applicationStart.sparkUser) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = applicationEnd.time + endTime = Some(applicationEnd.time) } override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { val environmentDetails = environmentUpdate.environmentDetails val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.getOrElse("spark.ui.view.acls", "") + viewAcls = allProperties.get("spark.ui.view.acls") enableViewAcls = allProperties.getOrElse("spark.ui.acls.enable", "false").toBoolean } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 4a180d2367598..fe1c32747f139 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -83,18 +83,18 @@ private[spark] class SparkDeploySchedulerBackend( override def connected(appId: String) { logInfo("Connected to Spark cluster with app ID " + appId) this.appId = appId - notifyRegistered() + wakeUpContext() } override def disconnected() { - notifyRegistered() + wakeUpContext() if (!stopping) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead(reason: String) { - notifyRegistered() + wakeUpContext() if (!stopping) { logError("Application has been killed. Reason: " + reason) scheduler.error(reason) @@ -129,7 +129,7 @@ private[spark] class SparkDeploySchedulerBackend( } } - private def notifyRegistered() = { + private def wakeUpContext() = { registrationLock.synchronized { registrationDone = true registrationLock.notifyAll() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 6b928272e4c66..728b460b713cb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -304,6 +304,6 @@ private[spark] class CoarseMesosSchedulerBackend( slaveLost(d, s) } - override def applicationId(): Option[String] = - Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null) + override def applicationId(): Option[String] = None + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 76428951d8980..c4c2419ab92d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -337,6 +337,6 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) - override def applicationId(): Option[String] = - Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null) + override def applicationId(): Option[String] = None + } From 4e3483f7705a77499f2a5e9d23c2a654ec1c8bbc Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 29 Jul 2014 14:04:11 -0700 Subject: [PATCH 10/20] Fix test. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4884337b653f3..f6ac2091c0911 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -343,6 +343,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def applicationId() = None } val noKillScheduler = new DAGScheduler( sc, From b022bae2d454ba75d271ca5f0333b32c4c28c236 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 11 Aug 2014 13:03:11 -0700 Subject: [PATCH 11/20] Undo SparkContext cleanup. --- .../main/scala/org/apache/spark/SparkContext.scala | 14 +++++++++++++- .../scheduler/cluster/YarnClusterScheduler.scala | 2 -- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6c7b5eb66ffff..5cb3d11d2e1e8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1553,7 +1553,7 @@ object SparkContext extends Logging { logWarning( "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.") } - try { + val scheduler = try { val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] @@ -1564,6 +1564,18 @@ object SparkContext extends Logging { throw new SparkException("YARN mode not available ?", e) } } + val backend = try { + val clazz = + Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend") + val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) + cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] + } catch { + case e: Exception => { + throw new SparkException("YARN mode not available ?", e) + } + } + scheduler.initialize(backend) + scheduler case "yarn-client" => val scheduler = try { diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 1c45f26562c78..ce65be0cc229d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -34,8 +34,6 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) def this(sc: SparkContext) = this(sc, new Configuration()) - initialize(new YarnClusterSchedulerBackend(this, sc)) - // Nothing else for now ... initialize application master : which needs a SparkContext to // determine how to allocate. // Note that only the first creation of a SparkContext influences (and ideally, there must be From 21aa71bc32c32da3ea4a6d4a850c45c5970d3de4 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 12 Aug 2014 14:37:52 -0700 Subject: [PATCH 12/20] Fix JSON test. --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 815823e330b75..bb243533dddc3 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -718,8 +718,8 @@ class JsonProtocolSuite extends FunSuite { private val applicationStartJsonString = """ - {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42, - "User":"Garfield"} + {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","AppID":null, + "Timestamp":42,"User":"Garfield"} """ private val applicationEndJsonString = From 3f8ec665ec61181915e0c5e0eacc8cb692a6380a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Aug 2014 16:47:59 -0700 Subject: [PATCH 13/20] Review feedback. --- .../history/ApplicationHistoryProvider.scala | 2 +- .../deploy/history/FsHistoryProvider.scala | 112 ++++++++++-------- .../mesos/CoarseMesosSchedulerBackend.scala | 3 - .../cluster/mesos/MesosSchedulerBackend.scala | 3 - .../org/apache/spark/util/JsonProtocol.scala | 4 +- .../apache/spark/util/JsonProtocolSuite.scala | 2 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 9 +- .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../cluster/YarnClusterSchedulerBackend.scala | 4 +- 9 files changed, 75 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index c366a90c1adf7..fbe39b27649f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -40,7 +40,7 @@ private[spark] abstract class ApplicationHistoryProvider { * Returns the Spark UI for a specific application. * * @param appId The application ID. - * @return The application's UI, or null if application is not found. + * @return The application's UI, or None if application is not found. */ def getAppUI(appId: String): Option[SparkUI] diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 31b25dfb60a77..160f85fc41d02 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -50,10 +50,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // The modification time of the newest log detected during the last scan. This is used // to ignore logs that are older during subsequent scans, to avoid processing data that // is already known. - private var mostRecentLogModTime = -1L + private var lastModifiedTime = -1L - // List of applications, in order from newest to oldest. - @volatile private var appList: mutable.Map[String, FsApplicationHistoryInfo] = mutable.Map() + // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted + // into the map in order, so the LinkedHashMap maintains the correct ordering. + @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] + = new mutable.LinkedHashMap() /** * A background thread that periodically checks for event log updates on disk. @@ -98,11 +100,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis logCheckingThread.start() } - override def getListing() = appList.values + override def getListing() = applications.values override def getAppUI(appId: String): Option[SparkUI] = { try { - appList.get(appId).map(info => { + applications.get(appId).map { info => val (replayBus, appListener) = createReplayBus(fs.getFileStatus( new Path(logDir, info.logDir))) val ui = { @@ -120,12 +122,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setAcls(uiAclsEnabled) - // make sure to set admin acls before view acls so properly picked up + // make sure to set admin acls before view acls so they are properly picked up ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED), - appListener.viewAcls.getOrElse("")) - ui - }) + appListener.viewAcls.getOrElse("")) + ui + } } catch { case e: FileNotFoundException => None } @@ -148,79 +150,95 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // Load all new logs from the log directory. Only directories that have a modification time // later than the last known log directory will be loaded. - var newMostRecentModTime = mostRecentLogModTime + var newLastModifiedTime = lastModifiedTime val logInfos = logDirs .filter { dir => if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) { val modTime = getModificationTime(dir) - newMostRecentModTime = math.max(newMostRecentModTime, modTime) - modTime > mostRecentLogModTime + newLastModifiedTime = math.max(newLastModifiedTime, modTime) + modTime > lastModifiedTime } else { false } } - .map { dir => + .flatMap { dir => try { val (replayBus, appListener) = createReplayBus(dir) replayBus.replay() - new FsApplicationHistoryInfo( + Some(new FsApplicationHistoryInfo( dir.getPath().getName(), appListener.appId.getOrElse(dir.getPath().getName()), appListener.appName.getOrElse(NOT_STARTED), appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(dir), - appListener.sparkUser.getOrElse(NOT_STARTED)) + appListener.sparkUser.getOrElse(NOT_STARTED))) } catch { case e: Exception => logInfo(s"Failed to load application log data from $dir.", e) - null + None } } - .sortBy { info => if (info != null) -info.endTime else -1 } + .sortBy { info => -info.endTime } - mostRecentLogModTime = newMostRecentModTime + lastModifiedTime = newLastModifiedTime if (!logInfos.isEmpty) { - var newAppList = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + var newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - // Merge the new apps with the existing ones, discarding any duplicates. The new map - // is created in descending end time order. - val currentApps = appList.values.iterator - var current = if (currentApps.hasNext) currentApps.next else null - def addOldInfo(oldInfo: FsApplicationHistoryInfo) = { - if (!newAppList.contains(oldInfo.id)) { - newAppList += (oldInfo.id -> oldInfo) - } + // A poor man's implementation of a peeking iterator, to help with the merge below. + def hasNext[T](it: Iterator[T], lookAhead: T): Boolean = { + lookAhead != null || it.hasNext } + def next[T](it: Iterator[T], lookAhead: T): T = { + if (lookAhead != null) lookAhead else it.next + } - logInfos.foreach { info => - if (info != null) { - while (current != null && current.endTime > info.endTime) { - addOldInfo(current) - current = if (currentApps.hasNext) currentApps.next else null - } + def addOldInfo(oldInfo: FsApplicationHistoryInfo) = { + if (!newApps.contains(oldInfo.id)) { + newApps += (oldInfo.id -> oldInfo) + } + } - newAppList += (info.id -> info) + // Merge the new apps with the existing ones, discarding any duplicates. + val newIterator = logInfos.iterator + var newLookAhead: FsApplicationHistoryInfo = null + val oldIterator = applications.values.iterator + var oldLookAhead: FsApplicationHistoryInfo = null + + while (hasNext(newIterator, newLookAhead) && hasNext(oldIterator, oldLookAhead)) { + newLookAhead = next(newIterator, newLookAhead) + oldLookAhead = next(oldIterator, oldLookAhead) + if (newLookAhead.endTime > oldLookAhead.endTime) { + newApps += (newLookAhead.id -> newLookAhead) + newLookAhead = null + } else { + addOldInfo(oldLookAhead) + oldLookAhead = null } } - if (current != null) { - addOldInfo(current) + while (hasNext(newIterator, newLookAhead)) { + newLookAhead = next(newIterator, newLookAhead) + newApps += (newLookAhead.id -> newLookAhead) + newLookAhead = null } - currentApps.foreach { oldInfo => - addOldInfo(oldInfo) + + while (hasNext(oldIterator, oldLookAhead)) { + oldLookAhead = next(oldIterator, oldLookAhead) + addOldInfo(oldLookAhead) + oldLookAhead = null } - appList = newAppList + applications = newApps } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } } - private def createReplayBus(logDir: FileStatus) = { + private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = { val path = logDir.getPath() val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs) val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) @@ -251,11 +269,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } private class FsApplicationHistoryInfo( - val logDir: String, - id: String, - name: String, - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String) + val logDir: String, + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String) extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 728b460b713cb..c4302211997a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -79,8 +79,6 @@ private[spark] class CoarseMesosSchedulerBackend( var nextMesosTaskId = 0 - var frameworkId: FrameworkID = null - def newMesosTaskId(): Int = { val id = nextMesosTaskId nextMesosTaskId += 1 @@ -161,7 +159,6 @@ private[spark] class CoarseMesosSchedulerBackend( override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { logInfo("Registered as framework ID " + frameworkId.getValue) - this.frameworkId = frameworkId registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c4c2419ab92d1..d1db66dd103a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -62,8 +62,6 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null - var frameworkId: FrameworkID = null - override def start() { synchronized { classLoader = Thread.currentThread.getContextClassLoader @@ -162,7 +160,6 @@ private[spark] class MesosSchedulerBackend( } finally { restoreClassLoader(oldClassLoader) } - this.frameworkId = frameworkId } def waitForRegister() { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index e81982acf7c7f..7632434c4c4bb 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -166,7 +166,7 @@ private[spark] object JsonProtocol { def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ ("App Name" -> applicationStart.appName) ~ - ("App ID" -> applicationStart.appId.getOrElse(null)) ~ + ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) } @@ -476,7 +476,7 @@ private[spark] object JsonProtocol { def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = { val appName = (json \ "App Name").extract[String] - val appId = (json \ "App ID").extract[Option[String]] + val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String]) val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] SparkListenerApplicationStart(appName, appId, time, sparkUser) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index bb243533dddc3..06124cb35f4ca 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -718,7 +718,7 @@ class JsonProtocolSuite extends FunSuite { private val applicationStartJsonString = """ - {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","AppID":null, + {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","App ID":null, "Timestamp":42,"User":"Garfield"} """ diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ba486fce8e5b4..50158781ae303 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -136,12 +136,9 @@ object YarnSparkHadoopUtil { } def getUIHistoryAddress(conf: SparkConf, appId: String): String = { - val historyServerAddress = conf.get("spark.yarn.historyServer.address", null) - if (historyServerAddress != null) { - s"${historyServerAddress}${HistoryServer.UI_PATH_PREFIX}/${appId}" - } else { - "" - } + conf.getOption("spark.yarn.historyServer.address") + .map { address => s"$address${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 10dacc71247c3..540befdb2a970 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -153,6 +153,6 @@ private[spark] class YarnClientSchedulerBackend( totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } - override def applicationId(): Option[String] = if (appId != null) Some(appId.toString()) else None + override def applicationId(): Option[String] = Option(appId).map(_.toString()) } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 98c9574cc83e1..39436d0999663 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -22,13 +22,13 @@ import org.apache.spark.deploy.yarn.ApplicationMasterArguments import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.IntParam -private class YarnClusterSchedulerBackend( +private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { var totalExpectedExecutors = 0 - + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 } From c90a08d4f0a23a2dcd9b77e252e854a10a58f883 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Aug 2014 18:29:37 -0700 Subject: [PATCH 14/20] Remove unused code. --- .../apache/spark/scheduler/ApplicationEventListener.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index febbd8d53adb2..6d39a5e3fa64c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -32,14 +32,6 @@ private[spark] class ApplicationEventListener extends SparkListener { var viewAcls: Option[String] = None var adminAcls: Option[String] = None - def applicationStarted = startTime.isDefined - - def applicationCompleted = endTime.isDefined - - def applicationDuration: Long = { - if (applicationStarted && applicationCompleted) endTime.get - startTime.get else -1 - } - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { appName = Some(applicationStart.appName) appId = applicationStart.appId From f0ba90f3248c8c1c554679bebb61246a77153976 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Aug 2014 18:49:31 -0700 Subject: [PATCH 15/20] Use BufferedIterator. --- .../deploy/history/FsHistoryProvider.scala | 48 +++++-------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 160f85fc41d02..0eb596486c9fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -186,50 +186,26 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis if (!logInfos.isEmpty) { var newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - // A poor man's implementation of a peeking iterator, to help with the merge below. - def hasNext[T](it: Iterator[T], lookAhead: T): Boolean = { - lookAhead != null || it.hasNext - } - - def next[T](it: Iterator[T], lookAhead: T): T = { - if (lookAhead != null) lookAhead else it.next - } - - def addOldInfo(oldInfo: FsApplicationHistoryInfo) = { - if (!newApps.contains(oldInfo.id)) { - newApps += (oldInfo.id -> oldInfo) + def addToList(info: FsApplicationHistoryInfo) = { + if (!newApps.contains(info.id)) { + newApps += (info.id -> info) } } // Merge the new apps with the existing ones, discarding any duplicates. - val newIterator = logInfos.iterator - var newLookAhead: FsApplicationHistoryInfo = null - val oldIterator = applications.values.iterator - var oldLookAhead: FsApplicationHistoryInfo = null - - while (hasNext(newIterator, newLookAhead) && hasNext(oldIterator, oldLookAhead)) { - newLookAhead = next(newIterator, newLookAhead) - oldLookAhead = next(oldIterator, oldLookAhead) - if (newLookAhead.endTime > oldLookAhead.endTime) { - newApps += (newLookAhead.id -> newLookAhead) - newLookAhead = null + val newIterator = logInfos.iterator.buffered + val oldIterator = applications.values.iterator.buffered + + while (newIterator.hasNext && oldIterator.hasNext) { + if (newIterator.head.endTime > oldIterator.head.endTime) { + addToList(newIterator.next) } else { - addOldInfo(oldLookAhead) - oldLookAhead = null + addToList(oldIterator.next) } } - while (hasNext(newIterator, newLookAhead)) { - newLookAhead = next(newIterator, newLookAhead) - newApps += (newLookAhead.id -> newLookAhead) - newLookAhead = null - } - - while (hasNext(oldIterator, oldLookAhead)) { - oldLookAhead = next(oldIterator, oldLookAhead) - addOldInfo(oldLookAhead) - oldLookAhead = null - } + newIterator.foreach(addToList) + oldIterator.foreach(addToList) applications = newApps } From 25f282655a37116a94a273a4a2b7b44c4ec56861 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Aug 2014 19:36:57 -0700 Subject: [PATCH 16/20] Add MIMA excludes. Not so sure why the second one is needed at all (and why just plain "excludeSparkClass" doesn't work either). --- project/MimaExcludes.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b4653c72c10b5..e0af56babd167 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -95,6 +95,13 @@ object MimaExcludes { MimaBuild.excludeSparkClass("storage.Values") ++ MimaBuild.excludeSparkClass("storage.Entry") ++ MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++ + // Class was missing "@DeveloperApi" annotation in 1.0. + MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++ + // Class is "private[spark]" but for some reason not being ignored? + Seq( + ProblemFilters.exclude[AbstractClassProblem]( + "org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend") + ) ++ Seq( ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.mllib.tree.impurity.Gini.calculate"), @@ -110,7 +117,7 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$") - ) ++ + ) ++ Seq ( // package-private classes removed in MLlib ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne") From a0056e61c6281aeb01e1eaafa6d316bd9d391b92 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Aug 2014 12:45:19 -0700 Subject: [PATCH 17/20] Unbreak test. --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 06124cb35f4ca..83dff0fdc61b8 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -718,7 +718,7 @@ class JsonProtocolSuite extends FunSuite { private val applicationStartJsonString = """ - {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","App ID":null, + {"Event":"SparkListenerApplicationStart","App Name":"The winner of all", "Timestamp":42,"User":"Garfield"} """ From 56fe42e422507bbce6df88eb8dd72877be9bad80 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 27 Aug 2014 16:33:29 -0700 Subject: [PATCH 18/20] Fix cluster mode history address, plus a cleanup. --- .../spark/deploy/yarn/ApplicationMaster.scala | 16 +++++++++++++--- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 7 ------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 250f75185af34..737f736dc1a0f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} @@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private val sparkContextRef = new AtomicReference[SparkContext](null) final def run(): Int = { + val appAttemptId = client.getAttemptId() + if (isDriver) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box @@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Set the master property to match the requested mode. System.setProperty("spark.master", "yarn-cluster") + + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) } - logInfo("ApplicationAttemptId: " + client.getAttemptId()) + logInfo("ApplicationAttemptId: " + appAttemptId) val cleanupHook = new Runnable { override def run() { @@ -153,8 +159,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def registerAM(uiAddress: String) = { val sc = sparkContextRef.get() - val historyAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkConf, - client.getAttemptId().getApplicationId().toString()) + + val appId = client.getAttemptId().getApplicationId().toString() + val historyAddress = + sparkConf.getOption("spark.yarn.historyServer.address") + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") allocator = client.register(yarnConf, if (sc != null) sc.getConf else sparkConf, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 9775e244b0936..cb11366a6bc59 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -155,12 +154,6 @@ object YarnSparkHadoopUtil { } } - def getUIHistoryAddress(conf: SparkConf, appId: String): String = { - conf.getOption("spark.yarn.historyServer.address") - .map { address => s"$address${HistoryServer.UI_PATH_PREFIX}/${appId}" } - .getOrElse("") - } - /** * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The From 6706d3a9786b6390045201be75e024e394fbc548 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 2 Sep 2014 10:50:02 -0700 Subject: [PATCH 19/20] Implement applicationId() in base classes. --- .../org/apache/spark/scheduler/SchedulerBackend.scala | 8 ++++++-- .../scala/org/apache/spark/scheduler/TaskScheduler.scala | 8 ++++++-- .../spark/scheduler/cluster/SimrSchedulerBackend.scala | 1 - .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 -- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2 -- .../org/apache/spark/scheduler/local/LocalBackend.scala | 2 -- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 -- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 1 - 8 files changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 16adcae7f7ca1..a0be8307eff27 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -32,7 +32,11 @@ private[spark] trait SchedulerBackend { throw new UnsupportedOperationException def isReady(): Boolean = true - /** Get the application ID associated with the job, if any. */ - def applicationId(): Option[String] + /** + * The application ID associated with the job, if any. + * + * @return The application ID, or None if the backend does not provide an ID. + */ + def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index af839304542f3..1c1ce666eab0f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -65,7 +65,11 @@ private[spark] trait TaskScheduler { def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean - /** Get the application ID associated with the job, if any. */ - def applicationId(): Option[String] + /** + * The application ID associated with the job, if any. + * + * @return The application ID, or None if the backend does not provide an ID. + */ + def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 1a085e4be5e9f..69c60fad2751d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -69,5 +69,4 @@ private[spark] class SimrSchedulerBackend( super.stop() } - override def applicationId(): Option[String] = None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index d2e6620ed6dd8..54d4e16d7059f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -302,6 +302,4 @@ private[spark] class CoarseMesosSchedulerBackend( slaveLost(d, s) } - override def applicationId(): Option[String] = None - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d1db66dd103a7..c7bb38959785c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -334,6 +334,4 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) - override def applicationId(): Option[String] = None - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 1b2f0676876a5..9ea25c2bc7090 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -115,6 +115,4 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: localActor ! StatusUpdate(taskId, state, serializedData) } - override def applicationId(): Option[String] = None - } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2c9fc79cd777f..f5fed988ade24 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -92,7 +92,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def applicationId() = None } /** Length of time to wait while draining listener events. */ @@ -364,7 +363,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def applicationId() = None } val noKillScheduler = new DAGScheduler( sc, diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 65ad6df1e8107..7532da88c6065 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -28,7 +28,6 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism() = 1 - override def applicationId() = None } class FakeTaskSetManager( From 2d19f3c5b45b797fbd60cf79d99ec4de466403db Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 2 Sep 2014 11:01:03 -0700 Subject: [PATCH 20/20] Review feedback. --- .../deploy/history/FsHistoryProvider.scala | 20 +++++++++---------- .../CoarseGrainedSchedulerBackend.scala | 6 +++--- .../cluster/SparkDeploySchedulerBackend.scala | 10 +++++----- project/MimaExcludes.scala | 5 ----- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 0eb596486c9fa..77a3d6cbe488b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -117,7 +117,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis replayBus.replay() - // Note that this does not have any effect due to SPARK-2169. ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)") val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) @@ -183,29 +182,28 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis lastModifiedTime = newLastModifiedTime + // When there are new logs, merge the new list with the existing one, maintaining + // the expected ordering (descending end time). Maintaining the order is important + // to avoid having to sort the list every time there is a request for the log list. if (!logInfos.isEmpty) { - var newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - - def addToList(info: FsApplicationHistoryInfo) = { + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfAbsent(info: FsApplicationHistoryInfo) = { if (!newApps.contains(info.id)) { newApps += (info.id -> info) } } - // Merge the new apps with the existing ones, discarding any duplicates. val newIterator = logInfos.iterator.buffered val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { if (newIterator.head.endTime > oldIterator.head.endTime) { - addToList(newIterator.next) + addIfAbsent(newIterator.next) } else { - addToList(oldIterator.next) + addIfAbsent(oldIterator.next) } } - - newIterator.foreach(addToList) - oldIterator.foreach(addToList) + newIterator.foreach(addIfAbsent) + oldIterator.foreach(addIfAbsent) applications = newApps } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 657e3778d1cc8..5b5257269d92f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -42,7 +42,7 @@ import org.apache.spark.ui.JettyUtils * (spark.deploy.*). */ private[spark] -abstract class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) +class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -51,12 +51,12 @@ abstract class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actor val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered resources / total expected resources) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. var minRegisteredRatio = math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)) // Submit tasks after maxRegisteredWaitingTime milliseconds - // if minRegisteredRatio has not yet been reached + // if minRegisteredRatio has not yet been reached val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000) val createTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index aef1ec89c4f4f..06872ace2ecf4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -88,18 +88,18 @@ private[spark] class SparkDeploySchedulerBackend( override def connected(appId: String) { logInfo("Connected to Spark cluster with app ID " + appId) this.appId = appId - wakeUpContext() + notifyContext() } override def disconnected() { - wakeUpContext() + notifyContext() if (!stopping) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead(reason: String) { - wakeUpContext() + notifyContext() if (!stopping) { logError("Application has been killed. Reason: " + reason) scheduler.error(reason) @@ -127,7 +127,7 @@ private[spark] class SparkDeploySchedulerBackend( totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } - override def applicationId(): Option[String] = Some(appId) + override def applicationId(): Option[String] = Option(appId) private def waitForRegistration() = { registrationLock.synchronized { @@ -137,7 +137,7 @@ private[spark] class SparkDeploySchedulerBackend( } } - private def wakeUpContext() = { + private def notifyContext() = { registrationLock.synchronized { registrationDone = true registrationLock.notifyAll() diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index c065ade5a2202..48a3249852de3 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -108,11 +108,6 @@ object MimaExcludes { MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++ // Class was missing "@DeveloperApi" annotation in 1.0. MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++ - // Class is "private[spark]" but for some reason not being ignored? - Seq( - ProblemFilters.exclude[AbstractClassProblem]( - "org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend") - ) ++ Seq( ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.mllib.tree.impurity.Gini.calculate"),