
Commit 092e2f1

markhamstra authored and andrewor14 committed
SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors
Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master from removing an Application with RUNNING Executors.

Two basic changes:
1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits over the entire lifetime of the Application, allow that many since any Executor successfully began running the Application.
2) Don't remove the Application while Master still thinks that there are RUNNING Executors.

This should be fine as long as the ApplicationInfo doesn't believe any Executors are forever RUNNING when they are not. I think that any non-RUNNING Executors will eventually no longer be RUNNING in Master's accounting, but another set of eyes should confirm that. This PR also doesn't try to detect which nodes have gone rogue or to kill off bad Workers, so repeatedly failing Executors will continue to fail and fill up log files with failure reports as long as the Application keeps running.

Author: Mark Hamstra <[email protected]>

Closes #1360 from markhamstra/SPARK-2425 and squashes the following commits:

f099c0b [Mark Hamstra] Reuse appInfo
b2b7b25 [Mark Hamstra] Moved 'Application failed' logging
bdd0928 [Mark Hamstra] switched to string interpolation
1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors
1 parent 2b7ab81 commit 092e2f1
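
Taken together, the two changes amount to a small retry policy: an abnormal Executor exit increments a per-Application counter, any Executor reaching RUNNING resets that counter, and the Application is removed only once the counter reaches the limit while no Executor is still RUNNING. The following is a minimal, hypothetical Scala sketch of that policy; the object name, method names, and the retry limit are illustrative, not the actual Master/ApplicationInfo code (the real changes are in the diff below).

// Hypothetical sketch of the retry policy introduced by this commit.
object RetryPolicySketch {
  val MaxNumRetry = 10            // stands in for ApplicationState.MAX_NUM_RETRY
  private var retryCount = 0

  // Any Executor reporting RUNNING shows the Application can still make
  // progress, so the abnormal-exit budget starts over.
  def onExecutorRunning(): Unit = { retryCount = 0 }

  // Returns true only when the retry budget is exhausted AND no Executor is
  // still RUNNING; in every other case the Application is left alive.
  def onAbnormalExecutorExit(hasRunningExecutor: Boolean): Boolean = {
    retryCount += 1
    retryCount >= MaxNumRetry && !hasRunningExecutor
  }
}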

File tree

4 files changed, +22 -12 lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala


core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 3 additions & 1 deletion
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 16 additions & 10 deletions
@@ -296,28 +296,34 @@ private[spark] class Master(
       val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
-            val normalExit = exitStatus.exists(_ == 0)
+            val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else if (!normalExit) {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                val execs = appInfo.executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                    s"${appInfo.retryCount} times; removing it")
+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 2 additions & 0 deletions
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ private[spark] class Worker(
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
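
For context, the executor lifecycle these changes rely on is: the Worker now creates each ExecutorRunner in LOADING, the runner reports RUNNING once the executor process has actually been launched, and only terminal states count as finished. A rough sketch of such an ExecutorState enumeration follows; the real definition lives in org.apache.spark.deploy.ExecutorState and may list slightly different members.

// Sketch of an ExecutorState-style enumeration (illustrative, not the actual
// org.apache.spark.deploy.ExecutorState source).
object ExecutorStateSketch extends Enumeration {
  type ExecutorState = Value
  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

  // LOADING and RUNNING are not "finished", so a newly launched executor does
  // not look like a failure to the Master before it has started running.
  def isFinished(state: ExecutorState): Boolean =
    Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}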
