
Commit 3fa3313

Connect decommissioning to dynamic scaling
- Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads
- Make Spark's dynamic allocation use decommissioning.
- Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low; update the streaming ExecutorAllocationManager to also delegate to decommission.
- Fix up executor add for resource profile.
- Fix our exiting and cleanup thread for better debugging next time.
- Clean up the locks we use in decommissioning and clarify some more bits.
- Verify executors decommissioned, then killed by the external cluster manager, are re-launched.
- Verify some additional calls are not occurring in the executor allocation manager suite.
- Don't close the watcher until the end of the test.
- Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors.
- Bump numparts up to 6.
- Revert "bump numparts up to 6". This reverts commit daf96dd.
- Small comment & visibility cleanup.
- CR feedback/cleanup.
1 parent 75c2c53 commit 3fa3313
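For orientation, the sketch below shows the kind of configuration this change targets: dynamic allocation scaling down by decommissioning executors rather than killing them outright, without an external shuffle service. The string keys are assumptions inferred from the config constants referenced in the diff (WORKER_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) and may be named differently in other Spark versions.

import org.apache.spark.SparkConf

// Sketch only: the keys mirror the config constants referenced in this diff and are
// assumptions, not a definitive recipe.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  // No external shuffle service: relies on the *experimental* branch added to
  // ExecutorAllocationManager below.
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.worker.decommission.enabled", "true")                // assumed key
  .set("spark.storage.decommission.enabled", "true")               // assumed key
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // assumed key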

File tree

16 files changed: +352 -58 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 40 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 /**
  * A client that communicates with the cluster manager to request or kill executors.
  * This is currently supported only in YARN mode.
@@ -81,6 +82,45 @@ private[spark] trait ExecutorAllocationClient {
       countFailures: Boolean,
       force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill; the scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorsAndDecomInfo identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+    killExecutors(executorsAndDecomInfo.map(_._1),
+      adjustTargetNumExecutors,
+      countFailures = false)
+  }
+
+  /**
+   * Request that the cluster manager decommission the specified executor.
+   * Default implementation delegates to decommissionExecutors; the scheduler can override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorId identifier of the executor to decommission
+   * @param decommissionInfo information about the decommission (reason, host loss)
+   * @param adjustTargetNumExecutors if we should adjust the target number of executors.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def decommissionExecutor(executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo,
+      adjustTargetNumExecutors: Boolean): Boolean = {
+    val decommissionedExecutors = decommissionExecutors(
+      Seq((executorId, decommissionInfo)),
+      adjustTargetNumExecutors = adjustTargetNumExecutors)
+    decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
+  }
+
   /**
    * Request that the cluster manager kill every executor on the specified host.
   *
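The new trait methods follow a default-delegation pattern: a backend that only knows how to kill executors still compiles, while a backend that supports graceful decommissioning overrides decommissionExecutors. A minimal, self-contained sketch of that pattern, using toy types rather than the real Spark trait:

case class ToyDecommissionInfo(message: String, isHostDecommissioned: Boolean)

trait ToyAllocationClient {
  def killExecutors(
      ids: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String]

  // Default: no graceful path; fall back to a plain kill.
  def decommissionExecutors(
      executorsAndDecomInfo: Seq[(String, ToyDecommissionInfo)],
      adjustTargetNumExecutors: Boolean): Seq[String] = {
    killExecutors(executorsAndDecomInfo.map(_._1), adjustTargetNumExecutors, countFailures = false)
  }

  // Single-executor convenience wrapper built on the bulk call.
  def decommissionExecutor(
      executorId: String,
      info: ToyDecommissionInfo,
      adjustTargetNumExecutors: Boolean): Boolean = {
    val acked = decommissionExecutors(Seq((executorId, info)), adjustTargetNumExecutors)
    acked.nonEmpty && acked.head == executorId
  }
}

// A client that never overrides decommissionExecutors still "decommissions" by killing.
object ToyClient extends ToyAllocationClient {
  override def killExecutors(
      ids: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean): Seq[String] = ids // pretend the cluster manager acknowledged all of them
}

// ToyClient.decommissionExecutor("exec-1", ToyDecommissionInfo("scale down", false),
//   adjustTargetNumExecutors = false)  // => true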

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 23 additions & 5 deletions
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning along with
+      // storage shuffle decommissioning is enabled we have *experimental* support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+            conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
       } else if (!testing) {
         throw new SparkException("Dynamic allocation of executors requires the external " +
@@ -539,7 +545,9 @@ private[spark] class ExecutorAllocationManager(
       // get the running total as we remove or initialize it to the count - pendingRemoval
       val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
         (executorMonitor.executorCountWithResourceProfile(rpId) -
-          executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+          executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
+          executorMonitor.decommissioningPerResourceProfileId(rpId)
+        ))
       if (newExecutorTotal - 1 < minNumExecutors) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
           s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+        client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
+          countFailures = false, force = false)
+      }
     }
 
     // [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +592,11 @@ private[spark] class ExecutorAllocationManager(
 
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        executorMonitor.executorsDecommissioned(executorsRemoved)
+      } else {
+        executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      }
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
       executorsRemoved.toSeq
     } else {
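The headroom check above now subtracts executors that are already decommissioning as well as those pending removal, so the manager cannot queue enough removals to drop below the configured minimum. A small worked example of that arithmetic, with made-up numbers:

// Illustrative arithmetic for the headroom check; the numbers are invented.
val executorCountForRp = 10   // executors currently registered for this resource profile
val pendingRemoval = 2        // already being killed
val decommissioning = 3       // already decommissioning (newly subtracted by this change)
val minNumExecutors = 4

val newExecutorTotal = executorCountForRp - pendingRemoval - decommissioning  // 5
val canRemoveOneMore = (newExecutorTotal - 1) >= minNumExecutors              // 4 >= 4, true
// Without subtracting `decommissioning`, the manager would think 8 executors remain
// and could queue removals that push the cluster below the minimum.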

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 97 additions & 11 deletions
@@ -421,6 +421,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   /**
    * Mark a given executor as decommissioned and stop making resource offers for it.
+   *
    */
   private def decommissionExecutor(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommission.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send decommission message to the executor (it could have originated on the executor
+    // but not necessarily).
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor $executorId.")
+          return false
+      }
+    }
+    logInfo(s"Asked executor $executorId to decommission.")
+
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
+        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
+      }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
+    }
+
+    true
+  }
+
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
@@ -598,17 +695,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(RemoveWorker(workerId, host, message))
   }
 
-  /**
-   * Called by subclasses when notified of a decommissioning executor.
-   */
-  private[spark] def decommissionExecutor(
-      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
-    if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
-    }
-  }
-
   def sufficientResourcesRegistered(): Boolean = true
 
   override def isReady(): Boolean = {
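decommissionExecutors above selects and marks live executors while holding the backend lock, then performs the slower decommission requests outside of it. A toy sketch of that select-then-act shape, with illustrative state rather than the real backend:

import scala.collection.mutable

// Toy stand-in for the backend's bookkeeping; not the real CoarseGrainedSchedulerBackend.
object ToyBackend {
  private val activeExecutors = mutable.Set("1", "2", "3")
  private val executorsPendingDecommission = mutable.Set.empty[String]

  def decommission(candidates: Seq[String]): Seq[String] = {
    // Step 1, under the lock: keep only executors that are still alive and mark them pending.
    val selected = candidates.filter { id =>
      this.synchronized {
        if (activeExecutors.contains(id)) {
          executorsPendingDecommission += id
          true
        } else {
          false
        }
      }
    }
    // Step 2, outside the lock: issue the (potentially slow) decommission requests and
    // return only the ids that were acknowledged.
    selected.filter(sendDecommission)
  }

  // Stand-in for the RPC to the executor / block manager; always succeeds here.
  private def sendDecommission(id: String): Boolean = true
}

// ToyBackend.decommission(Seq("2", "7"))  // => Seq("2"); "7" is not active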

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
@@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    val execId = fullId.split("/")(1)
+    decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false)
     logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
 
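The standalone backend receives an application-scoped identifier and only needs the executor id portion before delegating with adjustTargetNumExecutors = false (the target was already lowered elsewhere). A tiny example, assuming an "<appId>/<execId>" shape for the full id; the value shown is hypothetical:

// Hypothetical full ID; the "<appId>/<execId>" format is an assumption here.
val fullId = "app-20200618115233-0000/3"
val execId = fullId.split("/")(1)  // "3"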

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 54 additions & 7 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId}
 import org.apache.spark.util.Clock
 
 /**
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
 
     var newNextTimeout = Long.MaxValue
     timedOutExecs = executors.asScala
-      .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
+      .filter { case (_, exec) =>
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning}
       .filter { case (_, exec) =>
         val deadline = exec.timeoutAt
         if (deadline > now) {
@@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor(
 
   /**
   * Mark the given executors as pending to be removed. Should only be called in the EAM thread.
+   * This covers both kills and decommissions.
   */
  def executorsKilled(ids: Seq[String]): Unit = {
    ids.foreach { id =>
@@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor(
     nextTimeout.set(Long.MinValue)
   }
 
+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.decommissioning = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
   def executorCount: Int = executors.size()
 
   def executorCountWithResourceProfile(id: Int): Int = {
@@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor(
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
   }
 
+  def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.decommissioning
+  }
+
+  def decommissioningPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) =>
+      v.resourceProfileId == id && v.decommissioning
+    }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
       //
       // This means that an executor may be marked as having shuffle data, and thus prevented
       // from being removed, even though the data may not be used.
+      // TODO: Only track used files (SPARK-31974)
       if (shuffleTrackingEnabled && event.reason == Success) {
         stageToShuffleID.get(event.stageId).foreach { shuffleId =>
           exec.addShuffle(shuffleId)
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check if it is a shuffle file, or RDD to pick the correct codepath for update
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+      /**
+       * The executor monitor keeps track of locations of cache and shuffle blocks and this can be
+       * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks
+       * around now this wires it up so that it keeps track of it. We only do this for data blocks
+       * as index and other blocks do not necessarily mean the entire block has been
+       * committed.
+       */
+      event.blockUpdatedInfo.blockId match {
+        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+        case _ => // For now we only update on data blocks
+      }
+      return
+    } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      return
+    }
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
 
@@ -410,10 +451,15 @@ private[spark] class ExecutorMonitor(
   }
 
   // Visible for testing
-  def executorsPendingToRemove(): Set[String] = {
+  private[spark] def executorsPendingToRemove(): Set[String] = {
     executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet
   }
 
+  // Visible for testing
+  private[spark] def executorsDecommissioning(): Set[String] = {
+    executors.asScala.filter { case (_, exec) => exec.decommissioning }.keys.toSet
+  }
+
   /**
    * This method should be used when updating executor state. It guards against a race condition in
    * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
@@ -466,6 +512,7 @@ private[spark] class ExecutorMonitor(
     @volatile var timedOut: Boolean = false
 
     var pendingRemoval: Boolean = false
+    var decommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false
 
     private var idleStart: Long = -1
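With the change above, onBlockUpdated distinguishes shuffle data blocks (which feed shuffle tracking) from RDD blocks (which feed cache tracking) and ignores everything else, such as shuffle index blocks. A small sketch of that classification, using the block id case classes shipped with Spark:

import org.apache.spark.storage.{BlockId, RDDBlockId, ShuffleDataBlockId}

// Sketch of the classification onBlockUpdated performs: shuffle *data* blocks feed
// shuffle tracking, RDD blocks feed cache tracking, anything else is ignored because
// it does not prove the whole block landed on the executor.
def describeBlock(blockId: BlockId): String = blockId match {
  case ShuffleDataBlockId(shuffleId, _, _) => s"track shuffle $shuffleId on this executor"
  case RDDBlockId(rddId, _)                => s"track cached partition of RDD $rddId"
  case _                                   => "ignored by the executor monitor"
}

// describeBlock(ShuffleDataBlockId(4, 2L, 0))  // "track shuffle 4 on this executor"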

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -1822,7 +1822,7 @@ private[spark] class BlockManager(
     }
   }
 
-  /*
+  /**
   * Returns the last migration time and a boolean denoting if all the blocks have been migrated.
   * If there are any tasks running since that time the boolean may be incorrect.
   */

0 commit comments