-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl #29579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d5bc756
404f92b
47da0d7
3152e9b
a0bc4f6
75a14a6
b6490fc
bea465b
c12c82d
90f1fd1
84df735
0c0749e
6e8b57e
a39ba8e
ff02621
9096cb9
58add67
d246840
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient( | |
cores)) | ||
listener.executorAdded(fullId, workerId, hostPort, cores, memory) | ||
|
||
case ExecutorUpdated(id, state, message, exitStatus, workerLost) => | ||
case ExecutorUpdated(id, state, message, exitStatus, workerHost) => | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Personally, I would still be okay with workerLost being an Option[String] instead of a Boolean. Obviously, had it been called "workerIsLost" then we would have to rename it. But I am also fine with the new name workerHost as well. I don't particularly think that the name workerLost must connote a boolean. This ExecutorUpdated message is a case in point where the "lost" part is meaningful because it refers to the "worker that is lost" as opposed to some random worker-host. But no strong feelings on this and I am happy with the choice workerHost. |
||
val fullId = appId + "/" + id | ||
val messageText = message.map(s => " (" + s + ")").getOrElse("") | ||
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) | ||
if (ExecutorState.isFinished(state)) { | ||
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) | ||
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost) | ||
} else if (state == ExecutorState.DECOMMISSIONED) { | ||
listener.executorDecommissioned(fullId, | ||
ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost)) | ||
ExecutorDecommissionInfo(message.getOrElse(""), workerHost)) | ||
} | ||
|
||
case WorkerRemoved(id, host, message) => | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -308,7 +308,7 @@ private[deploy] class Master( | |
appInfo.resetRetryCount() | ||
} | ||
|
||
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) | ||
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None)) | ||
|
||
if (ExecutorState.isFinished(state)) { | ||
// Remove this executor from the worker and app | ||
|
@@ -909,9 +909,10 @@ private[deploy] class Master( | |
exec.application.driver.send(ExecutorUpdated( | ||
exec.id, ExecutorState.DECOMMISSIONED, | ||
Some("worker decommissioned"), None, | ||
// workerLost is being set to true here to let the driver know that the host (aka. worker) | ||
// is also being decommissioned. | ||
workerLost = true)) | ||
// worker host is being set here to let the driver know that the host (aka. worker) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: can you reword the comment to be more accurate now :-) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated a little bit. |
||
// is also being decommissioned. So the driver can unregister all the shuffle map | ||
// statuses located at this host when it receives the executor lost event. | |
Some(worker.host))) | ||
exec.state = ExecutorState.DECOMMISSIONED | ||
exec.application.removeExecutor(exec) | ||
} | ||
|
@@ -932,7 +933,7 @@ private[deploy] class Master( | |
for (exec <- worker.executors.values) { | ||
logInfo("Telling app of lost executor: " + exec.id) | ||
exec.application.driver.send(ExecutorUpdated( | ||
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true)) | ||
exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host))) | ||
exec.state = ExecutorState.LOST | ||
exec.application.removeExecutor(exec) | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.