From 1f28b07c868698938b69b9b54ed135ee484858d6 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 21 Jul 2020 11:44:28 -0700
Subject: [PATCH 1/7] Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once,
and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't
scale down too low, update the streaming ExecutorAllocationManager to also
delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the
locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by the external cluster manager
are re-launched

Verify some additional calls are not occurring in the executor allocation
manager suite.

Don't close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we
can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96ddae8d1cc2440b8b5452a1cd7c3e499f278.

Small comment & visibility cleanup

CR feedback/cleanup
---
 .../spark/ExecutorAllocationClient.scala      |  40 +++++++
 .../spark/ExecutorAllocationManager.scala     |  28 ++++-
 .../CoarseGrainedSchedulerBackend.scala       | 108 ++++++++++++++++--
 .../cluster/StandaloneSchedulerBackend.scala  |   3 +-
 .../scheduler/dynalloc/ExecutorMonitor.scala  |  61 ++++++++--
 .../apache/spark/storage/BlockManager.scala   |   2 +-
 .../spark/storage/BlockManagerMaster.scala    |   6 +-
 .../ExecutorAllocationManagerSuite.scala      |  71 +++++++++++-
 .../WorkerDecommissionExtendedSuite.scala     |   3 +-
 .../scheduler/WorkerDecommissionSuite.scala   |   4 +-
 ...kManagerDecommissionIntegrationSuite.scala |   5 +-
 .../src/main/dockerfiles/spark/decom.sh       |   2 +-
 .../k8s/integrationtest/KubernetesSuite.scala |  27 ++---
 .../tests/decommissioning.py                  |   5 -
 .../scheduler/ExecutorAllocationManager.scala |  10 +-
 .../ExecutorAllocationManagerSuite.scala      |  35 +++++-
 16 files changed, 352 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 00bd0063c9e3a..85022c4a3bc1d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -17,6 +17,7 @@
 package org.apache.spark

+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 /**
  * A client that communicates with the cluster manager to request or kill executors.
  * This is currently supported only in YARN mode.
@@ -81,6 +82,45 @@ private[spark] trait ExecutorAllocationClient {
     countFailures: Boolean,
     force: Boolean = false): Seq[String]

+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * The default implementation delegates to killExecutors; a scheduler must override
+   * this if it supports graceful decommissioning.
+   *
+   * @param executorsAndDecomInfo identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
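+   *
+   * A rough sketch of the default behaviour (the executor id and reason string below
+   * are made up; here decommission simply degrades to a kill):
+   * {{{
+   *   client.decommissionExecutors(
+   *     Seq(("1", ExecutorDecommissionInfo("idle", isHostDecommissioned = false))),
+   *     adjustTargetNumExecutors = false)
+   *   // behaves like killExecutors(Seq("1"), adjustTargetNumExecutors = false,
+   *   //   countFailures = false)
+   * }}}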
+ */ + def decommissionExecutors( + executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executorsAndDecomInfo.map(_._1), + adjustTargetNumExecutors, + countFailures = false) + } + + + /** + * Request that the cluster manager decommission the specified executor. + * Default implementation delegates to decommissionExecutors, scheduler can override + * if it supports graceful decommissioning. + * + * @param executorId identifiers of executor to decommission + * @param decommissionInfo information about the decommission (reason, host loss) + * @param adjustTargetNumExecutors if we should adjust the target number of executors. + * @return whether the request is acknowledged by the cluster manager. + */ + def decommissionExecutor(executorId: String, + decommissionInfo: ExecutorDecommissionInfo, + adjustTargetNumExecutors: Boolean): Boolean = { + val decommissionedExecutors = decommissionExecutors( + Seq((executorId, decommissionInfo)), + adjustTargetNumExecutors = adjustTargetNumExecutors) + decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) + } + + /** * Request that the cluster manager kill every executor on the specified host. * diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1570f869c5bd0..ae2d2b541f944 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceProfileManager @@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { - if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) { + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. 
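+      // A sketch of the combinations this branch accepts (the keys shown are the ones
+      // the config constants above bind to; shown for illustration):
+      //   spark.dynamicAllocation.shuffleTracking.enabled=true, or
+      //   spark.worker.decommission.enabled=true together with
+      //   spark.storage.decommission.shuffleBlocks.enabled=true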
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (conf.get(WORKER_DECOMMISSION_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + @@ -539,7 +545,9 @@ private[spark] class ExecutorAllocationManager( // get the running total as we remove or initialize it to the count - pendingRemoval val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId, (executorMonitor.executorCountWithResourceProfile(rpId) - - executorMonitor.pendingRemovalCountPerResourceProfileId(rpId))) + executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) - + executorMonitor.decommissioningPerResourceProfileId(rpId) + )) if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " + s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " + @@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, - countFailures = false, force = false) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( + id => (id, ExecutorDecommissionInfo("spark scale down", false))) + client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) + } else { + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, + countFailures = false, force = false) + } } // [SPARK-21834] killExecutors api reduces the target number of executors. @@ -578,7 +592,11 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved.toSeq) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + executorMonitor.executorsDecommissioned(executorsRemoved) + } else { + executorMonitor.executorsKilled(executorsRemoved.toSeq) + } logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") executorsRemoved.toSeq } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 200f2d87a8a7a..f2959c39a3fcb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -421,6 +421,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Mark a given executor as decommissioned and stop making resource offers for it. + * */ private def decommissionExecutor( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { @@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. 
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning, shrink the
+    // requested target to match.
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommission.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send the decommission message to the executor (it could have originated on the
+    // executor, but not necessarily).
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+ logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } + } + logInfo(s"Asked executor $executorId to decommission.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo(s"Asking block manager corresponding to executor $executorId to decommission.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + return false + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + + true + } + + override def start(): Unit = { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager() @@ -598,17 +695,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(RemoveWorker(workerId, host, message)) } - /** - * Called by subclasses when notified of a decommissioning executor. - */ - private[spark] def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo)) - } - } - def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d921af602b254..660b3cf4bd8b6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") - decommissionExecutor(fullId.split("/")(1), decommissionInfo) + val execId = fullId.split("/")(1) + decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false) logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4d7190726f067..fe2efd92817e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ -import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId} import org.apache.spark.util.Clock /** @@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } + .filter { case (_, exec) => + !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning} .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor( /** * Mark the given executors as pending to be 
removed. Should only be called in the EAM thread.
+   * Decommissioning executors are tracked separately via executorsDecommissioned().
    */
   def executorsKilled(ids: Seq[String]): Unit = {
     ids.foreach { id =>
@@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor(
     nextTimeout.set(Long.MinValue)
   }

+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.decommissioning = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
   def executorCount: Int = executors.size()

   def executorCountWithResourceProfile(id: Int): Int = {
@@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor(
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
   }

+  def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.decommissioning
+  }
+
+  def decommissioningPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) =>
+      v.resourceProfileId == id && v.decommissioning
+    }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
     //
     // This means that an executor may be marked as having shuffle data, and thus prevented
     // from being removed, even though the data may not be used.
+    // TODO: Only track used files (SPARK-31974)
     if (shuffleTrackingEnabled && event.reason == Success) {
       stageToShuffleID.get(event.stageId).foreach { shuffleId =>
         exec.addShuffle(shuffleId)
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval && !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }

   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check whether it is a shuffle file or an RDD block to pick the correct update codepath
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+      /**
+       * The executor monitor keeps track of the location of cache and shuffle blocks, and this
+       * can be used to decide which executor(s) Spark should shut down first. Since shuffle
+       * blocks can now migrate between executors, this wires those updates in so migrated
+       * blocks keep being tracked. We only do this for data blocks, as an index or other block
+       * type does not necessarily mean the entire shuffle block has been committed.
+ */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + return + } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + return + } val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] @@ -410,10 +451,15 @@ private[spark] class ExecutorMonitor( } // Visible for testing - def executorsPendingToRemove(): Set[String] = { + private[spark] def executorsPendingToRemove(): Set[String] = { executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet } + // Visible for testing + private[spark] def executorsDecommissioning(): Set[String] = { + executors.asScala.filter { case (_, exec) => exec.decommissioning }.keys.toSet + } + /** * This method should be used when updating executor state. It guards against a race condition in * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` @@ -466,6 +512,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var decommissioning: Boolean = false var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6ec93df67f7db..ee534f549fd03 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,7 +1822,7 @@ private[spark] class BlockManager( } } - /* + /** * Returns the last migration time and a boolean denoting if all the blocks have been migrated. * If there are any tasks running since that time the boolean may be incorrect. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 93492cc6d7db6..f544d47b8e13c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,9 +43,11 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } - /** Decommission block managers corresponding to given set of executors */ + /** Decommission block managers corresponding to given set of executors + * Non-blocking. 
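+   * A hedged usage sketch (the executor ids are illustrative):
+   * {{{
+   *   master.decommissionBlockManagers(Seq("1", "2"))  // returns without waiting on the reply
+   * }}}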
+ */ def decommissionBlockManagers(executorIds: Seq[String]): Unit = { - driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)) } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5b367d2fb01d4..3abe051e47086 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining) } + test("mock polling loop remove with decommissioning") { + val clock = new ManualClock(2020L) + val manager = createManager(createConf(1, 20, 1, true), clock = clock) + + // Remove idle executors on timeout + onExecutorAddedDefaultProfile(manager, "executor-1") + onExecutorAddedDefaultProfile(manager, "executor-2") + onExecutorAddedDefaultProfile(manager, "executor-3") + assert(executorsDecommissioning(manager).isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + + // idle threshold not reached yet + clock.advance(executorIdleTimeout * 1000 / 2) + schedule(manager) + assert(manager.executorMonitor.timedOutExecutors().isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + assert(executorsDecommissioning(manager).isEmpty) + + // idle threshold exceeded + clock.advance(executorIdleTimeout * 1000) + assert(manager.executorMonitor.timedOutExecutors().size === 3) + schedule(manager) + assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining) + assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining) + + // Mark a subset as busy - only idle executors should be removed + onExecutorAddedDefaultProfile(manager, "executor-4") + onExecutorAddedDefaultProfile(manager, "executor-5") + onExecutorAddedDefaultProfile(manager, "executor-6") + onExecutorAddedDefaultProfile(manager, "executor-7") + assert(manager.executorMonitor.executorCount === 7) + assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed + assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning + onExecutorBusy(manager, "executor-4") + onExecutorBusy(manager, "executor-5") + onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones) + + // after scheduling, the previously timed out executor should be removed, since + // there are new active ones. 
+ schedule(manager) + assert(executorsDecommissioning(manager).size === 3) + + // advance the clock so that idle executors should time out and move to the pending list + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsPendingToRemove(manager).size === 0) + assert(executorsDecommissioning(manager).size === 4) + assert(!executorsDecommissioning(manager).contains("executor-4")) + assert(!executorsDecommissioning(manager).contains("executor-5")) + assert(!executorsDecommissioning(manager).contains("executor-6")) + + // Busy executors are now idle and should be removed + onExecutorIdle(manager, "executor-4") + onExecutorIdle(manager, "executor-5") + onExecutorIdle(manager, "executor-6") + schedule(manager) + assert(executorsDecommissioning(manager).size === 4) + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining) + } + test("listeners trigger add executors correctly") { val manager = createManager(createConf(1, 20, 1)) assert(addTime(manager) === NOT_SET) @@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createConf( minExecutors: Int = 1, maxExecutors: Int = 5, - initialExecutors: Int = 1): SparkConf = { + initialExecutors: Int = 1, + decommissioningEnabled: Boolean = false): SparkConf = { val sparkConf = new SparkConf() .set(config.DYN_ALLOCATION_ENABLED, true) .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) @@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. .set(TEST_SCHEDULE_INTERVAL, 30000L) + .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled) sparkConf } @@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = { manager.executorMonitor.executorsPendingToRemove() } + + private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = { + manager.executorMonitor.executorsDecommissioning() + } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala index d95deb1f5f327..6bfd3f72e6322 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala @@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] sc.getExecutorIds().tail.foreach { id => - sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false)) + sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false), + adjustTargetNumExecutors = false) assert(rdd3.sortByKey().collect().length === 100) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index bb0c33acc0af5..fe16da77b4727 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { // decom.sh message passing 
is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) + // Make the executors decommission, finish, exit, and not be replaced. + val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))) + sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 7cf008381af66..1951304ef15dd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS logInfo(s"Decommissioning executor ${execToDecommission}") sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", isHostDecommissioned = false)) + ExecutorDecommissionInfo("", isHostDecommissioned = false), + adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() // Wait for job to finish. @@ -276,6 +277,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS } // Wait for the executor to be removed automatically after migration. + // This is set to a high value since github actions is sometimes high latency + // but I've never seen this go for more than a minute. assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 8a5208d49a70f..cd973df257f07 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -sleep 30 +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 279386d94b35d..28ab37152cf4c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -279,6 +279,7 @@ class KubernetesSuite extends SparkFunSuite appArgs = appArgs) val execPods = scala.collection.mutable.Map[String, Pod]() + val podsDeleted = scala.collection.mutable.HashSet[String]() val (patienceInterval, patienceTimeout) = { executorPatience match { case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) @@ -339,27 +340,21 @@ class KubernetesSuite extends SparkFunSuite } // Delete the pod to simulate cluster scale down/migration. 
// This will allow the pod to remain up for the grace period - val pod = kubernetesTestComponents.kubernetesClient.pods() - .withName(name) - pod.delete() + kubernetesTestComponents.kubernetesClient.pods() + .withName(name).delete() logDebug(s"Triggered pod decom/delete: $name deleted") - // Look for the string that indicates we should force kill the first - // Executor. This simulates the pod being fully lost. - logDebug("Waiting for second collect...") + // Make sure this pod is deleted Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("Waiting some more, please kill exec 1."), - "Decommission test did not complete second collect.") + assert(podsDeleted.contains(name)) + } + // Then make sure this pod is replaced + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(execPods.size == 3) } - logDebug("Force deleting") - val podNoGrace = pod.withGracePeriod(0) - podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) + podsDeleted += name } } }) @@ -388,7 +383,6 @@ class KubernetesSuite extends SparkFunSuite Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } - execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => @@ -400,6 +394,7 @@ class KubernetesSuite extends SparkFunSuite s"The application did not complete, did not find str ${e}") } } + execWatcher.close() } protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d34e61611461c..5fcad083b007c 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -47,11 +47,6 @@ def addToAcc(x): print("...") time.sleep(30) rdd.count() - print("Waiting some more, please kill exec 1.") - print("...") - time.sleep(30) - print("Executor node should be deleted now") - rdd.count() rdd.collect() print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 58bd56c591d04..a4b7b7a28fd46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -23,7 +23,9 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if 
(conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false), + adjustTargetNumExecutors = true) + } else { + client.killExecutor(execIdToRemove) + } logInfo(s"Requested to kill executor $execIdToRemove") } else { logInfo(s"No non-receiver executors to kill") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 65efa10bfcf92..a5d52506b1268 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } test("basic functionality") { + basicTest(decommissioning = false) + } + + test("basic decommissioning") { + basicTest(decommissioning = true) + } + + def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly + conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning) // There is 1 receiver, and exec 1 has been allocated to it - withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) => + withAllocationManager(numReceivers = 1, conf = conf) { + case (receiverTracker, allocationManager) => + when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1"))) /** Add data point for batch processing time and verify executor allocation */ @@ -83,12 +96,26 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ + /** Verify that a particular executor was scaled down. 
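+     * Depending on WORKER_DECOMMISSION_ENABLED this means exactly one killExecutor or
+     * decommissionExecutor call for that id, as verified below.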
*/ def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { if (expectedKilledExec.nonEmpty) { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + val decomInfo = ExecutorDecommissionInfo("spark scale down", false) + if (decommissioning) { + verify(allocationClient, times(1)).decommissionExecutor( + meq(expectedKilledExec.get), meq(decomInfo), meq(true)) + verify(allocationClient, never).killExecutor(meq(expectedKilledExec.get)) + } else { + verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + verify(allocationClient, never).decommissionExecutor( + meq(expectedKilledExec.get), meq(decomInfo), meq(true)) + } } else { - verify(allocationClient, never).killExecutor(null) + if (decommissioning) { + verify(allocationClient, never).decommissionExecutor(null, null, false) + verify(allocationClient, never).decommissionExecutor(null, null, true) + } else { + verify(allocationClient, never).killExecutor(null) + } } } From bff1ef7f3aa2b750959c39ddf5be664e3a8c0f4c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 10 Aug 2020 12:32:46 -0700 Subject: [PATCH 2/7] CR feedback, move adjustExecutors to a common utility function --- .../spark/ExecutorAllocationClient.scala | 9 ++-- .../spark/ExecutorAllocationManager.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 53 +++++++++---------- .../cluster/StandaloneSchedulerBackend.scala | 2 +- .../scheduler/WorkerDecommissionSuite.scala | 2 +- 5 files changed, 31 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 85022c4a3bc1d..583a3b6ec1f91 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -93,7 +93,7 @@ private[spark] trait ExecutorAllocationClient { * @return the ids of the executors acknowledged by the cluster manager to be removed. */ def decommissionExecutors( - executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], adjustTargetNumExecutors: Boolean): Seq[String] = { killExecutors(executorsAndDecomInfo.map(_._1), adjustTargetNumExecutors, @@ -103,19 +103,18 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager decommission the specified executor. - * Default implementation delegates to decommissionExecutors, scheduler can override - * if it supports graceful decommissioning. + * Delegates to decommissionExecutors. * * @param executorId identifiers of executor to decommission * @param decommissionInfo information about the decommission (reason, host loss) * @param adjustTargetNumExecutors if we should adjust the target number of executors. * @return whether the request is acknowledged by the cluster manager. 
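   *
   * A hedged call sketch (the id and reason string are illustrative):
   * {{{
   *   client.decommissionExecutor("1",
   *     ExecutorDecommissionInfo("spark scale down", isHostDecommissioned = false),
   *     adjustTargetNumExecutors = true)  // true: also shrink the requested total
   * }}}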
*/ - def decommissionExecutor(executorId: String, + final def decommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo, adjustTargetNumExecutors: Boolean): Boolean = { val decommissionedExecutors = decommissionExecutors( - Seq((executorId, decommissionInfo)), + Array((executorId, decommissionInfo)), adjustTargetNumExecutors = adjustTargetNumExecutors) decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index ae2d2b541f944..76cf9b514f69f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -575,7 +575,7 @@ private[spark] class ExecutorAllocationManager( // when the task backlog decreased. if (conf.get(WORKER_DECOMMISSION_ENABLED)) { val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( - id => (id, ExecutorDecommissionInfo("spark scale down", false))) + id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) } else { client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f2959c39a3fcb..9e16d2b950c71 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -513,7 +513,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return the ids of the executors acknowledged by the cluster manager to be removed. 
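   * Non-active executors are filtered out up front, so already-lost executors are
   * never messaged and never counted in the returned ids.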
*/ override def decommissionExecutors( - executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], adjustTargetNumExecutors: Boolean): Seq[String] = { val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => @@ -530,21 +530,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // If we don't want to replace the executors we are decommissioning if (adjustTargetNumExecutors) { - executorsToDecommission.foreach { case (exec, _) => - val rpId = withLock { - executorDataMap(exec).resourceProfileId - } - val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) - if (requestedTotalExecutorsPerResourceProfile.isEmpty) { - // Assume that we are killing an executor that was started by default and - // not through the request api - requestedTotalExecutorsPerResourceProfile(rp) = 0 - } else { - val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) - requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) - } - } - doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + adjustExecutors(executorsToDecommission.map(_._1)) } val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) => @@ -846,6 +832,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = Future.successful(false) + /** + * Adjust the number of executors being requested to no longer include the provided executors. + */ + private def adjustExecutors(executorIds: Seq[String]) = { + executorIds.foreach { exec => + withLock { + val rpId = executorDataMap(exec).resourceProfileId + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } + /** * Request that the cluster manager kill the specified executors. * @@ -884,19 +891,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // take into account executors that are pending to be added or removed. 
val adjustTotalExecutors = if (adjustTargetNumExecutors) { - executorsToKill.foreach { exec => - val rpId = executorDataMap(exec).resourceProfileId - val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) - if (requestedTotalExecutorsPerResourceProfile.isEmpty) { - // Assume that we are killing an executor that was started by default and - // not through the request api - requestedTotalExecutorsPerResourceProfile(rp) = 0 - } else { - val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) - requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) - } - } - doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + adjustExecutors(executorsToKill) } else { Future.successful(true) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 660b3cf4bd8b6..3acb6f1088e13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -177,7 +177,7 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") val execId = fullId.split("/")(1) - decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false) + decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false) logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index fe16da77b4727..ea5be21d16d85 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -77,7 +77,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() // Make the executors decommission, finish, exit, and not be replaced. 
- val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))) + val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) From 995ffa9637b1ffbd08d925f6509875d2efbae5d4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 10 Aug 2020 21:38:47 -0700 Subject: [PATCH 3/7] Exclude some non-public APIs --- project/SparkBuild.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5a3ac213c2057..110c311876f48 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -838,6 +838,8 @@ object Unidoc { f.getCanonicalPath.contains("org/apache/spark/shuffle") && !f.getCanonicalPath.contains("org/apache/spark/shuffle/api"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/executor"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/ExecutorAllocationClient"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend"))) .map(_.filterNot(f => f.getCanonicalPath.contains("org/apache/spark/unsafe") && !f.getCanonicalPath.contains("org/apache/spark/unsafe/types/CalendarInterval"))) From cc76ff5104e8376063f640568d8983adc5dda32d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 10 Aug 2020 20:21:03 -0700 Subject: [PATCH 4/7] More CR feedback --- .../spark/ExecutorAllocationManager.scala | 8 ++- .../CoarseGrainedSchedulerBackend.scala | 60 +------------------ .../scheduler/dynalloc/ExecutorMonitor.scala | 28 +++++---- .../ExecutorAllocationManagerSuite.scala | 24 ++++---- 4 files changed, 35 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 76cf9b514f69f..1cb840f2fe802 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -128,6 +128,8 @@ private[spark] class ExecutorAllocationManager( private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) + private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED) + private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id validateSettings() @@ -209,7 +211,7 @@ private[spark] class ExecutorAllocationManager( // storage shuffle decommissioning is enabled we have *experimental* support for // decommissioning without a shuffle service. if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || - (conf.get(WORKER_DECOMMISSION_ENABLED) && + (decommissionEnabled && conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { @@ -573,7 +575,7 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. 
- if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + if (decommissionEnabled) { val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) @@ -592,7 +594,7 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + if (decommissionEnabled) { executorMonitor.executorsDecommissioned(executorsRemoved) } else { executorMonitor.executorsKilled(executorsRemoved.toSeq) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9e16d2b950c71..ccf65b2a1123f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: $decommissionInfo") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false) case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.") - decommissionExecutor(executorId, decommissionInfo) - context.reply(true) + context.reply(decommissionExecutor(executorId, decommissionInfo, + adjustTargetNumExecutors = false)) case RetrieveSparkAppConfig(resourceProfileId) => val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId) @@ -419,60 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } - /** - * Mark a given executor as decommissioned and stop making resource offers for it. - * - */ - private def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { - val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId - true - } else { - false - } - } - - if (shouldDisable) { - logInfo(s"Starting decommissioning executor $executorId.") - try { - scheduler.executorDecommission(executorId, decommissionInfo) - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - } - // Send decommission message to the executor, this may be a duplicate since the executor - // could have been the one to notify us. But it's also possible the notification came from - // elsewhere and the executor does not yet know. - executorDataMap.get(executorId) match { - case Some(executorInfo) => - executorInfo.executorEndpoint.send(DecommissionSelf) - case None => - // Ignoring the executor since it is not registered. 
- logWarning(s"Attempted to decommission unknown executor $executorId.") - } - logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } - } else { - logInfo(s"Skipping decommissioning of executor $executorId.") - } - shouldDisable - } - /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index fe2efd92817e1..8dbdc84905e36 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -363,22 +363,24 @@ private[spark] class ExecutorMonitor( UNKNOWN_RESOURCE_PROFILE_ID) // Check if it is a shuffle file, or RDD to pick the correct codepath for update - if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) { - /** - * The executor monitor keeps track of locations of cache and shuffle blocks and this can be - * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks - * around now this wires it up so that it keeps track of it. We only do this for data blocks - * as index and other blocks blocks do not necessarily mean the entire block has been - * committed. - */ - event.blockUpdatedInfo.blockId match { - case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) - case _ => // For now we only update on data blocks + if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && + shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of locations of cache and shuffle blocks and this can + * be used to decide which executor(s) Spark should shutdown first. Since we move shuffle + * blocks around now this wires it up so that it keeps track of it. We only do this for + * data blocks as index and other blocks blocks do not necessarily mean the entire block + * has been committed. 
+ */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } } return - } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { - return } + val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index a5d52506b1268..9e06625371ae6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -97,17 +97,17 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } /** Verify that a particular executor was scaled down. */ - def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { - if (expectedKilledExec.nonEmpty) { + def verifyScaledDownExec(expectedExec: Option[String]): Unit = { + if (expectedExec.nonEmpty) { val decomInfo = ExecutorDecommissionInfo("spark scale down", false) if (decommissioning) { verify(allocationClient, times(1)).decommissionExecutor( - meq(expectedKilledExec.get), meq(decomInfo), meq(true)) - verify(allocationClient, never).killExecutor(meq(expectedKilledExec.get)) + meq(expectedExec.get), meq(decomInfo), meq(true)) + verify(allocationClient, never).killExecutor(meq(expectedExec.get)) } else { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get)) verify(allocationClient, never).decommissionExecutor( - meq(expectedKilledExec.get), meq(decomInfo), meq(true)) + meq(expectedExec.get), meq(decomInfo), meq(true)) } } else { if (decommissioning) { @@ -122,41 +122,41 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase // Batch proc time = batch interval, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation(batchDurationMillis) { verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1 - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time = batch interval * 2, should increase allocation by 2 addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) { verifyTotalRequestedExecs(Some(4)) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale up ratio, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(Some(3)) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly less than the scale up ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation 
       addBatchProcTimeAndVerifyAllocation(
         batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
         verifyTotalRequestedExecs(None)
-        verifyKilledExec(Some("2"))
+        verifyScaledDownExec(Some("2"))
       }
     }
   }

From 6a6912606761a503ec85dbc54fa5ea465771effc Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 11 Aug 2020 13:23:09 -0700
Subject: [PATCH 5/7] On GitHub Actions the listener might take more time to
 finish; also add a comment

---
 .../org/apache/spark/deploy/DecommissionWorkerSuite.scala | 6 ++++--
 .../storage/BlockManagerDecommissionIntegrationSuite.scala | 2 ++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index ee9a6be03868f..2dd1472448440 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -242,8 +242,10 @@ class DecommissionWorkerSuite
       assert(jobResult === 2)
     }
     // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2.
-    val tasksSeen = listener.getTasksFinished()
-    assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
+    eventually(timeout(30.seconds), interval(10.seconds)) {
+      val tasksSeen = listener.getTasksFinished()
+      assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
+    }
   }
 
   private abstract class RootStageAwareListener extends SparkListener {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 1951304ef15dd..82f87a5b58b46 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -188,6 +188,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val execToDecommission = getCandidateExecutorToDecom.get
     logInfo(s"Decommissioning executor ${execToDecommission}")
+
+    // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
       ExecutorDecommissionInfo("", isHostDecommissioned = false),

From 80629eb0e67a508560ba351a06398500f2e2fbf9 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 11 Aug 2020 19:49:19 -0700
Subject: [PATCH 6/7] Back out the tasksSeen change in test

---
 .../org/apache/spark/deploy/DecommissionWorkerSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index 2dd1472448440..ee9a6be03868f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -242,10 +242,8 @@ class DecommissionWorkerSuite
       assert(jobResult === 2)
     }
     // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2.
-    eventually(timeout(30.seconds), interval(10.seconds)) {
-      val tasksSeen = listener.getTasksFinished()
-      assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
-    }
+    val tasksSeen = listener.getTasksFinished()
+    assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
   }
 
   private abstract class RootStageAwareListener extends SparkListener {

From e970cb10147fb64533f5088edc3a448b5ef198cf Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 12 Aug 2020 12:08:56 -0700
Subject: [PATCH 7/7] Avoid adjusting executors requested if we don't have any
 actually changing state.

---
 .../scala/org/apache/spark/ExecutorAllocationClient.scala | 1 +
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 7 +++++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 583a3b6ec1f91..079340a358acf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -18,6 +18,7 @@ package org.apache.spark
 
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
 /**
  * A client that communicates with the cluster manager to request or kill executors.
  * This is currently supported only in YARN mode.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ccf65b2a1123f..ca657313c14f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -479,10 +479,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       adjustExecutors(executorsToDecommission.map(_._1))
     }
 
-    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+    executorsToDecommission.filter { case (executorId, decomInfo) =>
       doDecommission(executorId, decomInfo)
     }.map(_._1)
-    decommissioned
   }
 
@@ -782,6 +781,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Adjust the number of executors being requested to no longer include the provided executors.
    */
   private def adjustExecutors(executorIds: Seq[String]) = {
+    if (executorIds.nonEmpty) {
     executorIds.foreach { exec =>
       withLock {
         val rpId = executorDataMap(exec).resourceProfileId
@@ -797,6 +797,9 @@
       }
     }
     doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    } else {
+      Future.successful(true)
+    }
   }
 
 /**
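Note for reviewers following the series: the end state of PATCH 7/7 boils down to two moves — report back only the executors the cluster manager actually accepted for decommissioning, and skip the target-size adjustment entirely when nothing changed. Below is a minimal, self-contained sketch of that pattern; DecomInfo, ToyBackend, startDecommission, and the even-id acceptance rule are all hypothetical stand-ins for illustration, not the real Spark API.

import scala.concurrent.Future

// DecomInfo mirrors the shape of ExecutorDecommissionInfo; ToyBackend is a toy,
// not the real scheduler backend.
final case class DecomInfo(message: String, isHostDecommissioned: Boolean)

class ToyBackend(private var requestedTotal: Int) {

  /** Decommission a batch of executors; returns the ids actually accepted. */
  def decommissionExecutors(
      executorsAndDecomInfo: Seq[(String, DecomInfo)],
      adjustTargetNumExecutors: Boolean): Seq[String] = {
    val accepted = executorsAndDecomInfo.collect {
      case (id, info) if startDecommission(id, info) => id
    }
    if (adjustTargetNumExecutors) {
      adjustExecutors(accepted)
    }
    accepted
  }

  // Stand-in for asking the cluster manager; here, even-numbered ids accept.
  private def startDecommission(id: String, info: DecomInfo): Boolean =
    id.nonEmpty && id.forall(_.isDigit) && id.toInt % 2 == 0

  // The PATCH 7/7 guard: only send a new total when something actually changed.
  private def adjustExecutors(executorIds: Seq[String]): Future[Boolean] = {
    if (executorIds.nonEmpty) {
      requestedTotal = math.max(0, requestedTotal - executorIds.size)
      Future.successful(true) // stand-in for doRequestTotalExecutors(...)
    } else {
      Future.successful(true) // nothing changed, so skip the resize request
    }
  }

  def currentTarget: Int = requestedTotal
}

object ToyBackendDemo extends App {
  val backend = new ToyBackend(requestedTotal = 4)
  val accepted = backend.decommissionExecutors(
    Seq("1" -> DecomInfo("scale down", false), "2" -> DecomInfo("scale down", false)),
    adjustTargetNumExecutors = true)
  // Only executor "2" accepts, so the target drops from 4 to 3.
  println(s"accepted=$accepted target=${backend.currentTarget}")
}

Without the nonEmpty guard, a decommission request that ends up touching no executors would still fire a redundant resize call to the cluster manager; with it, the no-op path returns a completed future and leaves the requested total alone.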