diff --git a/CHANGES.txt b/CHANGES.txt index 251abe1db7729..26a4eb5bf9a0a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,214 @@ Spark Change Log ---------------- +Release 0.9.2 + + [branch-0.9] bump versions for v0.9.2 release candidate + Xiangrui Meng + 2014-07-16 23:53:40 -0700 + Commit: c9a22e8, github.com/apache/spark/pull/1458 + + [branch-0.9] Fix github links in docs + Xiangrui Meng + 2014-07-16 23:39:02 -0700 + Commit: 60f4b3b, github.com/apache/spark/pull/1456 + + [SPARK-1112, 2156] (0.9 edition) Use correct akka frame size and overhead amounts. + Patrick Wendell + 2014-07-16 21:30:50 -0700 + Commit: 7edee34, github.com/apache/spark/pull/1455 + + [SPARK-2433][MLLIB] fix NaiveBayesModel.predict + Xiangrui Meng + 2014-07-16 20:12:09 -0700 + Commit: 0116dee, github.com/apache/spark/pull/1453 + + [SPARK-2362] Fix for newFilesOnly logic in file DStream + Gabriele Nizzoli + 2014-07-08 14:23:38 -0700 + Commit: 8e5604b, github.com/apache/spark/pull/1077 + + SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark + Aaron Davidson + 2014-07-03 23:02:36 -0700 + Commit: 57873ef, github.com/apache/spark/pull/1220 + + [SPARK-2350] Don't NPE while launching drivers + Aaron Davidson + 2014-07-03 22:31:41 -0700 + Commit: c37e9ed, github.com/apache/spark/pull/1289 + + [SPARK-1516]Throw exception in yarn client instead of run system.exit + John Zhao + 2014-07-03 15:17:51 -0700 + Commit: 0d3d5ce, github.com/apache/spark/pull/1099 + + HOTFIX: Removing out dated python path in testing tool. + Patrick Wendell + 2014-06-27 18:19:16 -0700 + Commit: b3f4245 + + [SPARK-1912] fix compress memory issue during reduce + Wenchen Fan(Cloud) + 2014-06-03 13:18:20 -0700 + Commit: 9509819, github.com/apache/spark/pull/860 + + SPARK-2241: quote command line args in ec2 script + Ori Kremer + 2014-06-22 20:21:23 -0700 + Commit: ef8501d, github.com/apache/spark/pull/1169 + + HOTFIX: bug caused by #941 + Patrick Wendell + 2014-06-17 15:09:24 -0700 + Commit: 2a2eace, github.com/apache/spark/pull/1108 + + SPARK-1990: added compatibility for python 2.6 for ssh_read command + Anant + 2014-06-16 23:42:27 -0700 + Commit: 8e9f479, github.com/apache/spark/pull/941 + + [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not re... + joyyoj + 2014-06-10 17:26:17 -0700 + Commit: 706e38f, github.com/apache/spark/pull/951 + + Spark 1384 - Fix spark-shell on yarn access to secure hdfs - branch-0.9 only + Thomas Graves + 2014-06-09 23:07:25 -0700 + Commit: cc95d97, github.com/apache/spark/pull/287 + + [SPARK-1870] Made deployment with --jars work in yarn-standalone mode. + DB Tsai + 2014-06-09 22:56:24 -0700 + Commit: 1d3aab9, github.com/apache/spark/pull/1013 + + SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys + Matei Zaharia + 2014-06-05 23:01:48 -0700 + Commit: 51f677e, github.com/apache/spark/pull/986 + + SPARK-1790: Update EC2 scripts to support r3 instance types + Varakhedi Sujeet + 2014-06-04 16:01:56 -0700 + Commit: 6634a34, github.com/apache/spark/pull/960 + + [SPARK-1468] Modify the partition function used by partitionBy. 
+ Erik Selin + 2014-06-03 13:31:16 -0700 + Commit: 41e7853, github.com/apache/spark/pull/371 + + SPARK-1917: fix PySpark import of scipy.special functions + Uri Laserson + 2014-05-31 14:59:09 -0700 + Commit: e03af41, github.com/apache/spark/pull/866 + + SPARK-1935: Explicitly add commons-codec 1.5 as a dependency (for branch-0.9). + Yin Huai + 2014-05-30 22:12:17 -0700 + Commit: 563bfe1, github.com/apache/spark/pull/912 + + SPARK-1188: Do not re-use objects in the EdgePartition/EdgeTriplet iterators. + Daniel Darabos + 2014-04-02 12:27:37 -0700 + Commit: a92900c, github.com/apache/spark/pull/276 + + [SPARK-1712]: TaskDescription instance is too big causes Spark to hang + witgo + 2014-05-28 15:57:05 -0700 + Commit: aef6390, github.com/apache/spark/pull/694 + + Spark 1916 + David Lemieux + 2014-05-28 15:50:35 -0700 + Commit: 234a378, github.com/apache/spark/pull/865 + + SPARK-1145: Memory mapping with many small blocks can cause JVM allocation failures + Patrick Wendell + 2014-04-27 17:40:56 -0700 + Commit: 7633949, github.com/apache/spark/pull/43 + + Update version to 0.9.2-SNAPSHOT in sbt + Matei Zaharia + 2014-05-11 16:54:54 -0700 + Commit: c9f40d0 + + SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo + Sandeep + 2014-05-08 22:30:17 -0700 + Commit: bea2be3, github.com/apache/spark/pull/707 + + [SPARK-1685] Cancel retryTimer on restart of Worker or AppClient + Mark Hamstra + 2014-05-06 12:53:39 -0700 + Commit: 9e2c59e, github.com/apache/spark/pull/602 + + [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak + Thomas Graves + 2014-05-03 10:59:05 -0700 + Commit: 45561cd, github.com/apache/spark/pull/621 + + version number fix + Nan Zhu + 2014-04-21 23:42:47 -0700 + Commit: 54c3b7e, github.com/apache/spark/pull/467 + + Small syntax error from previous backport + Patrick Wendell + 2014-04-13 14:32:22 -0700 + Commit: 9e89789 + + Update WindowedDStream.scala + baishuo(白硕) + 2014-04-11 20:33:42 -0700 + Commit: 4a325e1, github.com/apache/spark/pull/390 + + Fixed typo on Spark quick-start docs. + Tathagata Das + 2014-04-07 18:27:46 -0700 + Commit: 19cf2f7 + + SPARK-1432: Make sure that all metadata fields are properly cleaned + Davis Shepherd + 2014-04-07 10:02:00 -0700 + Commit: 69fc97d, github.com/apache/spark/pull/338 + + SPARK-1421. Make MLlib work on Python 2.6 + Matei Zaharia + 2014-04-05 20:52:05 -0700 + Commit: 139fc1a, github.com/apache/spark/pull/335 + + Update documentation for work around for SPARK-1384 + Thomas Graves + 2014-04-04 18:26:51 -0700 + Commit: d4df076, github.com/apache/spark/pull/314 + + SPARK-1337: Application web UI garbage collects newest stages + Patrick Wendell + 2014-04-03 22:13:56 -0700 + Commit: 7f727cf, github.com/apache/spark/pull/320 + + [SPARK-1134] Fix and document passing of arguments to IPython + Diana Carroll + 2014-04-03 15:48:42 -0700 + Commit: d9c7a80, github.com/apache/spark/pull/294 + + Spark 1162 Implemented takeOrdered in pyspark. 
+ Prashant Sharma + 2014-04-03 15:42:17 -0700 + Commit: 28e7643, github.com/apache/spark/pull/97 + + fix path for jar, make sed actually work on OSX + Nick Lanham + 2014-03-28 13:33:35 -0700 + Commit: a6c955a, github.com/apache/spark/pull/264 + + Make sed do -i '' on OSX + Nick Lanham + 2014-03-27 22:45:00 -0700 + Commit: 4afbd19, github.com/apache/spark/pull/258 + + Release 0.9.1 Revert "[maven-release-plugin] prepare release v0.9.1-rc2" diff --git a/assembly/pom.xml b/assembly/pom.xml index dde1602b47c08..215ad1b769d36 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index cf1be6fcc045e..260d58303fb6c 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 9f3b5c74a72ff..81ba4f68fa0b1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a7790ce03d619..f4ec53a43b26f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -307,6 +307,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: } else { // This happens on the master, where we pass the updates to Python through a socket val socket = new Socket(serverHost, serverPort) + // SPARK-2282: Immediately reuse closed sockets because we create one per task. + socket.setReuseAddress(true) val in = socket.getInputStream val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize)) out.writeInt(val2.size) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 3e26379166885..3847b9129c27c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -90,11 +90,13 @@ private[spark] class ApplicationInfo( def retryCount = _retryCount - def incrementRetryCount = { + def incrementRetryCount() = { _retryCount += 1 _retryCount } + def resetRetryCount() = _retryCount = 0 + def markFinished(endState: ApplicationState.Value) { state = endState endTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d72bb7a4cc057..249e75a073027 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -264,27 +264,34 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { + val appInfo = idToApp(appId) exec.state = state + if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } exec.application.driver ! 
ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { - val appInfo = idToApp(appId) // Remove this executor from the worker and app - logInfo("Removing executor " + exec.fullId + " because it is " + state) + logInfo(s"Removing executor ${exec.fullId} because it is $state") appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) + val normalExit = exitStatus == Some(0) // Only retry certain number of times so we don't go into an infinite loop. - if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) { - schedule() - } else { - logError("Application %s with ID %s failed %d times, removing it".format( - appInfo.desc.name, appInfo.id, appInfo.retryCount)) - removeApplication(appInfo, ApplicationState.FAILED) + if (!normalExit) { + if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { + schedule() + } else { + val execs = appInfo.executors.values + if (!execs.exists(_.state == ExecutorState.RUNNING)) { + logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + + s"${appInfo.retryCount} times; removing it") + removeApplication(appInfo, ApplicationState.FAILED) + } + } } } } case None => - logWarning("Got status update for unknown executor " + appId + "/" + execId) + logWarning(s"Got status update for unknown executor $appId/$execId") } } @@ -450,7 +457,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // First schedule drivers, they take strict precedence over applications val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { + for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 18885d7ca6daa..513f8e6c44d9e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -139,9 +139,10 @@ private[spark] class ExecutorRunner( Files.write(header, stderr, Charsets.UTF_8) CommandUtils.redirectStream(process.getErrorStream, stderr) - // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run - // long-lived processes only. However, in the future, we might restart the executor a few - // times on the same machine. + state = ExecutorState.RUNNING + worker ! 
ExecutorStateChanged(appId, execId, state, None, None) + // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) + // or with nonzero exit code val exitCode = process.waitFor() state = ExecutorState.FAILED val message = "Command exited with code " + exitCode diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 4b41ded80abda..401071b50485c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -211,18 +211,29 @@ private[spark] class Worker( if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - // TODO (pwendell): We shuld make sparkHome an Option[String] in - // ApplicationDescription to be more explicit about this. - val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + try { + logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, + self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.LOADING) + executors(appId + "/" + execId) = manager + manager.start() + coresUsed += cores_ + memoryUsed += memory_ + masterLock.synchronized { + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + } + } catch { + case e: Exception => { + logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + if (executors.contains(appId + "/" + execId)) { + executors(appId + "/" + execId).kill() + executors -= appId + "/" + execId + } + masterLock.synchronized { + master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) + } + } } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2946af70984c9..404b7c0e596dc 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, - cores: Int) + cores: Int, + actorSystem: ActorSystem) extends Actor with ExecutorBackend with Logging { @@ -93,6 +94,9 @@ private[spark] class CoarseGrainedExecutorBackend( override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { driver ! 
StatusUpdate(executorId, taskId, state, data) } + + override def akkaFrameSize() = actorSystem.settings.config.getBytes( + "akka.remote.netty.tcp.maximum-frame-size") } private[spark] object CoarseGrainedExecutorBackend { @@ -110,7 +114,8 @@ private[spark] object CoarseGrainedExecutorBackend { // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores, + actorSystem), name = "Executor") workerUrl.foreach { url => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 356c04fed8a2a..fb16618712068 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -96,10 +96,6 @@ private[spark] class Executor( } } - // Akka's message frame size. If task result is bigger than this, we use the block manager - // to send the result back. - private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Start worker thread pool val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") @@ -209,8 +205,10 @@ private[spark] class Executor( val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) + val serializedResult = { - if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + if (serializedDirectResult.limit >= execBackend.akkaFrameSize() - + AkkaUtils.reservedSizeBytes) { logInfo("Storing result for " + taskId + " in local BlockManager") val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala index ad7dd34c76940..99474034eb158 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala @@ -25,4 +25,7 @@ import org.apache.spark.TaskState.TaskState */ private[spark] trait ExecutorBackend { def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) + + // Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark. 
+ def akkaFrameSize(): Long = Long.MaxValue } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index dc5c95f1236ec..bfd6b51b723a4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -142,7 +142,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A for (task <- tasks.flatten) { val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) - if (serializedTask.limit >= akkaFrameSize - 1024) { + if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 36e910c528a6a..f0437bfb688ea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer import akka.actor.{Actor, ActorRef, Props} - -import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState} +import org.apache.spark.{Logging, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.util.AkkaUtils private case class ReviveOffers() @@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localActor ! StatusUpdate(taskId, state, serializedData) } + + // This limit is calculated only to preserve expected behavior in tests. In reality, since this + // backend sends messages over the existing actor system, there is no need to enforce a limit. + override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf) } diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 4dcf39b0683d2..7301e54d179d1 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -115,4 +115,7 @@ private[spark] object AkkaUtils { def maxFrameSizeBytes(conf: SparkConf): Int = { conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 } + + /** Space reserved for extra data in an Akka message besides serialized task or task result. 
*/ + val reservedSizeBytes = 200 * 1024 } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index b6e064c38fd31..ac341b3cb6b74 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -161,7 +161,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch exceeds akka frame size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") newConf.set("spark.akka.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -170,14 +169,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem) val masterActor = actorRef.underlyingActor - // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception. - // Note that the size is hand-selected here because map output statuses are compressed before - // being sent. - masterTracker.registerShuffle(20, 100) - (0 until 100).foreach { i => - masterTracker.registerMapOutput(20, i, new MapStatus( - BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) + // Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should + // throw exception. + val shuffleId = 20 + val numMaps = 2 + val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf)) + val random = new java.util.Random(0) + random.nextBytes(data) // Make it hard to compress. + masterTracker.registerShuffle(shuffleId, numMaps) + (0 until numMaps).foreach { i => + masterTracker.registerMapOutput(shuffleId, i, new MapStatus( + BlockManagerId("999", "mps", 1000, 0), data)) } - intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) } + intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) } } } diff --git a/dev/run-tests b/dev/run-tests index 00038718335a1..596ed2f66a919 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -35,7 +35,4 @@ sbt/sbt clean assembly test echo "=========================================================================" echo "Running PySpark tests" echo "=========================================================================" -if [ -z "$PYSPARK_PYTHON" ]; then - export PYSPARK_PYTHON=/usr/local/bin/python2.7 -fi ./python/run-tests diff --git a/docs/_config.yml b/docs/_config.yml index db6cf74b4af93..764548306a14c 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 0.9.1 -SPARK_VERSION_SHORT: 0.9.1 +SPARK_VERSION: 0.9.2 +SPARK_VERSION_SHORT: 0.9.2 SCALA_BINARY_VERSION: "2.10" SCALA_VERSION: "2.10.3" MESOS_VERSION: 0.13.0 diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md index da6d0c9dcd97b..a2f93a1cd1ab3 100644 --- a/docs/bagel-programming-guide.md +++ b/docs/bagel-programming-guide.md @@ -108,7 +108,7 @@ _Example_ ## Operations -Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details. +Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/spark/tree/branch-0.9/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details. 
### Actions diff --git a/docs/index.md b/docs/index.md index 55d8788332fdd..9308ca30e055f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -117,7 +117,7 @@ Note that on Windows, you need to set the environment variables on separate line exercises about Spark, Shark, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/agenda-2012), [slides](http://ampcamp.berkeley.edu/agenda-2012) and [exercises](http://ampcamp.berkeley.edu/exercises-2012) are available online for free. -* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of Spark +* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/branch-0.9/examples/src/main/scala/) of Spark * [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf) * [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index 5c73dbb25ede8..7e8f7830f512e 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -189,7 +189,7 @@ We hope to generate documentation with Java-style syntax in the future. # Where to Go from Here Spark includes several sample programs using the Java API in -[`examples/src/main/java`](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the +[`examples/src/main/java`](https://github.com/apache/spark/tree/branch-0.9/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the `bin/run-example` script included in Spark; for example: ./bin/run-example org.apache.spark.examples.JavaWordCount diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 178115b9542d0..a529f4e34b527 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -157,7 +157,7 @@ some example applications. # Where to Go from Here -PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples). +PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/branch-0.9/python/examples). You can run them by passing the files to `pyspark`; e.g.: ./bin/pyspark python/examples/wordcount.py diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index f9904d45013f6..7cc71a820fbb3 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -125,7 +125,7 @@ ssc.awaitTermination() // Wait for the computation to terminate {% endhighlight %} The complete code can be found in the Spark Streaming example -[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala). +[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala).
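For quick reference, a minimal sketch of the Scala NetworkWordCount example linked above (Spark 0.9 streaming API; assumes a netcat server on localhost:9999, e.g. `nc -lk 9999`):

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Count words received over a TCP socket and print each batch's counts.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()   // Wait for the computation to terminate
{% endhighlight %}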
@@ -207,7 +207,7 @@ jssc.awaitTermination(); // Wait for the computation to terminate {% endhighlight %} The complete code can be found in the Spark Streaming example -[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java). +[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java).
@@ -602,7 +602,7 @@ JavaPairDStream runningCounts = pairs.updateStateByKey(updateFu The update function will be called for each word, with `newValues` having a sequence of 1's (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete Scala code, take a look at the example -[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala). +[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala).
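To make the updateStateByKey passage above concrete, a minimal Scala sketch using the names from the guide (`updateFunction`, `pairs`); it assumes a checkpoint directory has already been set on the context:

{% highlight scala %}
// newValues holds the new 1's from the (word, 1) pairs seen in this batch;
// runningCount is the previous count for the word, if any.
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

// pairs is a DStream of (word, 1) tuples; updateStateByKey requires ssc.checkpoint(...).
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
{% endhighlight %}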

Transform Operation

@@ -1075,7 +1075,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the If the directory does not exist (i.e., running for the first time), then the function `functionToCreateContext` will be called to create a new context and set up the DStreams. See the Scala example -[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala). +[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala). This example appends the word counts of network data into a file. You can also explicitly create a `StreamingContext` from the checkpoint data and start the @@ -1114,7 +1114,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the If the directory does not exist (i.e., running for the first time), then the function `contextFactory` will be called to create a new context and set up the DStreams. See the Scala example -[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/JavaRecoverableWordCount.scala) +[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/JavaRecoverableWordCount.scala) (note that this example is missing in the 0.9 release, so you can test it using the master branch). This example appends the word counts of network data into a file. @@ -1253,6 +1253,6 @@ and output 30 after recovery. [ZeroMQ](api/external/zeromq/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and [MQTT](api/external/mqtt/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$) -* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples) - and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples) +* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples) + and [Java]({{site.SPARK_GITHUB_URL}}/tree/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples) * [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) describing Spark Streaming diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6d06a5db53625..c522e6816a9df 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -70,7 +70,7 @@ def parse_args(): "slaves across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") - parser.add_option("-v", "--spark-version", default="0.9.1", + parser.add_option("-v", "--spark-version", default="0.9.2", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", default="https://github.com/apache/spark", @@ -157,7 +157,8 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): - spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", "0.9.1": "0.9.1"} + spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", \ + "0.9.0": "0.9.0", "0.9.1": "0.9.1", "0.9.2": "0.9.2"} version = opts.spark_version.replace("v", "") if version not in spark_shark_map: print >> stderr, "Don't know about Spark version: %s" % version diff 
--git a/examples/pom.xml b/examples/pom.xml index 02a4e4b87afc3..76ba741ae329a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index f3ee20021006b..418673daf3184 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index c5202bb0ec1f2..c1fb94b1249d1 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 5b5f55e7ef0fd..80e87d430e93e 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index d6ce74b83cd1d..7ff73e05bb65a 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 8ba4ed4b3523f..1bd689616b4be 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 7918f683d36bc..2fca01294ea83 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-1-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index d895f6b113755..cb93411d617ed 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 0d0287e7b2969..8bdd5cdf0dca6 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 1676979088621..e7c771840a977 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bdf883e0ddc54..7b8f5a08b2ccc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -134,7 +134,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", - version := "0.9.2-SNAPSHOT", + version := "0.9.2", scalaVersion := "2.10.4", scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation", "-target:" + SCALAC_JVM_VERSION), diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 19b90dfd6e167..f6c96e3e9f538 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -84,7 +84,7 @@ class NaiveBayesModel(object): - pi: vector of logs of class priors (dimension C) - 
theta: matrix of logs of class conditional probabilities (CxD) - >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3) + >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0, 1.0, 1.0]).reshape(3,3) >>> model = NaiveBayes.train(sc.parallelize(data)) >>> model.predict(array([0.0, 1.0])) 0 @@ -98,7 +98,7 @@ def __init__(self, pi, theta): def predict(self, x): """Return the most likely class for a data vector x""" - return numpy.argmax(self.pi + dot(x, self.theta)) + return numpy.argmax(self.pi + dot(x, self.theta.transpose())) class NaiveBayes(object): @classmethod diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 1145bcf95cb4d..c8c2803a8883d 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,7 +35,7 @@ ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /__ / .__/\_,_/_/ /_/\_\ version 0.9.2-candidate-csd-1-SNAPSHOT + /__ / .__/\_,_/_/ /_/\_\ version 0.9.2-csd-1-SNAPSHOT /_/ """ print "Using Python version %s (%s, %s)" % ( diff --git a/repl/pom.xml b/repl/pom.xml index 7c387c069e540..558f220648107 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 670ea749f35e0..36bc35937dbf5 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 8a6051622e2d5..29d05e8996690 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -45,7 +45,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Files with mod time earlier than this is ignored. This is updated every interval // such that in the current interval, files older than any file found in the // previous interval will be ignored. Obviously this time keeps moving forward. 
- private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis() + private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L // Latest file mod time seen till any point of time @transient private var path_ : Path = null diff --git a/tools/pom.xml b/tools/pom.xml index 54ebb0426ef25..7bb2bc222ba7c 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-8-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml index a03da5f26bab6..e1967c8849ded 100644 --- a/yarn/alpha/pom.xml +++ b/yarn/alpha/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 0.9.1 + 0.9.2 ../pom.xml diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9e5e2d5ceaca1..9998210997bee 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -93,7 +93,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } def validateArgs() = { @@ -109,7 +108,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) ).foreach { case(cond, errStr) => if (cond) { logError(errStr) - args.printUsageAndExit(1) + throw new IllegalArgumentException(args.getUsageMessage()) } } } @@ -135,17 +134,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() - logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) + logInfo("Max mem capability of a single resource in this cluster " + maxMem) // If we have requested more then the clusters max for a single resource then exit. 
if (args.workerMemory > maxMem) { - logError("the worker size is to large to run on this cluster " + args.workerMemory) - System.exit(1) + val errorMessage = s"the worker size is too large to run on this cluster ${args.workerMemory}" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("AM size is to large to run on this cluster " + amMem) - System.exit(1) + val errorMessage = s"AM size is too large to run on this cluster $amMem" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } // We could add checks to make sure the entire cluster has enough resources but that involves @@ -229,8 +230,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logError("Can't get Master Kerberos principal for use as renewer") - System.exit(1) + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) @@ -475,9 +477,18 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + e.printStackTrace() + System.exit(1) + } + } + + System.exit(0) } // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1419f215c78e5..6bbe87d68024d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -109,11 +109,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { case Nil => if (userJar == null || userClass == null) { - printUsageAndExit(1) + throw new IllegalArgumentException(getUsageMessage()) } case _ => - printUsageAndExit(1, args) + throw new IllegalArgumentException(getUsageMessage(args)) } } @@ -122,11 +122,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { } - def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { - if (unknownParam != null) { - System.err.println("Unknown/unsupported param " + unknownParam) - } - System.err.println( + def getUsageMessage(unknownParam: Any = null): String = { + val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam \n" else "" + + message + "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + "Options:\n" + " --jar JAR_PATH Path to your application's JAR file (required)\n" + @@ -143,8 +142,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + " --files files Comma separated list of files to be distributed with the job.\n" + " --archives archives Comma separated list of archives to be distributed with the job." 
- ) - System.exit(exitCode) + } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 22e55e0c60647..ac786afe1ff22 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -73,10 +73,17 @@ private[spark] class YarnClientSchedulerBackend( .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) - val args = new ClientArguments(argsArrayBuf.toArray, conf) - client = new Client(args, conf) - appId = client.runApp() - waitForApp() + try { + val args = new ClientArguments(argsArrayBuf.toArray, conf) + client = new Client(args, conf) + appId = client.runApp() + waitForApp() + } catch { + case e: IllegalArgumentException => { + e.printStackTrace() + System.exit(1) + } + } } def waitForApp() { diff --git a/yarn/pom.xml b/yarn/pom.xml index 4e5b598709d80..b6a0ff58925db 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 0.9.2-candidate-csd-1-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml index 22694eccc4f09..70695f52a3b6e 100644 --- a/yarn/stable/pom.xml +++ b/yarn/stable/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 0.9.2-candidate-csd-1-SNAPSHOT + 0.9.2-csd-1-SNAPSHOT ../pom.xml diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6ff8c6c3b2497..1bfcaf657c94a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -113,7 +113,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } // TODO(harvey): This could just go in ClientArguments. @@ -130,7 +129,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) ).foreach { case(cond, errStr) => if (cond) { logError(errStr) - args.printUsageAndExit(1) + throw new IllegalArgumentException(args.getUsageMessage()) } } } @@ -160,15 +159,18 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) // If we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.workerMemory, maxMem)) - System.exit(1) + val errorMessage = + "Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster." + .format(args.workerMemory, maxMem) + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("Required AM memory (%d) is above the max threshold (%d) of this cluster". 
- format(args.amMemory, maxMem)) - System.exit(1) + val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster" + .format(args.amMemory, maxMem) + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } // We could add checks to make sure the entire cluster has enough resources but that involves @@ -244,8 +246,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logError("Can't get Master Kerberos principal for use as renewer") - System.exit(1) + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) @@ -489,9 +492,17 @@ object Client { // see Client#setupLaunchEnv(). System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf() - val args = new ClientArguments(argStrings, sparkConf) + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + e.printStackTrace() + System.exit(1) + } + } - new Client(args, sparkConf).run() + System.exit(0) } // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
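The YARN client changes above replace System.exit calls inside library code with exceptions that only the entry point turns into an exit code, so callers such as YarnClientSchedulerBackend can handle failures themselves. A minimal sketch of that pattern, with hypothetical names (validateMemory is illustrative, not taken from the patch):

{% highlight scala %}
// Illustrative only: validateMemory is a hypothetical helper showing the pattern.
object ClientEntryPoint {
  def validateMemory(requestedMem: Int, maxMem: Int) {
    if (requestedMem > maxMem) {
      // Library code signals failure by throwing instead of exiting the JVM.
      throw new IllegalArgumentException(
        "Required memory (%d MB) is above the max threshold (%d MB) of this cluster"
          .format(requestedMem, maxMem))
    }
  }

  def main(args: Array[String]) {
    try {
      validateMemory(args(0).toInt, args(1).toInt)
      // ... launch the application ...
    } catch {
      case e: Exception =>
        e.printStackTrace()
        System.exit(1)   // only the entry point decides to exit the JVM
    }
    System.exit(0)
  }
}
{% endhighlight %}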