From dfc124c6482e28d10ae75645c44dfa1226c816fe Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Wed, 12 Aug 2020 21:07:33 -0700
Subject: [PATCH 1/4] [CORE] Fix regressions in decommissioning

The DecommissionWorkerSuite started becoming flaky, and it revealed a real
regression. Recent PRs (#28085 and #29211) necessitate a small reworking of
the decommissioning logic.

Before getting into that, let me describe the intended behavior of
decommissioning: if a fetch failure happens and the source executor was
decommissioned, we want to treat that as an eager signal to clear all shuffle
state associated with that executor. In addition, if we know that the host was
decommissioned, we want to forget the map statuses of all other executors on
that decommissioned host. This is what the test "decommission workers ensure
that fetch failures lead to rerun" is trying to verify. This invariant is
important to ensure that decommissioning a host does not lead to multiple
fetch failures that might fail the job.

- Per #29211, executors now eagerly exit on decommissioning, so the executor
  is lost before the fetch failure even happens. (I tested this by waiting a
  few seconds before triggering the fetch failure.) When an executor is lost,
  we forget its decommissioning information. The fix is to keep the
  decommissioning information around for some time after removal, with some
  extra logic to finally purge it after a timeout. A minimal sketch of that
  retention idea is shown after this message.

- Per #28085, when an executor is lost, we forget the shuffle state of just
  that executor and increment the shuffleFileLostEpoch. This incrementing
  precludes clearing the state of the entire host when the fetch failure
  happens. This epoch bookkeeping is complex and has effectively not been
  semantically changed since 2013, so this PR only changes the codepath for
  the special case of decommissioning, without any other side effects. The fix
  here is also simple: ignore the shuffleFileLostEpoch when the shuffle status
  is being cleared due to a fetch failure resulting from host decommission.
  A self-contained sketch of this guard also follows below.

These two fixes are local to decommissioning and don't change other behavior.

Also added tests to TaskSchedulerImplSuite to ensure that the decommissioning
information is indeed purged after a timeout, and hardened
DecommissionWorkerSuite to wait for successful job completion.
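To illustrate the first point: patch 3 of this series ends up using Guava's
CacheBuilder to remember decommission info for a bounded window after an
executor is removed. The snippet below is a minimal, self-contained sketch of
that pattern, not Spark's actual TaskSchedulerImpl code; it assumes a plain
String payload instead of ExecutorDecommissionInfo and a hand-rolled mutable
clock in place of Spark's Clock/ManualClock.

import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

object DecommissionRetentionSketch {
  // Hand-rolled stand-in for a manual clock, so the example can advance time
  // explicitly instead of sleeping in real time.
  @volatile private var nowMillis: Long = 0L

  // Executor id -> reason, kept for a bounded window after the executor is
  // removed, so a later fetch failure can still be attributed to
  // decommissioning. Entries expire automatically once the TTL elapses.
  private val removedDecommissioned = CacheBuilder.newBuilder()
    .expireAfterWrite(60L, TimeUnit.SECONDS)
    .ticker(new Ticker {
      // Drive expiry from the manual clock above (Guava expects nanoseconds).
      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(nowMillis)
    })
    .build[String, String]()
    .asMap()

  def main(args: Array[String]): Unit = {
    removedDecommissioned.put("executor1", "worker decommissioned, then lost")

    nowMillis += 30 * 1000L
    // Within the TTL: the entry is still visible.
    assert(removedDecommissioned.get("executor1") != null)

    nowMillis += 31 * 1000L
    // Past the TTL: Guava treats the entry as expired and get() returns null.
    assert(removedDecommissioned.get("executor1") == null)
  }
}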
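For the second point, the epoch guard in DAGScheduler can be reduced to the
following self-contained sketch with the decommission bypass added by this
patch. The object and method names here are illustrative only, not Spark's
actual signatures.

import scala.collection.mutable

object ShuffleLostEpochSketch {
  private val shuffleFileLostEpoch = mutable.HashMap.empty[String, Long]

  // Returns true if shuffle outputs for `execId` should be unregistered.
  // Normally the epoch map deduplicates repeated "files lost" events, but a
  // fetch failure caused by host decommission bypasses that guard so the
  // whole host's outputs can still be cleared even if the epoch was already
  // bumped when the executor itself was removed.
  def shouldRemoveShuffleFiles(
      execId: String,
      currentEpoch: Long,
      ignoreShuffleFileLostEpoch: Boolean): Boolean = {
    if (ignoreShuffleFileLostEpoch) {
      true
    } else if (!shuffleFileLostEpoch.contains(execId) ||
        shuffleFileLostEpoch(execId) < currentEpoch) {
      shuffleFileLostEpoch(execId) = currentEpoch
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // First event at this epoch clears shuffle state.
    assert(shouldRemoveShuffleFiles("executor1", 1L, ignoreShuffleFileLostEpoch = false))
    // A repeat at the same epoch is normally suppressed.
    assert(!shouldRemoveShuffleFiles("executor1", 1L, ignoreShuffleFileLostEpoch = false))
    // The decommission-driven fetch failure bypasses the epoch check.
    assert(shouldRemoveShuffleFiles("executor1", 1L, ignoreShuffleFileLostEpoch = true))
  }
}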
--- .../CoarseGrainedExecutorBackend.scala | 8 ++- .../apache/spark/scheduler/DAGScheduler.scala | 35 +++++++---- .../spark/scheduler/TaskSchedulerImpl.scala | 25 ++++++-- .../deploy/DecommissionWorkerSuite.scala | 62 +++++++++++++++---- .../scheduler/TaskSchedulerImplSuite.scala | 38 +++++++++--- 5 files changed, 131 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 55fb76b3572a3..10bbdb444b46e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -294,10 +294,13 @@ private[spark] class CoarseGrainedExecutorBackend( override def run(): Unit = { var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s - + val initialSleepMillis = env.conf.getInt( + "spark.executor.decommission.initial.sleep.millis", sleep_time) + if (initialSleepMillis > 0) { + Thread.sleep(initialSleepMillis) + } while (true) { logInfo("Checking to see if we can shutdown.") - Thread.sleep(sleep_time) if (executor == null || executor.numRunningTasks == 0) { if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { logInfo("No running tasks, checking migrations") @@ -323,6 +326,7 @@ private[spark] class CoarseGrainedExecutorBackend( // move forward. lastTaskRunningTime = System.nanoTime() } + Thread.sleep(sleep_time) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7641948ed4b30..aface201b88f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1846,7 +1846,8 @@ private[spark] class DAGScheduler( execId = bmAddress.executorId, fileLost = true, hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + maybeEpoch = Some(task.epoch), + ignoreShuffleVersion = isHostDecommissioned) } } @@ -2012,7 +2013,8 @@ private[spark] class DAGScheduler( execId: String, fileLost: Boolean, hostToUnregisterOutputs: Option[String], - maybeEpoch: Option[Long] = None): Unit = { + maybeEpoch: Option[Long] = None, + ignoreShuffleVersion: Boolean = false): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) logDebug(s"Considering removal of executor $execId; " + s"fileLost: $fileLost, currentEpoch: $currentEpoch") @@ -2022,16 +2024,25 @@ private[spark] class DAGScheduler( blockManagerMaster.removeExecutor(execId) clearCacheLocs() } - if (fileLost && - (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) { - shuffleFileLostEpoch(execId) = currentEpoch - hostToUnregisterOutputs match { - case Some(host) => - logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") - mapOutputTracker.removeOutputsOnHost(host) - case None => - logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") - mapOutputTracker.removeOutputsOnExecutor(execId) + if (fileLost) { + val remove = if (ignoreShuffleVersion) { + true + } else if (!shuffleFileLostEpoch.contains(execId) || + shuffleFileLostEpoch(execId) < currentEpoch) { + shuffleFileLostEpoch(execId) = currentEpoch + true + } else { + false + } + if (remove) { + hostToUnregisterOutputs match { + case Some(host) => + logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") + 
mapOutputTracker.removeOutputsOnHost(host) + case None => + logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") + mapOutputTracker.removeOutputsOnExecutor(execId) + } } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a0c507e7f893b..3bdd1b5baa53a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer +import java.util import java.util.{Timer, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong @@ -136,7 +137,9 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo] + val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo] + // map of second to list of executors to clear form the above map + val decommissioningExecutorsToGc = new util.TreeMap[Long, mutable.ArrayBuffer[String]]() def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap @@ -910,7 +913,7 @@ private[spark] class TaskSchedulerImpl( // if we heard isHostDecommissioned ever true, then we keep that one since it is // most likely coming from the cluster manager and thus authoritative val oldDecomInfo = executorsPendingDecommission.get(executorId) - if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) { + if (!oldDecomInfo.exists(_.isHostDecommissioned)) { executorsPendingDecommission(executorId) = decommissionInfo } } @@ -921,7 +924,13 @@ private[spark] class TaskSchedulerImpl( override def getExecutorDecommissionInfo(executorId: String) : Option[ExecutorDecommissionInfo] = synchronized { - executorsPendingDecommission.get(executorId) + import scala.collection.JavaConverters._ + // Garbage collect old decommissioning entries + val secondsToGcUptil = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + val headMap = decommissioningExecutorsToGc.headMap(secondsToGcUptil) + headMap.values().asScala.flatten.foreach(executorsPendingDecommission -= _) + headMap.clear() + executorsPendingDecommission.get(executorId) } override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = { @@ -1027,7 +1036,15 @@ private[spark] class TaskSchedulerImpl( } } - executorsPendingDecommission -= executorId + + val decomInfo = executorsPendingDecommission.get(executorId) + if (decomInfo.isDefined) { + val rememberSeconds = + conf.getInt("spark.decommissioningRememberAfterRemoval.seconds", 60) + val gcSecond = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + rememberSeconds + decommissioningExecutorsToGc.computeIfAbsent(gcSecond, _ => mutable.ArrayBuffer.empty) += + executorId + } if (reason != LossReasonPending) { executorIdToHost -= executorId diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index ee9a6be03868f..1ba3c3961f8f6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -84,6 +84,19 @@ class 
DecommissionWorkerSuite } } + // Unlike TestUtils.withListener, it also waits for the job to be done + def withListener(sc: SparkContext, listener: RootStageAwareListener) + (body: SparkListener => Unit): Unit = { + sc.addSparkListener(listener) + try { + body(listener) + sc.listenerBus.waitUntilEmpty() + listener.waitForJobDone() + } finally { + sc.listenerBus.removeListener(listener) + } + } + test("decommission workers should not result in job failure") { val maxTaskFailures = 2 val numTimesToKillWorkers = maxTaskFailures + 1 @@ -109,7 +122,7 @@ class DecommissionWorkerSuite } } } - TestUtils.withListener(sc, listener) { _ => + withListener(sc, listener) { _ => val jobResult = sc.parallelize(1 to 1, 1).map { _ => Thread.sleep(5 * 1000L); 1 }.count() @@ -164,7 +177,7 @@ class DecommissionWorkerSuite } } } - TestUtils.withListener(sc, listener) { _ => + withListener(sc, listener) { _ => val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((pid, _) => { val sleepTimeSeconds = if (pid == 0) 1 else 10 Thread.sleep(sleepTimeSeconds * 1000L) @@ -190,10 +203,11 @@ class DecommissionWorkerSuite } } - test("decommission workers ensure that fetch failures lead to rerun") { + def testFetchFailures(initialSleepMillis: Int): Unit = { createWorkers(2) sc = createSparkContext( config.Tests.TEST_NO_STAGE_RETRY.key -> "false", + "spark.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString, config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true") val executorIdToWorkerInfo = getExecutorToWorkerAssignments val executorToDecom = executorIdToWorkerInfo.keysIterator.next @@ -212,22 +226,27 @@ class DecommissionWorkerSuite override def handleRootTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { val taskInfo = taskEnd.taskInfo if (taskInfo.executorId == executorToDecom && taskInfo.attemptNumber == 0 && - taskEnd.stageAttemptId == 0) { + taskEnd.stageAttemptId == 0 && taskEnd.stageId == 0) { decommissionWorkerOnMaster(workerToDecom, "decommission worker after task on it is done") } } } - TestUtils.withListener(sc, listener) { _ => + withListener(sc, listener) { _ => val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) => { val executorId = SparkEnv.get.executorId - val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1 - Thread.sleep(sleepTimeSeconds * 1000L) + val context = TaskContext.get() + if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) { + val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1 + Thread.sleep(sleepTimeSeconds * 1000L) + } List(1).iterator }, preservesPartitioning = true) .repartition(1).mapPartitions(iter => { val context = TaskContext.get() if (context.attemptNumber == 0 && context.stageAttemptNumber() == 0) { + // Wait a bit for the decommissioning to be triggered in the listener + Thread.sleep(5000) // MapIndex is explicitly -1 to force the entire host to be decommissioned // However, this will cause both the tasks in the preceding stage since the host here is // "localhost" (shortcoming of this single-machine unit test in that all the workers @@ -246,6 +265,14 @@ class DecommissionWorkerSuite assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen") } + test("decommission stalled workers ensure that fetch failures lead to rerun") { + testFetchFailures(3600 * 1000) + } + + test("decommission eager workers ensure that fetch failures lead to rerun") { + testFetchFailures(0) + } + private abstract class RootStageAwareListener extends SparkListener { private var rootStageId: 
Option[Int] = None private val tasksFinished = new ConcurrentLinkedQueue[String]() @@ -265,6 +292,7 @@ class DecommissionWorkerSuite override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { jobEnd.jobResult match { case JobSucceeded => jobDone.set(true) + case JobFailed(exception) => logError(s"Job failed", exception) } } @@ -272,7 +300,15 @@ class DecommissionWorkerSuite protected def handleRootTaskStart(start: SparkListenerTaskStart) = {} + private def getSignature(taskInfo: TaskInfo, stageId: Int, stageAttemptId: Int): + String = { + s"${stageId}:${stageAttemptId}:" + + s"${taskInfo.index}:${taskInfo.attemptNumber}-${taskInfo.status}" + } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + val signature = getSignature(taskStart.taskInfo, taskStart.stageId, taskStart.stageAttemptId) + logInfo(s"Task started: $signature") if (isRootStageId(taskStart.stageId)) { rootTasksStarted.add(taskStart.taskInfo) handleRootTaskStart(taskStart) @@ -280,8 +316,7 @@ class DecommissionWorkerSuite } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - val taskSignature = s"${taskEnd.stageId}:${taskEnd.stageAttemptId}:" + - s"${taskEnd.taskInfo.index}:${taskEnd.taskInfo.attemptNumber}" + val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, taskEnd.stageAttemptId) logInfo(s"Task End $taskSignature") tasksFinished.add(taskSignature) if (isRootStageId(taskEnd.stageId)) { @@ -291,8 +326,13 @@ class DecommissionWorkerSuite } def getTasksFinished(): Seq[String] = { - assert(jobDone.get(), "Job isn't successfully done yet") - tasksFinished.asScala.toSeq + tasksFinished.asScala.toList + } + + def waitForJobDone(): Unit = { + eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert(jobDone.get(), "Job isn't successfully done yet") + } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index e5836458e7f91..75d890de62038 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.config import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{Clock, ManualClock, SystemClock} class FakeSchedulerBackend extends SchedulerBackend { def start(): Unit = {} @@ -88,10 +88,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = { + setupSchedulerWithMasterAndClock(master, new SystemClock, confs: _*) + } + + def setupSchedulerWithMasterAndClock(master: String, clock: Clock, confs: (String, String)*): + TaskSchedulerImpl = { val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite") confs.foreach { case (k, v) => conf.set(k, v) } sc = new SparkContext(conf) - taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES), clock = clock) setupHelper() } @@ -1802,9 +1807,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(2 == taskDescriptions.head.resources(GPU).addresses.size) } - private def 
setupSchedulerForDecommissionTests(): TaskSchedulerImpl = { - val taskScheduler = setupSchedulerWithMaster( + private def setupSchedulerForDecommissionTests(clock: Clock): TaskSchedulerImpl = { + val taskScheduler = setupSchedulerWithMasterAndClock( s"local[2]", + clock, config.CPUS_PER_TASK.key -> 1.toString) taskScheduler.submitTasks(FakeTask.createTaskSet(2)) val multiCoreWorkerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 1), @@ -1815,7 +1821,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } test("scheduler should keep the decommission info where host was decommissioned") { - val scheduler = setupSchedulerForDecommissionTests() + val scheduler = setupSchedulerForDecommissionTests(new SystemClock) scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false)) scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true)) @@ -1829,8 +1835,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(scheduler.getExecutorDecommissionInfo("executor2").isEmpty) } - test("scheduler should ignore decommissioning of removed executors") { - val scheduler = setupSchedulerForDecommissionTests() + test("scheduler should eventually purge removed and decommissioned executors") { + val clock = new ManualClock(10000L) + val scheduler = setupSchedulerForDecommissionTests(clock) // executor 0 is decommissioned after loosing assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty) @@ -1839,14 +1846,29 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false)) assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty) + assert(scheduler.executorsPendingDecommission.isEmpty) + clock.advance(5000) + // executor 1 is decommissioned before loosing assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty) scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false)) assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) + clock.advance(2000) scheduler.executorLost("executor1", ExecutorExited(0, false, "normal")) - assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty) + assert(scheduler.decommissioningExecutorsToGc.size === 1) + assert(scheduler.executorsPendingDecommission.size === 1) + clock.advance(2000) + // It hasn't been 60 seconds yet before removal + assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false)) + clock.advance(2000) + assert(scheduler.decommissioningExecutorsToGc.size === 1) + assert(scheduler.executorsPendingDecommission.size === 1) + assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) + clock.advance(61000) assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty) + assert(scheduler.decommissioningExecutorsToGc.isEmpty) + assert(scheduler.executorsPendingDecommission.isEmpty) } /** From 11e4adcb1763fe8c5e29aa25bd3d0a570187f6fc Mon Sep 17 00:00:00 2001 From: Devesh Agrawal Date: Sat, 15 Aug 2020 11:30:55 -0700 Subject: [PATCH 2/4] @holdenk's comments around comments and naming --- .../executor/CoarseGrainedExecutorBackend.scala | 2 ++ .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++++++--- .../spark/deploy/DecommissionWorkerSuite.scala | 2 ++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 10bbdb444b46e..23de8afc61b3e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -294,6 +294,8 @@ private[spark] class CoarseGrainedExecutorBackend( override def run(): Unit = { var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s + // This config is internal and only used by unit tests to force an executor + // to hang around for longer when decommissioned. val initialSleepMillis = env.conf.getInt( "spark.executor.decommission.initial.sleep.millis", sleep_time) if (initialSleepMillis > 0) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index aface201b88f5..ae0387e09cc6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1847,7 +1847,13 @@ private[spark] class DAGScheduler( fileLost = true, hostToUnregisterOutputs = hostToUnregisterOutputs, maybeEpoch = Some(task.epoch), - ignoreShuffleVersion = isHostDecommissioned) + // shuffleFileLostEpoch is ignored when a host is decommissioned because some + // decommissioned executors on that host might have been removed before this fetch + // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and + // proceed with unconditional removal of shuffle outputs from all executors on that + // host, including from those that we still haven't confirmed as lost due to heartbeat + // delays. + ignoreShuffleFileLostEpoch = isHostDecommissioned) } } @@ -2014,7 +2020,7 @@ private[spark] class DAGScheduler( fileLost: Boolean, hostToUnregisterOutputs: Option[String], maybeEpoch: Option[Long] = None, - ignoreShuffleVersion: Boolean = false): Unit = { + ignoreShuffleFileLostEpoch: Boolean = false): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) logDebug(s"Considering removal of executor $execId; " + s"fileLost: $fileLost, currentEpoch: $currentEpoch") @@ -2025,7 +2031,7 @@ private[spark] class DAGScheduler( clearCacheLocs() } if (fileLost) { - val remove = if (ignoreShuffleVersion) { + val remove = if (ignoreShuffleFileLostEpoch) { true } else if (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch) { diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index 1ba3c3961f8f6..f2ed7736bb4dd 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -236,6 +236,8 @@ class DecommissionWorkerSuite val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) => { val executorId = SparkEnv.get.executorId val context = TaskContext.get() + // Only sleep in the first attempt to create the required window for decommissioning. + // Subsequent attempts don't need to be delayed to speed up the test. 
if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) { val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1 Thread.sleep(sleepTimeSeconds * 1000L) From a8cd1deeb3b389f1c6c73f5dc83ea936819d2233 Mon Sep 17 00:00:00 2001 From: Devesh Agrawal Date: Sat, 15 Aug 2020 12:04:35 -0700 Subject: [PATCH 3/4] Switch to using CacheBuilder instead of home grown expiration --- .../spark/scheduler/TaskSchedulerImpl.scala | 40 ++++++++++--------- .../scheduler/TaskSchedulerImplSuite.scala | 10 ++--- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3bdd1b5baa53a..6501a75508db8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util import java.util.{Timer, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong @@ -27,6 +26,9 @@ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet} import scala.util.Random +import com.google.common.base.Ticker +import com.google.common.cache.CacheBuilder + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics @@ -137,9 +139,21 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] + // We add executors here when we first get decommission notification for them. Executors can + // continue to run even after being asked to decommission, but they will eventually exit. val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo] - // map of second to list of executors to clear form the above map - val decommissioningExecutorsToGc = new util.TreeMap[Long, mutable.ArrayBuffer[String]]() + + // When they exit and we know of that via heartbeat failure, we will add them to this cache. + // This cache is consulted to know if a fetch failure is because a source executor was + // decommissioned. 
+ lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder() + .expireAfterWrite( + conf.getLong("spark.decommissioningRememberAfterRemoval.seconds", 60L), TimeUnit.SECONDS) + .ticker(new Ticker{ + override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis()) + }) + .build[String, ExecutorDecommissionInfo]() + .asMap() def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap @@ -924,13 +938,9 @@ private[spark] class TaskSchedulerImpl( override def getExecutorDecommissionInfo(executorId: String) : Option[ExecutorDecommissionInfo] = synchronized { - import scala.collection.JavaConverters._ - // Garbage collect old decommissioning entries - val secondsToGcUptil = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) - val headMap = decommissioningExecutorsToGc.headMap(secondsToGcUptil) - headMap.values().asScala.flatten.foreach(executorsPendingDecommission -= _) - headMap.clear() - executorsPendingDecommission.get(executorId) + executorsPendingDecommission + .get(executorId) + .orElse(Option(decommissionedExecutorsRemoved.get(executorId))) } override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = { @@ -1037,14 +1047,8 @@ private[spark] class TaskSchedulerImpl( } - val decomInfo = executorsPendingDecommission.get(executorId) - if (decomInfo.isDefined) { - val rememberSeconds = - conf.getInt("spark.decommissioningRememberAfterRemoval.seconds", 60) - val gcSecond = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + rememberSeconds - decommissioningExecutorsToGc.computeIfAbsent(gcSecond, _ => mutable.ArrayBuffer.empty) += - executorId - } + val decomInfo = executorsPendingDecommission.remove(executorId) + decomInfo.foreach(decommissionedExecutorsRemoved.put(executorId, _)) if (reason != LossReasonPending) { executorIdToHost -= executorId diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 75d890de62038..93b357791e8c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1855,20 +1855,18 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) clock.advance(2000) scheduler.executorLost("executor1", ExecutorExited(0, false, "normal")) - assert(scheduler.decommissioningExecutorsToGc.size === 1) - assert(scheduler.executorsPendingDecommission.size === 1) + assert(scheduler.decommissionedExecutorsRemoved.size === 1) + assert(scheduler.executorsPendingDecommission.isEmpty) clock.advance(2000) // It hasn't been 60 seconds yet before removal assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false)) clock.advance(2000) - assert(scheduler.decommissioningExecutorsToGc.size === 1) - assert(scheduler.executorsPendingDecommission.size === 1) + assert(scheduler.decommissionedExecutorsRemoved.size === 1) assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) clock.advance(61000) assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty) - assert(scheduler.decommissioningExecutorsToGc.isEmpty) - assert(scheduler.executorsPendingDecommission.isEmpty) + assert(scheduler.decommissionedExecutorsRemoved.isEmpty) } /** From 
df128e507a2c7bd11d33197fca6b4fa10f4e9256 Mon Sep 17 00:00:00 2001 From: Devesh Agrawal Date: Mon, 17 Aug 2020 14:55:55 -0700 Subject: [PATCH 4/4] @cloudfan's comments --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- .../org/apache/spark/internal/config/package.scala | 10 ++++++++++ .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../apache/spark/deploy/DecommissionWorkerSuite.scala | 2 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 23de8afc61b3e..07258f270b458 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -297,7 +297,7 @@ private[spark] class CoarseGrainedExecutorBackend( // This config is internal and only used by unit tests to force an executor // to hang around for longer when decommissioned. val initialSleepMillis = env.conf.getInt( - "spark.executor.decommission.initial.sleep.millis", sleep_time) + "spark.test.executor.decommission.initial.sleep.millis", sleep_time) if (initialSleepMillis > 0) { Thread.sleep(initialSleepMillis) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 200cde0a2d3ed..34acf9f9b30cd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1877,6 +1877,16 @@ package object config { .timeConf(TimeUnit.SECONDS) .createOptional + private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL = + ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL") + .doc("Duration for which a decommissioned executor's information will be kept after its" + + "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " + + "decommissioning even after the mapper executor has been decommissioned. This allows " + + "eager recovery from fetch failures caused by decommissioning, increasing job robustness.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("5m") + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") .doc("Staging directory used while submitting applications.") .version("2.0.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6501a75508db8..2a382380691d5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -148,7 +148,7 @@ private[spark] class TaskSchedulerImpl( // decommissioned. 
lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder() .expireAfterWrite( - conf.getLong("spark.decommissioningRememberAfterRemoval.seconds", 60L), TimeUnit.SECONDS) + conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS) .ticker(new Ticker{ override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis()) }) diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index f2ed7736bb4dd..90b77a21ad02e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -207,7 +207,7 @@ class DecommissionWorkerSuite createWorkers(2) sc = createSparkContext( config.Tests.TEST_NO_STAGE_RETRY.key -> "false", - "spark.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString, + "spark.test.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString, config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true") val executorIdToWorkerInfo = getExecutorToWorkerAssignments val executorToDecom = executorIdToWorkerInfo.keysIterator.next diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 93b357791e8c3..66379d86f9bed 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1864,7 +1864,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(2000) assert(scheduler.decommissionedExecutorsRemoved.size === 1) assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined) - clock.advance(61000) + clock.advance(301000) assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty) assert(scheduler.decommissionedExecutorsRemoved.isEmpty) }