[SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling #29367
Changes from all commits: `1f28b07`, `bff1ef7`, `995ffa9`, `cc76ff5`, `6a69126`, `80629eb`, `e970cb1`
**core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala**
```diff
@@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false)

       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
```
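The `receive` handler now calls a three-argument overload with `adjustTargetNumExecutors = false`, since an unsolicited decommission (for example, a node being preempted) should not lower the executor target. That single-executor overload delegates to the batch `decommissionExecutors` API added later in this diff; a self-contained sketch of the relationship (the stub and wrapper below are illustrative, not the PR's exact wrapper, which is not shown in this excerpt):

```scala
object WrapperSketch {
  // Stubbed stand-in for Spark's ExecutorDecommissionInfo on this branch.
  final case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

  // Stand-in for the batch API: pretend every requested executor is acknowledged.
  def decommissionExecutors(
      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
      adjustTargetNumExecutors: Boolean): Seq[String] =
    executorsAndDecomInfo.map(_._1).toSeq

  // Single-executor overload shaped like the call sites in this hunk.
  def decommissionExecutor(
      executorId: String,
      decommissionInfo: ExecutorDecommissionInfo,
      adjustTargetNumExecutors: Boolean): Boolean = {
    val done = decommissionExecutors(
      Array((executorId, decommissionInfo)), adjustTargetNumExecutors)
    done.contains(executorId)
  }

  def main(args: Array[String]): Unit = {
    val info = ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = false)
    println(decommissionExecutor("1", info, adjustTargetNumExecutors = false)) // true
  }
}
```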
```diff
@@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
-        decommissionExecutor(executorId, decommissionInfo)
-        context.reply(true)
+        context.reply(decommissionExecutor(executorId, decommissionInfo,
+          adjustTargetNumExecutors = false))

       case RetrieveSparkAppConfig(resourceProfileId) =>
         val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
```
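In `receiveAndReply`, the endpoint previously replied `true` unconditionally; it now replies with the actual outcome, so a caller using the ask pattern learns whether the executor was really marked for decommissioning. A minimal, self-contained model of the difference (plain Scala, not Spark's RPC stack):

```scala
object ReplySemanticsSketch {
  // Stand-in for decommissionExecutor(...): succeeds only for live executors.
  def decommissionExecutor(executorId: String, active: Set[String]): Boolean =
    active.contains(executorId)

  def main(args: Array[String]): Unit = {
    val activeExecutors = Set("1", "2")
    Seq("1", "7").foreach { id =>
      // Before this change the reply was always `true`; now it reflects reality.
      val reply = decommissionExecutor(id, activeExecutors)
      println(s"executor $id decommission accepted = $reply")
    }
  }
}
```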
```diff
@@ -419,59 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       scheduler.workerRemoved(workerId, host, message)
     }

-    /**
-     * Mark a given executor as decommissioned and stop making resource offers for it.
-     */
-    private def decommissionExecutor(
-        executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
-      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
-          true
-        } else {
-          false
-        }
-      }
-
-      if (shouldDisable) {
-        logInfo(s"Starting decommissioning executor $executorId.")
-        try {
-          scheduler.executorDecommission(executorId, decommissionInfo)
-        } catch {
-          case e: Exception =>
-            logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        }
-        // Send decommission message to the executor, this may be a duplicate since the executor
-        // could have been the one to notify us. But it's also possible the notification came from
-        // elsewhere and the executor does not yet know.
-        executorDataMap.get(executorId) match {
-          case Some(executorInfo) =>
-            executorInfo.executorEndpoint.send(DecommissionSelf)
-          case None =>
-            // Ignoring the executor since it is not registered.
-            logWarning(s"Attempted to decommission unknown executor $executorId.")
-        }
-        logInfo(s"Finished decommissioning executor $executorId.")
-
-        if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-          try {
-            logInfo("Starting decommissioning block manager corresponding to " +
-              s"executor $executorId.")
-            scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-          } catch {
-            case e: Exception =>
-              logError("Unexpected error during block manager " +
-                s"decommissioning for executor $executorId: ${e.toString}", e)
-          }
-          logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-        }
-      } else {
-        logInfo(s"Skipping decommissioning of executor $executorId.")
-      }
-      shouldDisable
-    }
-
     /**
      * Stop making resource offers for the given executor. The executor is marked as lost with
      * the loss reason still pending.
```
```diff
@@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   protected def minRegisteredRatio: Double = _minRegisteredRatio

+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want the cluster manager to replace the executors we are
+    // decommissioning, adjust the target number of executors down.
+    if (adjustTargetNumExecutors) {
```
> **Reviewer** (conversation resolved): Should there be a check for `executorsToDecommission.nonEmpty`? Otherwise we will request executors again with no change in the `adjustExecutors` helper method, which could again lead to some unnecessary strain on the driver. Not a big deal because this is one-time, since `doDecommission` isn't called again and again.
>
> **holdenk:** Let me put that logic inside `adjustExecutors` :)
```diff
+      adjustExecutors(executorsToDecommission.map(_._1))
+    }
+
+    executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+  }
```
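For orientation, a hedged example of invoking the new batch API, e.g. from a dynamic-allocation scale-down. The `backend` value and the executor ids are illustrative, and the two-field `ExecutorDecommissionInfo(message, isHostDecommissioned)` shape matches this PR's branch:

```scala
// Illustrative call site: decommission two idle executors and lower the
// target so the cluster manager does not immediately replace them.
val idle = Array(
  ("1", ExecutorDecommissionInfo("idle timeout", isHostDecommissioned = false)),
  ("2", ExecutorDecommissionInfo("idle timeout", isHostDecommissioned = false)))
val acknowledged: Seq[String] =
  backend.decommissionExecutors(idle, adjustTargetNumExecutors = true)
```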
```diff
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+    logInfo(s"Asking executor $executorId to decommission.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send decommission message to the executor (it could have originated on the
+    // executor, but not necessarily).
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor $executorId.")
+          return false
+      }
+    }
+    logInfo(s"Asked executor $executorId to decommission.")
+
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
+        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
+            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
+      }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
+    }
+
+    true
+  }
+

   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
```
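The block-manager step in `doDecommission` only runs when storage decommissioning is enabled. A hedged, self-contained reminder of the relevant setting; the key `spark.storage.decommission.enabled` is what backs `STORAGE_DECOMMISSION_ENABLED` on this branch, while the surrounding block-migration settings vary by version:

```scala
import org.apache.spark.SparkConf

object StorageDecomConf {
  def main(args: Array[String]): Unit = {
    // Enable block manager decommissioning so decommissioned executors try to
    // migrate their blocks before exiting.
    val conf = new SparkConf().set("spark.storage.decommission.enabled", "true")
    println(conf.get("spark.storage.decommission.enabled")) // "true"
  }
}
```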
```diff
@@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(RemoveWorker(workerId, host, message))
   }

-  /**
-   * Called by subclasses when notified of a decommissioning executor.
-   */
-  private[spark] def decommissionExecutor(
-      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
-    if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
-    }
-  }
-
   def sufficientResourcesRegistered(): Boolean = true

   override def isReady(): Boolean = {
```
```diff
@@ -760,6 +777,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] =
     Future.successful(false)

+  /**
+   * Adjust the number of executors being requested to no longer include the provided executors.
+   */
+  private def adjustExecutors(executorIds: Seq[String]) = {
+    if (executorIds.nonEmpty) {
+      executorIds.foreach { exec =>
+        withLock {
+          val rpId = executorDataMap(exec).resourceProfileId
+          val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+          if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+            // Assume that we are killing an executor that was started by default and
+            // not through the request api
+            requestedTotalExecutorsPerResourceProfile(rp) = 0
+          } else {
+            val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+            requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+          }
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    } else {
+      Future.successful(true)
+    }
+  }
+
   /**
    * Request that the cluster manager kill the specified executors.
    *
```
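The helper decrements the requested total for each decommissioned executor's resource profile, clamping at zero, and the `nonEmpty` guard (the reviewer's concern above) avoids a redundant `doRequestTotalExecutors` call. A tiny, self-contained model of that bookkeeping (plain Scala; an `Int` profile id stands in for a `ResourceProfile`):

```scala
import scala.collection.mutable

object AdjustExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Requested totals per resource-profile id, as in
    // requestedTotalExecutorsPerResourceProfile.
    val requested = mutable.Map[Int, Int](0 -> 3)
    val decommissioned = Seq(0, 0, 0, 0) // four executors, all in profile 0
    decommissioned.foreach { rpId =>
      if (requested.isEmpty) {
        // Executor was started by default, not through the request API.
        requested(rpId) = 0
      } else {
        requested(rpId) = math.max(requested(rpId) - 1, 0) // clamp at zero
      }
    }
    println(requested) // e.g. HashMap(0 -> 0): the target never goes negative
  }
}
```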
```diff
@@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // take into account executors that are pending to be added or removed.
     val adjustTotalExecutors =
       if (adjustTargetNumExecutors) {
-        executorsToKill.foreach { exec =>
-          val rpId = executorDataMap(exec).resourceProfileId
-          val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
-          if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
-            // Assume that we are killing an executor that was started by default and
-            // not through the request api
-            requestedTotalExecutorsPerResourceProfile(rp) = 0
-          } else {
-            val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
-            requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
-          }
-        }
-        doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+        adjustExecutors(executorsToKill)
       } else {
         Future.successful(true)
       }
```
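After this refactor, `killExecutors` and `decommissionExecutors` share `adjustExecutors` for the target-count bookkeeping. A hedged sketch of the two call sites side by side; `backend`, the ids, and the messages are illustrative, and the signatures shown are as they appear on this branch:

```scala
// Graceful path: mark the executor for decommission and lower the target.
backend.decommissionExecutors(
  Array(("3", ExecutorDecommissionInfo("scale down", isHostDecommissioned = false))),
  adjustTargetNumExecutors = true)

// Immediate path: kill outright; the same adjustExecutors bookkeeping applies.
backend.killExecutors(Seq("3"), adjustTargetNumExecutors = true,
  countFailures = false, force = false)
```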