From 1dd591b37b61ab71785b68b31c373bc5be6bdc54 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 9 Jul 2014 16:02:43 -0700 Subject: [PATCH 1/4] SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors --- .../spark/deploy/master/ApplicationInfo.scala | 4 +++- .../apache/spark/deploy/master/Master.scala | 22 ++++++++++++------- .../spark/deploy/worker/ExecutorRunner.scala | 2 ++ .../apache/spark/deploy/worker/Worker.scala | 2 +- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index d3674427b1271..c3ca43f8d0734 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -96,11 +96,13 @@ private[spark] class ApplicationInfo( def retryCount = _retryCount - def incrementRetryCount = { + def incrementRetryCount() = { _retryCount += 1 _retryCount } + def resetRetryCount() = _retryCount = 0 + def markFinished(endState: ApplicationState.Value) { state = endState endTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2a66fcfe4801c..e1fc6af158a68 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -296,23 +296,29 @@ private[spark] class Master( val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { + val appInfo = idToApp(appId) exec.state = state + if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { - val appInfo = idToApp(appId) // Remove this executor from the worker and app logInfo("Removing executor " + exec.fullId + " because it is " + state) appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) - val normalExit = exitStatus.exists(_ == 0) + val normalExit = exitStatus == Some(0) // Only retry certain number of times so we don't go into an infinite loop. - if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) { - schedule() - } else if (!normalExit) { - logError("Application %s with ID %s failed %d times, removing it".format( - appInfo.desc.name, appInfo.id, appInfo.retryCount)) - removeApplication(appInfo, ApplicationState.FAILED) + if (!normalExit) { + if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { + schedule() + } else { + logError("Application %s with ID %s failed %d times, removing it".format( + appInfo.desc.name, appInfo.id, appInfo.retryCount)) + val execs = idToApp(appId).executors.values + if (!execs.exists(_.state == ExecutorState.RUNNING)) { + removeApplication(appInfo, ApplicationState.FAILED) + } + } } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7be89f9aff0f3..00a43673e5cd3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -159,6 +159,8 @@ private[spark] class ExecutorRunner( Files.write(header, stderr, Charsets.UTF_8) stderrAppender = FileAppender(process.getErrorStream, stderr, conf) + state = ExecutorState.RUNNING + worker ! ExecutorStateChanged(appId, execId, state, None, None) // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) // or with nonzero exit code val exitCode = process.waitFor() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index e475567db6a20..0c454e4138c96 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -234,7 +234,7 @@ private[spark] class Worker( try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING) + self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ From bdd0928ea9011d5ccb890881d9c8426dcd8ea1ae Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 11 Jul 2014 10:28:59 -0700 Subject: [PATCH 2/4] switched to string interpolation --- .../scala/org/apache/spark/deploy/master/Master.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e1fc6af158a68..9cbedb8f03193 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -302,7 +302,7 @@ private[spark] class Master( exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app - logInfo("Removing executor " + exec.fullId + " because it is " + state) + logInfo(s"Removing executor ${exec.fullId} because it is $state") appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) @@ -312,8 +312,8 @@ private[spark] class Master( if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { schedule() } else { - logError("Application %s with ID %s failed %d times, removing it".format( - appInfo.desc.name, appInfo.id, appInfo.retryCount)) + logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + + s"${appInfo.retryCount} times; removing it") val execs = idToApp(appId).executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { removeApplication(appInfo, ApplicationState.FAILED) @@ -323,7 +323,7 @@ private[spark] class Master( } } case None => - logWarning("Got status update for unknown executor " + appId + "/" + execId) + logWarning(s"Got status update for unknown executor $appId/$execId") } } From b2b7b25af919ff6e991fc2132923c3ddcf44c233 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 12 Aug 2014 12:08:03 -0700 Subject: [PATCH 3/4] Moved 'Application failed' logging --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9cbedb8f03193..514886a34ab98 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -312,10 +312,10 @@ private[spark] class Master( if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { schedule() } else { - logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + - s"${appInfo.retryCount} times; removing it") val execs = idToApp(appId).executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { + logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + + s"${appInfo.retryCount} times; removing it") removeApplication(appInfo, ApplicationState.FAILED) } } From f099c0b2654759ab5cbfe2bc91cedac10f3bf77f Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 8 Sep 2014 11:07:12 -0700 Subject: [PATCH 4/4] Reuse appInfo --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 514886a34ab98..a3909d6ea95c0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -312,7 +312,7 @@ private[spark] class Master( if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { schedule() } else { - val execs = idToApp(appId).executors.values + val execs = appInfo.executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + s"${appInfo.retryCount} times; removing it")