
Commit 366a178

dagrawal3409 authored and holdenk committed
[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning
### What changes were proposed in this pull request?

This PR reduces the prospect of a job loss during decommissioning. It fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure: we know that the decommissioned executor will be dying soon, so its death is clearly not caused by the application.
- (b) Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: this is a bit tricky in terms of when to clear the shuffle state. Ideally you want to clear it the millisecond before the shuffle service on the node dies (or the executor dies when there is no external shuffle service) -- too soon could lead to some wastage and too late would lead to fetch failures. The approach here is to do the clearing when the very first fetch failure is observed on the decommissioned block manager, without waiting for other blocks to also signal a failure.

### Why are the changes needed?

Without them, decommissioning many executors at a time leads to job failures.

### Code overview

The task scheduler tracks the executors that were decommissioned along with their `ExecutorDecommissionInfo`. This information is used (a) when handling an `ExecutorProcessLost` error, and (b) by the `DAGScheduler` when handling a fetch failure.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

### Questions for reviewers

- Should I add a feature flag to guard these two behaviors? They seem safe to me in that they should only get triggered by decommissioning, but you never know :-)

Closes #29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
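For orientation before the diffs: the scheduler-side bookkeeping revolves around the `ExecutorDecommissionInfo` recorded for each decommissioned executor, and around the rule that a host-level decommission signal wins over an executor-level one. Below is a minimal sketch of that merge rule, using a hypothetical stand-in for the info class (the real one lives in `org.apache.spark.scheduler` and may carry more fields); the helper and object names are illustrative and not part of this change.

```scala
// Hypothetical stand-in for ExecutorDecommissionInfo, reduced to the two
// pieces of information the diffs below rely on.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

object DecommissionMergeSketch {
  // Illustrative helper: when multiple decommission updates arrive for the same
  // executor, keep the one that says the whole host is going away, since that
  // signal most likely comes from the cluster manager and is authoritative.
  def mergeDecommissionInfo(
      old: Option[ExecutorDecommissionInfo],
      incoming: ExecutorDecommissionInfo): ExecutorDecommissionInfo = old match {
    case Some(prev) if prev.isHostDecommissioned => prev
    case _ => incoming
  }

  def main(args: Array[String]): Unit = {
    val executorLevel = ExecutorDecommissionInfo("executor draining", isHostDecommissioned = false)
    val hostLevel = ExecutorDecommissionInfo("host going away", isHostDecommissioned = true)
    // Host-level info is kept even if a weaker executor-level update arrives later.
    println(mergeDecommissionInfo(Some(hostLevel), executorLevel))
    // ExecutorDecommissionInfo(host going away,true)
  }
}
```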
1 parent 7cf3b54 commit 366a178

File tree

9 files changed: +539 -7 lines changed


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 14 additions & 5 deletions
@@ -1821,10 +1821,19 @@ private[spark] class DAGScheduler(

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-            unRegisterOutputOnHostOnFetchFailure) {
-            // We had a fetch failure with the external shuffle service, so we
-            // assume all shuffle data on the node is bad.
+          val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
+          val isHostDecommissioned = taskScheduler
+            .getExecutorDecommissionInfo(bmAddress.executorId)
+            .exists(_.isHostDecommissioned)
+
+          // Shuffle output of all executors on host `bmAddress.host` may be lost if:
+          // - External shuffle service is enabled, so we assume that all shuffle data on node is
+          //   bad.
+          // - Host is decommissioned, thus all executors on that host will die.
+          val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled ||
+            isHostDecommissioned
+          val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost
+              && unRegisterOutputOnHostOnFetchFailure) {
             Some(bmAddress.host)
           } else {
             // Unregister shuffle data just for one executor (we don't have any
@@ -2339,7 +2348,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

     case ExecutorLost(execId, reason) =>
       val workerLost = reason match {
-        case ExecutorProcessLost(_, true) => true
+        case ExecutorProcessLost(_, true, _) => true
         case _ => false
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
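Read as a standalone decision, the first hunk boils down to the following predicate for choosing between host-wide and per-executor unregistration of shuffle outputs. This is only a sketch of the logic shown above, with the inputs passed in explicitly; the function and object names are made up and not part of the change.

```scala
object FetchFailureUnregisterSketch {
  // Sketch of the decision in the hunk above: forget shuffle output for the whole
  // host when either the external shuffle service holds all of the host's shuffle
  // data, or the host is being decommissioned (so every executor on it will die).
  def hostToUnregisterOutputs(
      host: String,
      externalShuffleServiceEnabled: Boolean,
      isHostDecommissioned: Boolean,
      unRegisterOutputOnHostOnFetchFailure: Boolean): Option[String] = {
    val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled || isHostDecommissioned
    if (shuffleOutputOfEntireHostLost && unRegisterOutputOnHostOnFetchFailure) {
      Some(host)  // unregister every executor's outputs on this host
    } else {
      None        // only the failing executor's outputs get unregistered
    }
  }

  def main(args: Array[String]): Unit = {
    // A decommissioned host triggers host-wide cleanup even without an external shuffle service.
    println(hostToUnregisterOutputs("host-1", externalShuffleServiceEnabled = false,
      isHostDecommissioned = true, unRegisterOutputOnHostOnFetchFailure = true))  // Some(host-1)
  }
}
```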

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

Lines changed: 6 additions & 1 deletion
@@ -54,9 +54,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 /**
  * @param _message human readable loss reason
  * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param causedByApp whether the loss of the executor is the fault of the running app.
+ *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
-case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
+case class ExecutorProcessLost(
+    _message: String = "Executor Process Lost",
+    workerLost: Boolean = false,
+    causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)

 /**
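One consequence of growing the case class to three fields is that constructor patterns must now bind all three, which is why the `DAGScheduler` match above gained a trailing `_`. A hedged usage sketch follows, with local copies of the classes (the real ones are `private[spark]`).

```scala
sealed class ExecutorLossReason(val message: String)

// Local copy of the case class from the diff above, for illustration only.
case class ExecutorProcessLost(
    _message: String = "Executor Process Lost",
    workerLost: Boolean = false,
    causedByApp: Boolean = true)
  extends ExecutorLossReason(_message)

object LossReasonSketch {
  def main(args: Array[String]): Unit = {
    // A loss produced by decommissioning: worker (and shuffle service) gone,
    // but the running application is not to blame.
    val reason: ExecutorLossReason =
      ExecutorProcessLost("Executor decommissioned", workerLost = true, causedByApp = false)

    reason match {
      // All three fields must be bound now; the old two-field patterns no longer compile.
      case ExecutorProcessLost(msg, workerLost, causedByApp) =>
        println(s"$msg workerLost=$workerLost causedByApp=$causedByApp")
      case other =>
        println(other.message)
    }
  }
}
```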

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 5 additions & 0 deletions
@@ -103,6 +103,11 @@ private[spark] trait TaskScheduler {
    */
   def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

+  /**
+   * If an executor is decommissioned, return its corresponding decommission info
+   */
+  def getExecutorDecommissionInfo(executorId: String): Option[ExecutorDecommissionInfo]
+
   /**
    * Process a lost executor
    */

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 36 additions & 1 deletion
@@ -136,6 +136,8 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

+  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
@@ -939,12 +941,43 @@ private[spark] class TaskSchedulerImpl(

   override def executorDecommission(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+    synchronized {
+      // Don't bother noting decommissioning for executors that we don't know about
+      if (executorIdToHost.contains(executorId)) {
+        // The scheduler can get multiple decommission updates from multiple sources,
+        // and some of those can have isHostDecommissioned false. We merge them such that
+        // if we heard isHostDecommissioned ever true, then we keep that one since it is
+        // most likely coming from the cluster manager and thus authoritative
+        val oldDecomInfo = executorsPendingDecommission.get(executorId)
+        if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+          executorsPendingDecommission(executorId) = decommissionInfo
+        }
+      }
+    }
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }

-  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
+  override def getExecutorDecommissionInfo(executorId: String)
+    : Option[ExecutorDecommissionInfo] = synchronized {
+    executorsPendingDecommission.get(executorId)
+  }
+
+  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
+    val reason = givenReason match {
+      // Handle executor process loss due to decommissioning
+      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
+        val executorDecommissionInfo = getExecutorDecommissionInfo(executorId)
+        ExecutorProcessLost(
+          message,
+          // Also mark the worker lost if we know that the host was decommissioned
+          origWorkerLost || executorDecommissionInfo.exists(_.isHostDecommissioned),
+          // Executor loss is certainly not caused by app if we knew that this executor is being
+          // decommissioned
+          causedByApp = executorDecommissionInfo.isEmpty && origCausedByApp)
+      case e => e
+    }

     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1033,6 +1066,8 @@ private[spark] class TaskSchedulerImpl(
       }
     }

+    executorsPendingDecommission -= executorId
+
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
       rootPool.executorLost(executorId, host, reason)
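The new `executorLost` preamble can be read as a pure rewrite of the incoming loss reason, keyed off whatever decommission info the scheduler has recorded. Here is a sketch of that transformation under the same assumptions as the earlier snippets: local stand-ins for the `private[spark]` classes and an illustrative function name.

```scala
// Local stand-ins for the private[spark] classes used in the diff above.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

sealed class ExecutorLossReason(val message: String)
case class ExecutorProcessLost(
    _message: String = "Executor Process Lost",
    workerLost: Boolean = false,
    causedByApp: Boolean = true)
  extends ExecutorLossReason(_message)

object ExecutorLostSketch {
  // Sketch of the match in executorLost above: if the executor was known to be
  // decommissioning, its loss is not the app's fault, and a host-level decommission
  // also implies the worker (and any shuffle service on it) is gone.
  def adjustLossReason(
      givenReason: ExecutorLossReason,
      decomInfo: Option[ExecutorDecommissionInfo]): ExecutorLossReason = givenReason match {
    case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
      ExecutorProcessLost(
        message,
        origWorkerLost || decomInfo.exists(_.isHostDecommissioned),
        causedByApp = decomInfo.isEmpty && origCausedByApp)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val decom = Some(ExecutorDecommissionInfo("host going away", isHostDecommissioned = true))
    println(adjustLossReason(ExecutorProcessLost(), decom))
    // ExecutorProcessLost(Executor Process Lost,true,false)
  }
}
```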

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 1 addition & 0 deletions
@@ -985,6 +985,7 @@ private[spark] class TaskSetManager(
     val exitCausedByApp: Boolean = reason match {
       case exited: ExecutorExited => exited.exitCausedByApp
       case ExecutorKilled => false
+      case ExecutorProcessLost(_, _, false) => false
       case _ => true
     }
     handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
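The effect of this one-line addition is that a task killed by a decommission-driven executor loss is reported with `exitCausedByApp = false`, so the failure is not charged against the application's task-retry budget. A tiny sketch of the extended match as a standalone predicate, again with local stand-ins for the loss reasons rather than the real `private[spark]` classes:

```scala
sealed trait ExecutorLossReason
case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
  extends ExecutorLossReason
case object ExecutorKilled extends ExecutorLossReason
case class ExecutorProcessLost(
    _message: String = "Executor Process Lost",
    workerLost: Boolean = false,
    causedByApp: Boolean = true)
  extends ExecutorLossReason

object ExitCausedByAppSketch {
  // Mirrors the match in TaskSetManager above: decommission-driven losses
  // (causedByApp = false) are treated like ExecutorKilled and are not counted
  // as application-caused task failures.
  def exitCausedByApp(reason: ExecutorLossReason): Boolean = reason match {
    case exited: ExecutorExited => exited.exitCausedByApp
    case ExecutorKilled => false
    case ExecutorProcessLost(_, _, false) => false
    case _ => true
  }

  def main(args: Array[String]): Unit = {
    println(exitCausedByApp(ExecutorProcessLost(causedByApp = false)))  // false
    println(exitCausedByApp(ExecutorProcessLost()))                     // true
  }
}
```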
