diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index ab71e551a797..f8d3ad4f9566 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -140,4 +140,11 @@ package object config {
         "when launching drivers. Default is to accept all offers with sufficient resources.")
       .stringConf
       .createWithDefault("")
+
+  private[spark] val REVIVE_OFFERS_INTERVAL =
+    ConfigBuilder("spark.mesos.scheduler.revive.interval")
+      .doc("Amount of time, in milliseconds, between periodic revive calls to Mesos when the " +
+        "job driver is not suppressing resource offers.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index b37e53b135eb..66a9b4034e9c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -20,6 +20,8 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.File
 import java.util.{Collections, Date, List => JList}
 
+import org.apache.commons.lang3.time.DateUtils
+
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
@@ -779,19 +781,20 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getNewRetryState(
-    retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
-
-    val (retries, waitTimeSec) = retryState
-      .map { rs => (rs.retries + 1, rs.waitTime) }
-      .getOrElse{ (1, 1) }
-
-    // if a node is draining, the driver should be relaunched without backoff
-    if (isNodeDraining(status)) {
-      new MesosClusterRetryState(status, retries, new Date(), waitTimeSec)
-    } else {
-      val newWaitTime = waitTimeSec * 2
-      val nextRetry = new Date(new Date().getTime + newWaitTime * 1000L)
-      new MesosClusterRetryState(status, retries, nextRetry, newWaitTime)
+    retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
+    val now = new Date()
+    retryState.map { rs =>
+      val newRetries = rs.retries + 1
+      // if a node is draining, the driver should be relaunched without backoff
+      if (isNodeDraining(status)) {
+        new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
+      } else {
+        new MesosClusterRetryState(
+          status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
+      }
+    }.getOrElse {
+      // this is the first retry, which should happen without backoff
+      new MesosClusterRetryState(status, 1, now, 1)
     }
   }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 09156c5bba4f..2b50c007ebdb 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList, UUID}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -40,7 +41,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -90,6 +91,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Synchronization protected by stateLock
   private[this] var stopCalled: Boolean = false
+  private[this] var offersSuppressed: Boolean = false
 
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sc.conf
@@ -187,6 +189,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val schedulerUuid: String = UUID.randomUUID().toString
   private val nextExecutorNumber = new AtomicLong()
 
+  private val reviveOffersExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
+  private val reviveIntervalMs = conf.get(REVIVE_OFFERS_INTERVAL)
+
   override def start() {
     super.start()
 
@@ -218,6 +224,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
     startScheduler(driver)
+
+    // Periodically revive Mesos offers while the driver is not suppressing them
+    reviveOffersExecutorService.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = {
+        stateLock.synchronized {
+          if (!offersSuppressed) {
+            logDebug("Triggering a scheduled revive of Mesos offers")
+            schedulerDriver.reviveOffers
+          }
+        }
+      }
+    }, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
@@ -360,6 +378,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
       metricsSource.recordOffers(offers.size)
+      logInfo(s"Received ${offers.size} resource offers.")
+
       if (stopCalled) {
         logDebug("Ignoring offers during shutdown")
         // Driver should simply return a stopped status on race
@@ -370,9 +390,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       }
 
       if (numExecutors >= executorLimit) {
-        logDebug("Executor limit reached. numExecutors: " + numExecutors +
-          " executorLimit: " + executorLimit)
         offers.asScala.map(_.getId).foreach(d.declineOffer)
+        logInfo("Executor limit reached. numExecutors: " + numExecutors +
+          " executorLimit: " + executorLimit + ". Suppressing further offers.")
+        suppressOffers(Option(d))
         launchingExecutors = false
         return
       } else {
@@ -382,8 +403,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         }
       }
 
-      logDebug(s"Received ${offers.size} resource offers.")
-
       val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
         matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
@@ -416,6 +435,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private def handleMatchedOffers(
       driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
     val tasks = buildMesosTasks(offers)
+    var suppressionRequired = false
     for (offer <- offers) {
       val offerAttributes = toAttributeMap(offer.getAttributesList)
       val offerMem = getResource(offer.getResourcesList, "mem")
@@ -455,6 +475,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           Collections.singleton(offer.getId),
           offerTasks.asJava)
       } else if (totalCoresAcquired >= maxCores) {
+        suppressionRequired = true
         // Reject an offer for a configurable amount of time to avoid starving other frameworks
         metricsSource.recordDeclineFinished
         declineOffer(driver,
@@ -469,6 +490,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           Some("Offer was declined due to unmet task launch constraints."))
       }
     }
+
+    if (suppressionRequired) {
+      logInfo("Max core count reached. Suppressing further offers.")
+      suppressOffers(Option.empty)
+    }
   }
 
   private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
@@ -696,18 +722,36 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         }
         executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
-        metricsSource.recordRevive
-        d.reviveOffers
+        if (state != TaskState.FINISHED) {
+          logInfo("Reviving offers due to a failed executor task.")
+          reviveOffers(Option(d))
+        }
       }
     }
   }
 
+  private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+    stateLock.synchronized {
+      metricsSource.recordRevive
+      offersSuppressed = false
+      driver.getOrElse(schedulerDriver).reviveOffers
+    }
+  }
+
+  private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+    stateLock.synchronized {
+      offersSuppressed = true
+      driver.getOrElse(schedulerDriver).suppressOffers
+    }
+  }
+
   override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
     logError(s"Mesos error: $message")
     scheduler.error(message)
   }
 
   override def stop() {
+    reviveOffersExecutorService.shutdownNow()
     stopSchedulerBackend()
     launcherBackend.setState(SparkAppHandle.State.FINISHED)
     launcherBackend.close()
@@ -791,7 +835,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
      // We don't truly know if we can fulfill the full amount of executors
      // since at coarse grain it depends on the amount of slaves available.
      logInfo("Capping the total amount of executors to " + requestedTotal)
+     val reviveNeeded = executorLimit < requestedTotal
      executorLimitOption = Some(requestedTotal)
+     if (reviveNeeded && schedulerDriver != null) {
+       logInfo("The executor limit increased. Reviving offers.")
+       reviveOffers(Option.empty)
+     }
      // Update the locality wait start time to continue trying for locality.
      localityWaitStartTime = System.currentTimeMillis()
      true
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 5e65e40a35b2..66d785347311 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -593,6 +593,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
 
     // Offer new resource to retry driver on a new agent
     val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    Thread.sleep(1500)
     scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
     taskStatus = TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f2a5f3beabed..e87df5368b22 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -339,6 +339,90 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(taskInfos.length == 2)
   }
 
+  test("scheduler backend suppresses mesos offers when max core count reached") {
+    val executorCores = 2
+    val executors = 2
+    setBackend(Map(
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 2)
+    verify(driver, times(1)).suppressOffers()
+
+    // Finishing at least one task should trigger a revive
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(2)).suppressOffers()
+  }
+
+  test("scheduler backend suppresses mesos offers when the executor cap is reached") {
+    val executorCores = 1
+    val executors = 10
+    setBackend(Map(
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 3)
+    verify(driver, times(0)).suppressOffers()
+
+    assert(backend.doRequestTotalExecutors(3).futureValue)
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(1)).suppressOffers()
+
+    assert(backend.doRequestTotalExecutors(2).futureValue)
+    verify(driver, times(0)).reviveOffers()
+
+    // Finishing at least one task should trigger a revive
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(2)).suppressOffers()
+  }
+
+  test("scheduler periodically revives mesos offers if needed") {
+    val executorCores = 1
+    val executors = 3
+    setBackend(Map(
+      "spark.mesos.scheduler.revive.interval" -> "1s",
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 1)
+    verify(driver, times(0)).suppressOffers()
+
+    // Verify that offers are revived every second
+    Thread.sleep(1500)
+    verify(driver, times(1)).reviveOffers()
+
+    Thread.sleep(1000)
+    verify(driver, times(2)).reviveOffers()
+  }
+
   test("mesos doesn't register twice with the same shuffle service") {
     setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
     val (mem, cpu) = (backend.executorMemory(sc), 4)