@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
// when the task backlog decreased.
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
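
The call above relies on the `workerHost` parameter of `ExecutorDecommissionInfo` now defaulting to `None` (see the case class change later in this diff), so a dynamic-allocation scale-down only supplies the human-readable message. Below is a minimal, self-contained sketch of that call-site shape; `ScaleDownSketch` and the executor ids are illustrative, and `ExecutorDecommissionInfo` is redeclared locally so the snippet compiles on its own.

```scala
// Simplified local stand-in mirroring the case class introduced in this PR.
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

object ScaleDownSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical executor ids chosen by the allocation manager for removal.
    val executorIdsToBeRemoved = Seq("1", "7", "12")

    // Scale-down is an executor-level event: the host itself stays up, so the
    // defaulted workerHost = None lets the call site pass only the message.
    val executorIdsWithoutHostLoss = executorIdsToBeRemoved.map(
      id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray

    executorIdsWithoutHostLoss.foreach { case (id, info) =>
      println(s"decommission executor $id: ${info.message}, workerHost=${info.workerHost}")
    }
  }
}
```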
@@ -187,8 +187,10 @@ private[deploy] object DeployMessages {
Utils.checkHostPort(hostPort)
}

// When the host of Worker is lost or decommissioned, the `workerHost` is the host address
// of that Worker. Otherwise, it's None.
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
exitStatus: Option[Int], workerLost: Boolean)
exitStatus: Option[Int], workerHost: Option[String])

case class ApplicationRemoved(message: String)

@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>

Review comment: Personally, I would still be okay with workerLost being an Option[String] instead of a Boolean. Obviously, had it been called "workerIsLost" then we would have to rename it. But I am fine with the new name workerHost as well; I don't particularly think the name workerLost must connote a boolean.

This ExecutorUpdated message is a case in point where the "lost" part is meaningful, because it refers to the "worker that is lost" as opposed to some random worker-host.

But I have no strong feelings on this, and I am happy with the choice of workerHost.

val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId,
ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
ExecutorDecommissionInfo(message.getOrElse(""), workerHost))
}

case WorkerRemoved(id, host, message) =>
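
For readers following the receiver side, here is a minimal, self-contained sketch of the `ExecutorUpdated` handling above. `AppClientSketch`, the simplified `ExecutorState` enumeration, and the `println` calls standing in for the listener callbacks are illustrative stand-ins rather than the real deploy-module classes; the point is that the same `workerHost` value flows both to `executorRemoved` and into `ExecutorDecommissionInfo`.

```scala
// Local stand-ins for the deploy-module types touched in this diff.
object ExecutorState extends Enumeration {
  val RUNNING, KILLED, FAILED, LOST, DECOMMISSIONED = Value
  // Simplified: treat only terminal, non-decommission states as "finished".
  def isFinished(state: Value): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
}

case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: Option[String],
  exitStatus: Option[Int], workerHost: Option[String])

case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

object AppClientSketch {
  def handle(appId: String, update: ExecutorUpdated): Unit = update match {
    case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
      val fullId = appId + "/" + id
      if (ExecutorState.isFinished(state)) {
        // workerHost is passed straight through so the scheduler side can also
        // unregister shuffle output located on that host.
        println(s"removed: $fullId (${message.getOrElse("")}), exit=$exitStatus, workerHost=$workerHost")
      } else if (state == ExecutorState.DECOMMISSIONED) {
        val info = ExecutorDecommissionInfo(message.getOrElse(""), workerHost)
        println(s"decommissioned: $fullId -> $info")
      }
  }

  def main(args: Array[String]): Unit = {
    handle("app-1", ExecutorUpdated(3, ExecutorState.DECOMMISSIONED,
      Some("worker decommissioned"), None, Some("host-a.example.com")))
    handle("app-1", ExecutorUpdated(4, ExecutorState.LOST,
      Some("worker lost"), None, Some("host-b.example.com")))
  }
}
```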
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener {
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit

def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -308,7 +308,7 @@ private[deploy] class Master(
appInfo.resetRetryCount()
}

exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
@@ -909,9 +909,10 @@ private[deploy] class Master(
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None,
// workerLost is being set to true here to let the driver know that the host (aka. worker)
// is also being decommissioned.
workerLost = true))
// worker host is being set here to let the driver know that the host (aka. worker)
// is also being decommissioned. So the driver can unregister all the shuffle map
// statuses located at this host when it receives the executor lost event.
Some(worker.host)))

Review comment: nit: can you reword the comment to be more accurate now :-)
Author reply: Updated a little bit.

exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
Expand All @@ -932,7 +933,7 @@ private[deploy] class Master(
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
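
The producer side of the same message lives in the Master: only the Master knows the worker's host, so it fills in `Some(worker.host)` on the decommission and worker-lost paths, while the ordinary executor state-change path keeps `None`. A small sketch under simplified assumptions; the string-typed `state` field and the bare `WorkerInfo(host)` are stand-ins for the real `ExecutorState` and `WorkerInfo`, and the host name is made up.

```scala
// String-typed state and a bare host field stand in for the real Spark types.
case class ExecutorUpdated(id: Int, state: String, message: Option[String],
  exitStatus: Option[Int], workerHost: Option[String])
case class WorkerInfo(host: String)

object MasterSketch {
  def main(args: Array[String]): Unit = {
    val worker = WorkerInfo("host-a.example.com")

    // Ordinary executor state change: the host itself is fine, so workerHost = None.
    val stateChange = ExecutorUpdated(0, "EXITED", Some("exit code 1"), Some(1), None)

    // Worker decommissioned or lost: the Master knows the whole host is going away,
    // so it reports Some(worker.host) and the driver can drop shuffle state on it.
    val decommissioned = ExecutorUpdated(0, "DECOMMISSIONED",
      Some("worker decommissioned"), None, Some(worker.host))
    val lost = ExecutorUpdated(0, "LOST", Some("worker lost"), None, Some(worker.host))

    Seq(stateChange, decommissioned, lost).foreach(println)
  }
}
```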
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(
DecommissionExecutor(
executorId,
ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
case _ =>
logError("No registered driver to send Decommission to.")
}
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(
executorId, ExecutorDecommissionInfo(msg, false)))
executorId, ExecutorDecommissionInfo(msg)))
} else {
logError("No driver to message decommissioning.")
}
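
From the executor's point of view nothing is known about the host, so the backend never sets `workerHost`; only the cluster manager ever fills it in. A tiny sketch of the message the backend now builds, with `DecommissionExecutor` and `ExecutorDecommissionInfo` redeclared locally for illustration.

```scala
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)

object ExecutorBackendSketch {
  def main(args: Array[String]): Unit = {
    val executorId = "5"
    val msg = "Executor self-decommissioning"

    // The executor process only knows about itself, so it never claims the whole
    // host is going away; workerHost is left at its None default and only the
    // cluster manager (e.g. the standalone Master) ever supplies a host.
    val message = DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg))
    println(message)
  }
}
```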
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1890,16 +1890,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
.doc("Duration for which a decommissioned executor's information will be kept after its" +
"removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
"decommissioning even after the mapper executor has been decommissioned. This allows " +
"eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5m")

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
val isHostDecommissioned = taskScheduler
.getExecutorDecommissionState(bmAddress.executorId)
.exists(_.isHostDecommissioned)
.exists(_.workerHost.isDefined)

// Shuffle output of all executors on host `bmAddress.host` may be lost if:
// - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@
*/
private[scheduler] def handleExecutorLost(
execId: String,
workerLost: Boolean): Unit = {
workerHost: Option[String]): Unit = {
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
hostToUnregisterOutputs = None,
hostToUnregisterOutputs = workerHost,
maybeEpoch = None)
}

@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId, reason) =>
val workerLost = reason match {
case ExecutorProcessLost(_, true, _) => true
case _ => false
val workerHost = reason match {
case ExecutorProcessLost(_, workerHost, _) => workerHost
case ExecutorDecommission(workerHost) => workerHost
case _ => None
}
dagScheduler.handleExecutorLost(execId, workerLost)
dagScheduler.handleExecutorLost(execId, workerHost)

case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
/**
* Message providing more detail when an executor is being decommissioned.
* @param message Human readable reason for why the decommissioning is happening.
* @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
* being decommissioned too. Used to infer if the shuffle data might
* be lost even if the external shuffle service is enabled.
* @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker`
* in other places) has been decommissioned too. Used to infer if the
* shuffle data might be lost even if the external shuffle service is enabled.
*/
private[spark]
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

/**
* State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
// to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
// is configured.
startTime: Long,
isHostDecommissioned: Boolean)
workerHost: Option[String] = None)
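
A small, self-contained usage sketch of the two case classes above; the host name is made up, and the classes are redeclared without the `private[spark]` modifier so the snippet compiles on its own.

```scala
// Redeclared locally so the example compiles outside the Spark code base.
case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)
case class ExecutorDecommissionState(startTime: Long, workerHost: Option[String] = None)

object DecommissionInfoSketch {
  def main(args: Array[String]): Unit = {
    // Executor-only decommission: the host stays up, e.g. dynamic-allocation scale-down.
    val executorOnly = ExecutorDecommissionInfo("spark scale down")

    // Host decommission: the cluster manager knows the whole worker is going away.
    val hostToo = ExecutorDecommissionInfo("worker decommissioned",
      workerHost = Some("host-a.example.com"))

    // The TaskSchedulerImpl keeps a derived state keyed by decommission start time;
    // "is the host decommissioned?" is now simply workerHost.isDefined.
    val state = ExecutorDecommissionState(System.currentTimeMillis(), hostToo.workerHost)
    println(Seq(executorOnly, hostToo, state).mkString("\n"))
    println(s"host decommissioned? ${state.workerHost.isDefined}")
  }
}
```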
@@ -53,14 +53,15 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los

/**
* @param _message human readable loss reason
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
* @param workerHost it's defined when the host is confirmed lost too (i.e. including
* shuffle service)
* @param causedByApp whether the loss of the executor is the fault of the running app.
* (assumed true by default unless known explicitly otherwise)
*/
private[spark]
case class ExecutorProcessLost(
_message: String = "Executor Process Lost",
workerLost: Boolean = false,
workerHost: Option[String] = None,
causedByApp: Boolean = true)
extends ExecutorLossReason(_message)

@@ -69,5 +70,8 @@ case class ExecutorProcessLost(
*
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*
* @param workerHost it is defined when the worker is decommissioned too
*/
private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
extends ExecutorLossReason("Executor decommission.")
@@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl(
// continue to run even after being asked to decommission, but they will eventually exit.
val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]

// When they exit and we know of that via heartbeat failure, we will add them to this cache.
// This cache is consulted to know if a fetch failure is because a source executor was
// decommissioned.
lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
.expireAfterWrite(
conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
.ticker(new Ticker{
override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
})
.build[String, ExecutorDecommissionState]()
.asMap()

def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
}
@@ -922,28 +910,8 @@
synchronized {
// Don't bother noting decommissioning for executors that we don't know about
if (executorIdToHost.contains(executorId)) {
val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
val newDecomState = if (oldDecomStateOpt.isEmpty) {
// This is the first time we are hearing of decommissioning this executor,
// so create a brand new state.
ExecutorDecommissionState(
clock.getTimeMillis(),
decommissionInfo.isHostDecommissioned)
} else {
val oldDecomState = oldDecomStateOpt.get
if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
// Only the cluster manager is allowed to send decommission messages with
// isHostDecommissioned set. So the new decommissionInfo is from the cluster
// manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
// decommission start time.
ExecutorDecommissionState(
oldDecomState.startTime,
isHostDecommissioned = true)
} else {
oldDecomState
}
}
executorsPendingDecommission(executorId) = newDecomState
executorsPendingDecommission(executorId) =
ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
}
}
rootPool.executorDecommission(executorId)
@@ -952,26 +920,11 @@

override def getExecutorDecommissionState(executorId: String)
: Option[ExecutorDecommissionState] = synchronized {
executorsPendingDecommission
.get(executorId)
.orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
executorsPendingDecommission.get(executorId)
}

override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None
val reason = givenReason match {
// Handle executor process loss due to decommissioning
case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
val executorDecommissionState = getExecutorDecommissionState(executorId)
ExecutorProcessLost(
message,
// Also mark the worker lost if we know that the host was decommissioned
origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
// Executor loss is certainly not caused by app if we knew that this executor is being
// decommissioned
causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
case e => e
}

synchronized {
if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1060,9 +1013,7 @@
}
}


val decomState = executorsPendingDecommission.remove(executorId)
decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
executorsPendingDecommission.remove(executorId)

if (reason != LossReasonPending) {
executorIdToHost -= executorId
@@ -1104,7 +1055,7 @@
// exposed for test
protected final def isHostDecommissioned(host: String): Boolean = {
hostToExecutors.get(host).exists { executors =>
executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
}
}
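
Taken together, the TaskSchedulerImpl changes reduce the decommission bookkeeping to a single map: written on `executorDecommission`, read by `getExecutorDecommissionState`, and simply dropped on `executorLost` now that the TTL cache (`decommissionedExecutorsRemoved` and its `spark.executor.decommission.removed.infoCacheTTL` config) is gone. The following is a toy model of that bookkeeping, not the real scheduler; the method names mirror the diff, but the clock, host registration, and everything else are simplified assumptions.

```scala
import scala.collection.mutable

case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)
case class ExecutorDecommissionState(startTime: Long, workerHost: Option[String] = None)

// A toy model of the TaskSchedulerImpl bookkeeping after this change: one map,
// written on decommission, read by getExecutorDecommissionState, cleared on loss.
class DecommissionBookkeepingSketch {
  private val executorsPendingDecommission =
    new mutable.HashMap[String, ExecutorDecommissionState]
  private val hostToExecutors = new mutable.HashMap[String, mutable.HashSet[String]]

  def registerExecutor(executorId: String, host: String): Unit =
    hostToExecutors.getOrElseUpdate(host, new mutable.HashSet[String]) += executorId

  def executorDecommission(executorId: String, info: ExecutorDecommissionInfo): Unit =
    executorsPendingDecommission(executorId) =
      ExecutorDecommissionState(System.currentTimeMillis(), info.workerHost)

  def getExecutorDecommissionState(executorId: String): Option[ExecutorDecommissionState] =
    executorsPendingDecommission.get(executorId)

  def executorLost(executorId: String): Unit =
    executorsPendingDecommission.remove(executorId)

  def isHostDecommissioned(host: String): Boolean =
    hostToExecutors.get(host).exists { executors =>
      executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
    }
}

object DecommissionBookkeepingSketch {
  def main(args: Array[String]): Unit = {
    val s = new DecommissionBookkeepingSketch
    s.registerExecutor("1", "host-a.example.com")
    s.executorDecommission("1",
      ExecutorDecommissionInfo("worker decommissioned", Some("host-a.example.com")))
    println(s.isHostDecommissioned("host-a.example.com")) // true
    s.executorLost("1")
    println(s.getExecutorDecommissionState("1"))          // None: no TTL cache anymore
  }
}
```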

@@ -988,7 +988,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
case ExecutorKilled => false
case ExecutorKilled | ExecutorDecommission(_) => false
case ExecutorProcessLost(_, _, false) => false
case _ => true
}
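
Finally, the TaskSetManager now treats a decommissioned executor like a killed one when deciding whether a task failure counts against the application. A compact sketch of that match with simplified stand-ins for the loss reasons (the real `ExecutorExited` carries more fields); the exit code and host are illustrative.

```scala
sealed trait ExecutorLossReason
case object ExecutorKilled extends ExecutorLossReason
case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean) extends ExecutorLossReason
case class ExecutorProcessLost(message: String = "Executor Process Lost",
  workerHost: Option[String] = None, causedByApp: Boolean = true) extends ExecutorLossReason
case class ExecutorDecommission(workerHost: Option[String] = None) extends ExecutorLossReason

object ExitCauseSketch {
  // Mirrors the TaskSetManager match above: a decommissioned executor is treated
  // like a killed one, i.e. the task failure does not count against the app.
  def exitCausedByApp(reason: ExecutorLossReason): Boolean = reason match {
    case exited: ExecutorExited => exited.exitCausedByApp
    case ExecutorKilled | ExecutorDecommission(_) => false
    case ExecutorProcessLost(_, _, false) => false
    case _ => true
  }

  def main(args: Array[String]): Unit = {
    println(exitCausedByApp(ExecutorDecommission(Some("host-a.example.com")))) // false
    println(exitCausedByApp(ExecutorExited(52, exitCausedByApp = true)))       // true
  }
}
```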