diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 49a319abb3238..9d07661b9a651 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -58,6 +58,11 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
+  case class WorkerDecommission(
+      id: String,
+      worker: RpcEndpointRef)
+    extends DeployMessage
+
   case class ExecutorStateChanged(
       appId: String,
       execId: Int,
@@ -131,6 +136,8 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
+  case object DecommissionSelf // Mark self for decommissioning.
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 69c98e28931d7..0751bcf221f86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,13 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
+  // DECOMMISSIONED isn't listed as finished: the executor is not removed from the worker
+  // and still exists, but we do want to avoid scheduling new tasks on it.
+  private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)
+
+  def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
 }
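A small aside on the semantics of the new state (not part of the patch): since ExecutorState is private[deploy], the snippet below assumes it lives in the org.apache.spark.deploy package. It simply shows that a DECOMMISSIONED executor is still considered alive, unlike KILLED/FAILED/LOST/EXITED.

    package org.apache.spark.deploy

    object ExecutorStateSemanticsDemo {
      def main(args: Array[String]): Unit = {
        // DECOMMISSIONED is deliberately not a "finished" state: the executor keeps
        // running its current tasks, but no new tasks should be scheduled on it.
        assert(!ExecutorState.isFinished(ExecutorState.DECOMMISSIONED))
        assert(ExecutorState.isFinished(ExecutorState.KILLED))
      }
    }
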
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 34ade4ce6f39b..6df505bda02b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -39,6 +39,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
  * Takes a master URL, an app description, and a listener for cluster events, and calls
  * back the listener when various events occur.
  *
+ *
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class StandaloneAppClient(
@@ -180,6 +181,8 @@ private[spark] class StandaloneAppClient(
           logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
           if (ExecutorState.isFinished(state)) {
             listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+          } else if (state == ExecutorState.DECOMMISSIONED) {
+            listener.executorDecommissioned(fullId, message.getOrElse(""))
           }
 
       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index d8bc1a883def1..2e38a6847891d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 
+  def executorDecommissioned(fullId: String, message: String): Unit
+
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5db9b529a2dce..9f68f1c8d21bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
+    case WorkerDecommission(id, workerRef) =>
+      logInfo("Recording worker %s decommissioning".format(id))
+      if (state == RecoveryState.STANDBY) {
+        workerRef.send(MasterInStandby)
+      } else {
+        // Ignore decommission requests from workers that are not registered.
+        idToWorker.get(id).foreach(decommissionWorker)
+      }
+
     case RegisterWorker(
       id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
@@ -311,7 +320,9 @@ private[deploy] class Master(
           // Only retry certain number of times so we don't go into an infinite loop.
           // Important note: this code path is not exercised by tests, so be very careful when
           // changing this `if` condition.
+          // We also don't count failures from decommissioned workers since they are "expected."
           if (!normalExit
+              && oldState != ExecutorState.DECOMMISSIONED
               && appInfo.incrementRetryCount() >= maxExecutorRetries
               && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
             val execs = appInfo.executors.values
@@ -790,6 +801,26 @@ private[deploy] class Master(
     true
   }
 
+  private def decommissionWorker(worker: WorkerInfo) {
+    if (worker.state != WorkerState.DECOMMISSIONED) {
+      logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
+      worker.setState(WorkerState.DECOMMISSIONED)
+      for (exec <- worker.executors.values) {
+        logInfo("Telling app of decommissioned executors")
+        exec.application.driver.send(ExecutorUpdated(
+          exec.id, ExecutorState.DECOMMISSIONED,
+          Some("worker decommissioned"), None, workerLost = false))
+        exec.state = ExecutorState.DECOMMISSIONED
+        exec.application.removeExecutor(exec)
+      }
+      // On recovery do not re-add a decommissioned worker.
+      persistenceEngine.removeWorker(worker)
+    } else {
+      logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
+        format(worker.id, worker.host, worker.port))
+    }
+  }
+
   private def removeWorker(worker: WorkerInfo, msg: String) {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
     worker.setState(WorkerState.DEAD)
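The Worker change that follows only registers the PWR handler when the new spark.worker.decommission.enabled flag is set (the flag is added later in this diff, in config/Worker.scala, and defaults to false). A minimal opt-in sketch, mirroring how WorkerDecommissionSuite further down enables it; with a local-cluster master the workers share the application conf, so the flag reaches the Worker side as well:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: the app name and local-cluster sizing are placeholders.
    val conf = new SparkConf()
      .setAppName("decommission-demo")
      .set("spark.worker.decommission.enabled", "true")
    val sc = new SparkContext("local-cluster[2, 1, 1024]", "decommission-demo", conf)
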
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f8ec5b6b190c1..1a69f2374b01c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.rpc._
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
+import org.apache.spark.util.{SignalUtils, SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -63,6 +63,14 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)
 
+  // If worker decommissioning is enabled, register a signal handler on PWR to trigger it.
+  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    logInfo("Registering PWR handler.")
+    SignalUtils.register("PWR")(decommissionSelf)
+  } else {
+    logInfo("Worker decommissioning not enabled, skipping PWR")
+  }
+
   // A scheduled executor used to send messages at the specified time.
   private val forwardMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -124,6 +132,7 @@ private[deploy] class Worker(
   private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
   private var registered = false
   private var connected = false
+  private var decommissioned = false
   private val workerId = generateWorkerId()
   private val sparkHome =
     if (sys.props.contains(IS_TESTING.key)) {
@@ -491,6 +500,8 @@ private[deploy] class Worker(
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+      } else if (decommissioned) {
+        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -610,6 +621,9 @@ private[deploy] class Worker(
     case ApplicationFinished(id) =>
       finishedApps += id
       maybeCleanupApplication(id)
+
+    case DecommissionSelf =>
+      decommissionSelf()
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -707,6 +721,20 @@ private[deploy] class Worker(
     }
   }
 
+  private[deploy] def decommissionSelf(): Boolean = {
+    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      logDebug("Decommissioning self")
+      decommissioned = true
+      // TODO: Send decommission notification to executors & shuffle service.
+      // Also send a message to the master.
+      sendToMaster(WorkerDecommission(workerId, self))
+    } else {
+      logWarning("Asked to decommission self, but decommissioning not enabled")
+    }
+    // Return true since this can be called as a signal handler
+    true
+  }
+
   private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
     val driverId = driverStateChanged.driverId
     val exception = driverStateChanged.exception
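Besides the PWR signal path above, decommissioning can also be driven from the driver side. A sketch of what WorkerDecommissionSuite below does; it only compiles from code inside the org.apache.spark namespace, since SparkContext.schedulerBackend and decommissionExecutor are both private[spark]:

    import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend

    // Assumes `sc` is a SparkContext backed by a standalone or local-cluster master.
    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
    sched.getExecutorIds().foreach(execId => sched.decommissionExecutor(execId))
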
Not launching.") } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) @@ -177,6 +184,26 @@ private[spark] class CoarseGrainedExecutorBackend( System.exit(code) } + + private def decommissionSelf(): Boolean = { + logError("Decommissioning self") + try { + decommissioned = true + // Tell master we are are decommissioned so it stops trying to schedule us + if (driver.nonEmpty) { + driver.get.send(DecommissionExecutor(executorId)) + } + if (executor != null) { + executor.decommission() + } + // Return true since we are handling a signal + true + } catch { + case e: Exception => + logError(s"Error ${e} during attempt to decommission self") + false + } + } } private[spark] object CoarseGrainedExecutorBackend extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index cc3cc1604d68b..5f4b5e01866e4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -205,14 +205,31 @@ private[spark] class Executor( */ private var heartbeatFailures = 0 + /** + * Flag to prevent launching new tasks while decommissioned. There could be a race condition + * accessing this, but decommissioning is only intended to help not be a hard stop. + */ + private var decommissioned = false + heartbeater.start() private[executor] def numRunningTasks: Int = runningTasks.size() + /** + * Mark an executor for decommissioning and avoid launching new tasks. + */ + private[spark] def decommission(): Unit = { + decommissioned = true + } + def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { - val tr = new TaskRunner(context, taskDescription) - runningTasks.put(taskDescription.taskId, tr) - threadPool.execute(tr) + if (!decommissioned) { + val tr = new TaskRunner(context, taskDescription) + runningTasks.put(taskDescription.taskId, tr) + threadPool.execute(tr) + } else { + log.info(s"Not launching task, executor is in decommissioned state") + } } def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala index 47f7167d2c9cb..f29d7d4aa467e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala @@ -60,4 +60,9 @@ private[spark] object Worker { ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize") .intConf .createWithDefault(100) + + private[spark] val WORKER_DECOMMISSION_ENABLED = + ConfigBuilder("spark.worker.decommission.enabled") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1b67e9906457d..b363f92d53193 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -339,6 +339,7 @@ abstract class RDD[T: ClassTag]( readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { + // Block hit. case Left(blockResult) => if (readCachedBlock) { val existingMetrics = context.taskMetrics().inputMetrics @@ -352,6 +353,7 @@ abstract class RDD[T: ClassTag]( } else { new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) } + // Need to compute the block. 
case Right(iter) => new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 46a35b6a2eaf9..18579e25da013 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los private[spark] case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false) extends ExecutorLossReason(_message) + +/** + * A loss reason that means the worker is marked for decommissioning. + * + * This is used by the task scheduler to remove state associated with the executor, but + * not yet fail any tasks that were running in the executor before the executor is "fully" lost. + */ +private [spark] object WorkerDecommission extends ExecutorLossReason("Worker Decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index f4b0ab10155a2..3a8e00cfd69c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -88,6 +88,10 @@ private[spark] class Pool( schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason)) } + override def executorDecommission(executorId: String): Unit = { + schedulableQueue.asScala.foreach(_.executorDecommission(executorId)) + } + override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { var shouldRevive = false for (schedulable <- schedulableQueue.asScala) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index b6f88ed0a93aa..8cc239c81d11a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -43,6 +43,7 @@ private[spark] trait Schedulable { def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit + def executorDecommission(executorId: String): Unit def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1862e16824277..d572e97ba20bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -96,6 +96,11 @@ private[spark] trait TaskScheduler { */ def applicationId(): String = appId + /** + * Process a decommissioning executor. 
+ */ + def executorDecommission(executorId: String): Unit + /** * Process a lost executor */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7e820c32fa78d..5d8526ed63745 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -711,6 +711,11 @@ private[spark] class TaskSchedulerImpl( } } + override def executorDecommission(executorId: String): Unit = { + rootPool.executorDecommission(executorId) + backend.reviveOffers() + } + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b3aa814537500..9260c884dc5e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1105,6 +1105,12 @@ private[spark] class TaskSetManager( levels.toArray } + def executorDecommission(execId: String) { + recomputeLocality() + // Future consideration: if an executor is decommissioned it may make sense to add the current + // tasks to the spec exec queue. + } + def recomputeLocality() { val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) myLocalityLevels = computeValidLocalityLevels() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index afb48a31754f9..9471b434e36f4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -90,6 +90,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage + case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4830d0e6f8008..3bfa51d60b130 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -117,6 +117,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. 
protected val executorsPendingLossReason = new HashSet[String] + // Executors which are being decommissioned + protected val executorsPendingDecommission = new HashSet[String] protected val addressToExecutorId = new HashMap[RpcAddress, String] @@ -245,6 +247,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeWorker(workerId, host, message) context.reply(true) + case DecommissionExecutor(executorId) => + logInfo(s"Decommissioning executor ${executorId}") + decommissionExecutor(executorId) + context.reply(true) + case RetrieveSparkAppConfig => val reply = SparkAppConfig( sparkProperties, @@ -301,7 +308,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def executorIsAlive(executorId: String): Boolean = synchronized { !executorsPendingToRemove.contains(executorId) && - !executorsPendingLossReason.contains(executorId) + !executorsPendingLossReason.contains(executorId) && + !executorsPendingDecommission.contains(executorId) } // Launch tasks returned by a set of resource offers @@ -368,6 +376,28 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } + /** + * Stop making resource offers for the given executor. The executor is marked as lost with the + * loss reason as WorkerDecommission. + */ + private def decommissionExecutor(executorId: String): Boolean = { + val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (executorIsAlive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + + if (shouldDisable) { + logInfo(s"Decommissioning executor $executorId.") + scheduler.executorDecommission(executorId) + } + shouldDisable + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -493,6 +523,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError(t.getMessage, t))(ThreadUtils.sameThread) } + /** + * Called by subclasses when notified of a decommissioning worker. + */ + private[spark] def decommissionExecutor(executorId: String): Unit = { + // Only log the failure since we don't care about the result. 
+ driverEndpoint.ask[Boolean](DecommissionExecutor(executorId)).onFailure { case t => + logError(t.getMessage, t) + }(ThreadUtils.sameThread) + } + def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index e0605fee9cbf2..e8548c231c264 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -169,6 +169,11 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } + override def executorDecommissioned(fullId: String, message: String) { + logInfo("Executor %s decommissioned: %s".format(fullId, message)) + decommissionExecutor(fullId.split("/")(1)) + } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { logInfo("Worker %s removed: %s".format(workerId, message)) removeWorker(workerId, host, message) diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 5a24965170cef..1d507185802ce 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -60,10 +60,11 @@ private[spark] object SignalUtils extends Logging { if (SystemUtils.IS_OS_UNIX) { try { val handler = handlers.getOrElseUpdate(signal, { - logInfo("Registered signal handler for " + signal) + logInfo("Registering signal handler for " + signal) new ActionHandler(new Signal(signal)) }) handler.register(action) + logInfo("Registered signal handler for " + signal) } catch { case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index a1d3077b8fc87..a3e39d7f53728 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, Master} import org.apache.spark.deploy.worker.Worker -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.Utils @@ -44,13 +44,13 @@ class AppClientSuite with Eventually with ScalaFutures { private val numWorkers = 2 - private val conf = new SparkConf() - private val securityManager = new SecurityManager(conf) + private var conf: SparkConf = null private var masterRpcEnv: RpcEnv = null private var workerRpcEnvs: Seq[RpcEnv] = null private var master: Master = null private var workers: Seq[Worker] = null + private var securityManager: SecurityManager = null /** * Start the local cluster. 
@@ -58,6 +58,8 @@ class AppClientSuite */ override def beforeAll(): Unit = { super.beforeAll() + conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + securityManager = new SecurityManager(conf) masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) workerRpcEnvs = (0 until numWorkers).map { i => RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) @@ -111,8 +113,23 @@ class AppClientSuite assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") } + + // Save the executor id before decommissioning so we can kill it + val application = getApplications().head + val executors = application.executors + val executorId: String = executors.head._2.fullId + + // Send a decommission self to all the workers + // Note: normally the worker would send this on their own. + workers.foreach(worker => worker.decommissionSelf()) + + // Decommissioning is async. + eventually(timeout(1.seconds), interval(10.millis)) { + // We only record decommissioning for the executor we've requested + assert(ci.listener.execDecommissionedList.size === 1) + } + // Send request to kill executor, verify request was made - val executorId: String = getApplications().head.executors.head._2.fullId whenReady( ci.client.killExecutors(Seq(executorId)), timeout(10.seconds), @@ -120,6 +137,15 @@ class AppClientSuite assert(acknowledged) } + // Verify that asking for executors on the decommissioned workers fails + whenReady( + ci.client.requestTotalExecutors(numExecutorsRequested), + timeout(10.seconds), + interval(10.millis)) { acknowledged => + assert(acknowledged) + } + assert(getApplications().head.executors.size === 0) + // Issue stop command for Client to disconnect from Master ci.client.stop() @@ -189,6 +215,7 @@ class AppClientSuite val deadReasonList = new ConcurrentLinkedQueue[String]() val execAddedList = new ConcurrentLinkedQueue[String]() val execRemovedList = new ConcurrentLinkedQueue[String]() + val execDecommissionedList = new ConcurrentLinkedQueue[String]() def connected(id: String): Unit = { connectedIdList.add(id) @@ -218,6 +245,10 @@ class AppClientSuite execRemovedList.add(id) } + def executorDecommissioned(id: String, message: String): Unit = { + execDecommissionedList.add(id) + } + def workerRemoved(workerId: String, host: String, message: String): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 749e47c2727e7..a5346c672db93 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def executorDecommission(executorId: String) = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -679,6 +680,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorMetrics: ExecutorMetrics): Boolean = true + override def executorDecommission(executorId: String): Unit = {} 
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 347064dc9aadf..ee858a53036ef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -88,6 +88,7 @@ private class DummyTaskScheduler extends TaskScheduler { stageId: Int, partitionId: Int, taskDuration: Long): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 + override def executorDecommission(executorId: String): Unit = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala new file mode 100644 index 0000000000000..483c449fa9e14 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} + +class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { + + + override def beforeEach(): Unit = { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) + } + + test("verify task with no decommissioning works as expected") { + val input = sc.parallelize(1 to 10) + input.count() + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(100) + x + } + assert(sleepyRdd.count() === 10) + } + + test("verify a task with all workers decommissioned succeeds") { + val input = sc.parallelize(1 to 10) + // Do a count to wait for the executors to be registered. 
+    input.count()
+    val sleepyRdd = input.mapPartitions{ x =>
+      Thread.sleep(100)
+      x
+    }
+    // Start the task
+    val asyncCount = sleepyRdd.countAsync()
+    Thread.sleep(10)
+    // Decommission all the executors, this should not halt the current task.
+    // The decommission message path through the master is tested in AppClientSuite.
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execs = sched.getExecutorIds()
+    execs.foreach(execId => sched.decommissionExecutor(execId))
+    assert(asyncCount.get() === 10)
+    // Try and launch task after decommissioning, this should fail
+    val postDecommissioned = input.map(x => x)
+    val postDecomAsyncCount = postDecommissioned.countAsync()
+    val thrown = intercept[java.util.concurrent.TimeoutException]{
+      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 1.seconds)
+    }
+    assert(postDecomAsyncCount.isCompleted === false,
+      "No new tasks should have been able to launch after the executors were decommissioned")
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 5e741112fc7e7..b040451988e5c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -54,6 +54,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
     }
   }
 
+  def workerDecomissioning: Boolean =
+    sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
+
   def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index af162839fef70..61f27dcd6914e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -33,7 +34,7 @@ import org.apache.spark.util.Utils
 private[spark] class BasicExecutorFeatureStep(
     kubernetesConf: KubernetesExecutorConf,
     secMgr: SecurityManager)
-  extends KubernetesFeatureConfigStep {
+  extends KubernetesFeatureConfigStep with Logging {
 
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
   private val executorContainerImage = kubernetesConf
@@ -187,6 +188,21 @@ private[spark] class BasicExecutorFeatureStep(
           .endResources()
         .build()
     }.getOrElse(executorContainer)
+    val containerWithLifecycle = kubernetesConf.workerDecomissioning match {
+      case false =>
+        logInfo("Decommissioning not enabled, skipping shutdown script")
+        containerWithLimitCores
+      case true =>
+        logInfo("Adding decommission script to lifecycle")
+        new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
+          .withNewPreStop()
+            .withNewExec()
+
.addToCommand("/opt/decom.sh") + .endExec() + .endPreStop() + .endLifecycle() + .build() + } val ownerReference = kubernetesConf.driverPod.map { pod => new OwnerReferenceBuilder() .withController(true) @@ -211,6 +227,8 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - SparkPod(executorPod, containerWithLimitCores) + logInfo(s"Built container $containerWithLifecycle") + + SparkPod(executorPod, containerWithLifecycle) } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 871d34b11e174..67bad7168b759 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -43,6 +43,7 @@ COPY jars /opt/spark/jars COPY bin /opt/spark/bin COPY sbin /opt/spark/sbin COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ +COPY kubernetes/dockerfiles/spark/decom.sh /opt/ COPY examples /opt/spark/examples COPY kubernetes/tests /opt/spark/tests COPY data /opt/spark/data diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh new file mode 100755 index 0000000000000..0149c0e92cdbb --- /dev/null +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +set -ex +export LOG=/dev/termination-log +echo "hi" +echo "Starting decom adventures" > ${LOG} || echo "logging is hard" +date | tee -a ${LOG} +# TODO(holden): Fix this PID extraction +WORKER_PID=$(ps axf | grep java |grep org.apache.spark.executor.CoarseGrainedExecutorBackend | grep -v grep) +echo "Using worker pid $WORKER_PID" | tee -a ${LOG} +kill -s SIGPWR ${WORKER_PID} | tee -a ${LOG} +echo "Waiting for worker pid to exit" +date +timeout 60 tail --pid=${WORKER_PID} -f /dev/null | tee -a ${LOG} +date +sleep 60 | tee -a ${LOG} +date +echo "Done" | tee -a ${LOG} +date | tee -a ${LOG} +echo "Term log was:" +cat $LOG diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 2097fb8865de9..c1208be984d67 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -95,3 +95,9 @@ esac # Execute the container CMD under tini for better hygiene exec /sbin/tini -s -- "${CMD[@]}" + +# Print out the termination log as we exit +sleep 1 +echo "Finished. 
Termination log:" +cat /dev/termination-log +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md index ea8286124a68c..febf6214e8e4d 100644 --- a/resource-managers/kubernetes/integration-tests/README.md +++ b/resource-managers/kubernetes/integration-tests/README.md @@ -11,7 +11,7 @@ is subject to change. Note that currently the integration tests only run with Ja The simplest way to run the integration tests is to install and run Minikube, then run the following from this directory: - dev/dev-run-integration-tests.sh + dev/dev-run-integration-tests.sh --spark-tgz [spark_release_build] The minimum tested version of Minikube is 0.23.0. The kube-dns addon must be enabled. Minikube should run with a minimum of 4 CPUs and 6G of memory: diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh index ecfe4058f5598..a954b1386e6b5 100755 --- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -16,6 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +set -ex set -xo errexit TEST_ROOT_DIR=$(git rev-parse --show-toplevel) @@ -35,11 +36,13 @@ INCLUDE_TAGS="k8s" EXCLUDE_TAGS= MVN="$TEST_ROOT_DIR/build/mvn" -SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/null\ +export SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/null\ | grep -v "INFO"\ | grep -v "WARNING"\ | tail -n 1) +echo $SCALA_VERSION + # Parse arguments while (( "$#" )); do case $1 in @@ -100,7 +103,8 @@ while (( "$#" )); do shift ;; *) - break + echo "Unexpected propert $2 $1 breaking parsing" + exit 1 ;; esac shift diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala new file mode 100644 index 0000000000000..8735004d797de --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.spark.internal.config.Worker + +private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => + + import DecommissionSuite._ + import KubernetesSuite.k8sTestTag + + test("Test basic decommissioning", k8sTestTag) { + // scalastyle:off println + println("***TESTING decommissioning***") + // scalastyle:on println + sparkAppConf + .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + .set("spark.kubernetes.pyspark.pythonVersion", "3") + .set("spark.kubernetes.container.image", pyImage) + + // scalastyle:off println + println("***Running app***") + // scalastyle:on println + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_DECOMISSIONING, + mainClass = "", + expectedLogOnCompletion = Seq("Decommissioning executor"), + appArgs = Array.empty[String], + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + decomissioningTest = true) + // scalastyle:off println + println("***END TESTING decommissioning***") + // scalastyle:on println + } +} + +private[spark] object DecommissionSuite { + val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/" + val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py" +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index bc0bb20908254..739183491869d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +// scalastyle:off println package org.apache.spark.deploy.k8s.integrationtest import java.io.File @@ -39,9 +40,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with Logging with Eventually with Matchers { + with BeforeAndAfterAll with BeforeAndAfter with DecommissionSuite with BasicTestsSuite + with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite + with PodTemplateSuite with PVTestsSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -255,6 +256,7 @@ class KubernetesSuite extends SparkFunSuite } } + // scalastyle:off argcount protected def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -265,28 +267,38 @@ class KubernetesSuite extends SparkFunSuite appLocator: String, isJVM: Boolean, pyFiles: Option[String] = None, - interval: Option[PatienceConfiguration.Interval] = None): Unit = { + interval: Option[PatienceConfiguration.Interval] = None, + decomissioningTest: Boolean = false): Unit = { + // scalastyle:on argcount val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch( - appArguments, - sparkAppConf, - TIMEOUT.value.toSeconds.toInt, - sparkHomeDir, - isJVM, - pyFiles) - val driverPod = kubernetesTestComponents.kubernetesClient - .pods() - .withLabel("spark-app-locator", appLocator) - .withLabel("spark-role", "driver") - .list() - .getItems - .get(0) - driverPodChecker(driverPod) val execPods = scala.collection.mutable.Map[String, Pod]() + def checkPodReady(namespace: String, name: String) = { + println(s"!!! doing ready check on pod $name in $namespace") + val execPod = kubernetesTestComponents.kubernetesClient + .pods() + .inNamespace(namespace) + .withName(name) + .get() + println(s"!!! got pod $execPod for $name in namespace $namespace") + val resourceStatus = execPod.getStatus + println(s"!!! status $resourceStatus for $name") + val conditions = resourceStatus.getConditions().asScala + println(s"!!! conditions $conditions for $name") + val conditionTypes = conditions.map(_.getType()) + println(s"!!! condition types $conditionTypes") + val readyConditions = conditions.filter{cond => cond.getType() == "Ready"} + println(s"!!! ready conditions $readyConditions for $name") + val result = readyConditions + .map(cond => cond.getStatus() == "True") + .headOption.getOrElse(false) + println(s"Pod name ${name} ready check resulted in ${result}") + result + } + println("Creating watcher...") val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) @@ -296,20 +308,89 @@ class KubernetesSuite extends SparkFunSuite override def onClose(cause: KubernetesClientException): Unit = logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + println("Event received.") val name = resource.getMetadata.getName + val namespace = resource.getMetadata().getNamespace() action match { - case Action.ADDED | Action.MODIFIED => + case Action.MODIFIED => + execPods(name) = resource + case Action.ADDED => + println(s"Add event received for $name.") execPods(name) = resource + // If testing decomissioning delete the first with a delay after it starts + // running. 
+              if (decomissioningTest && execPods.size == 1) {
+                // Wait for all the containers in the pod to be running
+                println("Waiting for pod to become OK then delete.")
+                Eventually.eventually(POD_RUNNING_TIMEOUT, INTERVAL) {
+                  val result = checkPodReady(namespace, name)
+                  result shouldBe (true)
+                }
+                // Sleep a small interval to allow execution of job
+                println("Sleeping before killing pod.")
+                Thread.sleep(30000)
+                // Delete the pod to simulate cluster scale down/migration.
+                val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
+                pod.delete()
+                println(s"Pod: $name deleted")
+              } else {
+                println(s"Resource $name added")
+              }
             case Action.DELETED | Action.ERROR =>
+              println("Deleted or error event received.")
               execPods.remove(name)
+              println(s"Resource $name removed")
           }
         }
       })
 
     val patienceInterval = interval.getOrElse(INTERVAL)
-    Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
+    println("Running spark job.")
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      isJVM,
+      pyFiles)
+
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+    println("Doing driver pod check")
+    driverPodChecker(driverPod)
+    println("Done driver pod check")
+    // If we're testing decommissioning we delete all the executors, but we should have
+    // an executor at some point.
+    Eventually.eventually(POD_RUNNING_TIMEOUT, patienceInterval) {
+      println(s"Driver pod check iteration non empty: ${execPods.values.nonEmpty} with ${execPods}")
+      execPods.values.nonEmpty should be (true)
+    }
+    // If decommissioning we need to wait and check that the executors were removed
+    if (decomissioningTest) {
+      // Sleep a small interval to ensure everything is registered.
+      Thread.sleep(100)
+      // Wait for the executors to become ready
+      Eventually.eventually(POD_RUNNING_TIMEOUT, patienceInterval) {
+        val anyReadyPods = !
execPods.map{ + case (name, resource) => + (name, resource.getMetadata().getNamespace()) + }.filter{ + case (name, namespace) => checkPodReady(namespace, name) + }.isEmpty + val podsEmpty = execPods.values.isEmpty + val podsReadyOrDead = anyReadyPods || podsEmpty + podsReadyOrDead shouldBe (true) + } + } + println(s"Closing watcher with execPods $execPods nonEmpty: ${execPods.values.nonEmpty}") execWatcher.close() execPods.values.foreach(executorPodChecker(_)) + println(s"Close to the end exec pods are $execPods") Eventually.eventually(TIMEOUT, patienceInterval) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient @@ -319,6 +400,7 @@ class KubernetesSuite extends SparkFunSuite .contains(e), "The application did not complete.") } } + println(s"end exec pods are $execPods") } protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) @@ -413,5 +495,7 @@ private[spark] object KubernetesSuite { val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) - val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val POD_RUNNING_TIMEOUT = PatienceConfiguration.Timeout(Span(5, Minutes)) + val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds)) } +// scalastyle:on println diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py new file mode 100644 index 0000000000000..c7d1fc64409c8 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +import sys +import time + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: decomissioning_water + """ + print("Starting decom test") + spark = SparkSession \ + .builder \ + .appName("PyMemoryTest") \ + .getOrCreate() + sc = spark._sc + rdd = sc.parallelize(range(10)) + rdd.collect() + print("Waiting to give nodes time to finish.") + time.sleep(120) + print("Stopping spark") + spark.stop() + sys.exit(0) diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh new file mode 100644 index 0000000000000..4bbf257ff1d3a --- /dev/null +++ b/sbin/decommission-slave.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A shell script to decommission all workers on a single slave
+#
+# Environment variables
+#
+#   SPARK_WORKER_INSTANCES  The number of worker instances that should be
+#                           running on this slave.  Default is 1.
+
+# Usage: decommission-slave.sh [--block-until-exit]
+#   Decommissions all workers on this slave machine
+
+set -ex
+
+if [ -z "${SPARK_HOME}" ]; then
+  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+. "${SPARK_HOME}/sbin/spark-config.sh"
+
+. "${SPARK_HOME}/bin/load-spark-env.sh"
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
+else
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
+  done
+fi
+
+# Check if --block-until-exit is set.
+# This is done for systems which block on the decommissioning script and on exit
+# shut down the entire system (e.g. K8s).
+if [ "$1" == "--block-until-exit" ]; then
+  shift
+  # For now we only block on the 0th instance if there are multiple instances.
+  instance=$1
+  pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
+  wait $pid
+fi
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 6de67e039b48f..81f2fd40a706f 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -215,6 +215,21 @@ case $option in
     fi
     ;;
 
+  (decommission)
+
+    if [ -f $pid ]; then
+      TARGET_ID="$(cat "$pid")"
+      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
+        echo "decommissioning $command"
+        kill -s SIGPWR "$TARGET_ID"
+      else
+        echo "no $command to decommission"
+      fi
+    else
+      echo "no $command to decommission"
+    fi
+    ;;
+
   (status)
 
     if [ -f $pid ]; then