ExecutorAllocationClient.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import org.apache.spark.scheduler.ExecutorDecommissionInfo

/**
* A client that communicates with the cluster manager to request or kill executors.
* This is currently supported only in YARN mode.
@@ -81,6 +83,44 @@ private[spark] trait ExecutorAllocationClient {
countFailures: Boolean,
force: Boolean = false): Seq[String]

/**
* Request that the cluster manager decommission the specified executors.
* The default implementation delegates to kill; a scheduler must override this
* if it supports graceful decommissioning.
*
* @param executorsAndDecomInfo identifiers of executors & decommission info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {
killExecutors(executorsAndDecomInfo.map(_._1),
adjustTargetNumExecutors,
countFailures = false)
}


/**
* Request that the cluster manager decommission the specified executor.
* Delegates to decommissionExecutors.
*
* @param executorId identifier of the executor to decommission
* @param decommissionInfo information about the decommission (reason, host loss)
* @param adjustTargetNumExecutors if we should adjust the target number of executors.
* @return whether the request is acknowledged by the cluster manager.
*/
final def decommissionExecutor(executorId: String,
decommissionInfo: ExecutorDecommissionInfo,
adjustTargetNumExecutors: Boolean): Boolean = {
val decommissionedExecutors = decommissionExecutors(
Array((executorId, decommissionInfo)),
adjustTargetNumExecutors = adjustTargetNumExecutors)
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}
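
For readers new to this API, ExecutorDecommissionInfo is used throughout the change as a small value object. The sketch below assumes the two-field constructor seen later in this diff (a human-readable reason plus a host-loss flag) and shows a hypothetical caller of the two methods above; it is illustrative only, not the actual definition.

// Assumed shape, inferred from usage such as ExecutorDecommissionInfo("spark scale down", false).
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

// Hypothetical caller: decommission executors without shrinking the target executor count.
val client: ExecutorAllocationClient = ???
val decomInfo = ExecutorDecommissionInfo("spark scale down", isHostDecommissioned = false)
val acked: Seq[String] = client.decommissionExecutors(
  Array(("1", decomInfo), ("2", decomInfo)),
  adjustTargetNumExecutors = false)
val singleAcked: Boolean = client.decommissionExecutor("3", decomInfo,
  adjustTargetNumExecutors = false)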


/**
* Request that the cluster manager kill every executor on the specified host.
*
ExecutorAllocationManager.scala
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.resource.ResourceProfileManager
@@ -127,6 +128,8 @@ private[spark] class ExecutorAllocationManager(
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)

private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

validateSettings()
@@ -204,7 +207,12 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
// If dynamic allocation shuffle tracking is enabled, or worker decommissioning is
// enabled along with storage shuffle block decommissioning, we have *experimental*
// support for dynamic allocation without a shuffle service.
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
(decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
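
A minimal configuration that exercises this experimental path might look like the sketch below. The key strings are the values assumed to back the constants referenced above (DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, WORKER_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED); treat them as assumptions rather than documented names.

import org.apache.spark.SparkConf

// Sketch: dynamic allocation without an external shuffle service, relying on worker
// decommissioning plus shuffle block migration (config key names assumed).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.worker.decommission.enabled", "true")                // WORKER_DECOMMISSION_ENABLED (assumed)
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED (assumed)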
@@ -539,7 +547,9 @@ private[spark] class ExecutorAllocationManager(
// get the running total as we remove or initialize it to the count - pendingRemoval
val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
(executorMonitor.executorCountWithResourceProfile(rpId) -
executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
executorMonitor.decommissioningPerResourceProfileId(rpId)
))
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
@@ -565,8 +575,14 @@
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
}

// [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +594,11 @@

// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
if (decommissionEnabled) {
executorMonitor.executorsDecommissioned(executorsRemoved)
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
executorsRemoved.toSeq
} else {
CoarseGrainedSchedulerBackend.scala
@@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
decommissionExecutor(executorId, decommissionInfo)
decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false)

case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
@@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
decommissionExecutor(executorId, decommissionInfo)
context.reply(true)
context.reply(decommissionExecutor(executorId, decommissionInfo,
adjustTargetNumExecutors = false))

case RetrieveSparkAppConfig(resourceProfileId) =>
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@@ -419,59 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.workerRemoved(workerId, host, message)
}

/**
* Mark a given executor as decommissioned and stop making resource offers for it.
*/
private def decommissionExecutor(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
executorsPendingDecommission += executorId
true
} else {
false
}
}

if (shouldDisable) {
logInfo(s"Starting decommissioning executor $executorId.")
try {
scheduler.executorDecommission(executorId, decommissionInfo)
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
// Send decommission message to the executor, this may be a duplicate since the executor
// could have been the one to notify us. But it's also possible the notification came from
// elsewhere and the executor does not yet know.
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo("Starting decommissioning block manager corresponding to " +
s"executor $executorId.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
shouldDisable
}

/**
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
@@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

protected def minRegisteredRatio: Double = _minRegisteredRatio

/**
* Request that the cluster manager decommission the specified executors.
*
* @param executorsAndDecomInfo Identifiers of executors & decommission info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
override def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {

val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
CoarseGrainedSchedulerBackend.this.synchronized {
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
executorsPendingDecommission += executorId
true
} else {
false
}
}
}

// If we don't want to replace the executors we are decommissioning, adjust the target
// number of executors down accordingly.
if (adjustTargetNumExecutors) {
Review comment — agrawaldevesh, Aug 12, 2020: should there be a check for executorsToDecommission.notEmpty? Otherwise, we will request executors again with no change in the adjustExecutors helper method. Could again lead to some unnecessary strain on the driver. Not a big deal because this is one time, since doDecommission isn't called again and again.

Author reply: Let me put that logic inside adjustExecutors :)

adjustExecutors(executorsToDecommission.map(_._1))
}

executorsToDecommission.filter { case (executorId, decomInfo) =>
doDecommission(executorId, decomInfo)
}.map(_._1)
}


private def doDecommission(executorId: String,
decomInfo: ExecutorDecommissionInfo): Boolean = {

logInfo(s"Asking executor $executorId to decommissioning.")
try {
scheduler.executorDecommission(executorId, decomInfo)
if (driverEndpoint != null) {
logInfo("Propagating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
}
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
return false
}
// Send decommission message to the executor (it could have originated on the executor,
// but not necessarily).
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
return false
}
}
logInfo(s"Asked executor $executorId to decommission.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
return false
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}

true
}
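
For orientation, the two RPC messages exchanged in doDecommission are presumed to be declared alongside the other CoarseGrainedClusterMessages roughly as sketched below; this is inferred from how they are constructed in this diff, not copied from the messages file.

// Assumed shapes, inferred from usage above.
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
  extends CoarseGrainedClusterMessage
case object DecommissionSelf extends CoarseGrainedClusterMessage // asks an executor to decommission itself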


override def start(): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager()
@@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint.send(RemoveWorker(workerId, host, message))
}

/**
* Called by subclasses when notified of a decommissioning executor.
*/
private[spark] def decommissionExecutor(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
if (driverEndpoint != null) {
logInfo("Propagating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
}
}

def sufficientResourcesRegistered(): Boolean = true

override def isReady(): Boolean = {
@@ -760,6 +777,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] =
Future.successful(false)

/**
* Adjust the number of executors being requested to no longer include the provided executors.
*/
private def adjustExecutors(executorIds: Seq[String]) = {
if (executorIds.nonEmpty) {
executorIds.foreach { exec =>
withLock {
val rpId = executorDataMap(exec).resourceProfileId
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
// Assume that we are killing an executor that was started by default and
// not through the request api
requestedTotalExecutorsPerResourceProfile(rp) = 0
} else {
val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
}
}
}
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
} else {
Future.successful(true)
}
}
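
As a concrete illustration of the bookkeeping above, the per-profile requested total is decremented once per removed executor and clamped at zero; a small self-contained sketch with hypothetical values:

import scala.collection.mutable

// Hypothetical: a default profile currently requesting 3 executors, with 4 being removed.
val requested = mutable.HashMap("defaultProfile" -> 3)
Seq("exec-1", "exec-2", "exec-3", "exec-4").foreach { _ =>
  requested("defaultProfile") = math.max(requested("defaultProfile") - 1, 0)
}
assert(requested("defaultProfile") == 0) // clamped at zero rather than going negative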

/**
* Request that the cluster manager kill the specified executors.
*
@@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (adjustTargetNumExecutors) {
executorsToKill.foreach { exec =>
val rpId = executorDataMap(exec).resourceProfileId
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
// Assume that we are killing an executor that was started by default and
// not through the request api
requestedTotalExecutorsPerResourceProfile(rp) = 0
} else {
val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
}
}
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
adjustExecutors(executorsToKill)
} else {
Future.successful(true)
}
StandaloneSchedulerBackend.scala
@@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend(

override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
logInfo("Asked to decommission executor")
decommissionExecutor(fullId.split("/")(1), decommissionInfo)
val execId = fullId.split("/")(1)
decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false)
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
}
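
In standalone mode the fullId handed to this callback is assumed to take the form "appId/execId", so the split extracts the bare executor id that the scheduler backend tracks; a tiny illustration with a made-up id:

// Hypothetical fullId; only the trailing executor id is forwarded to decommissionExecutors.
val fullId = "app-20200812123456-0000/3"
val execId = fullId.split("/")(1)
assert(execId == "3")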
