DCOS-57560: Suppress/revive support for Mesos #67
MesosCoarseGrainedSchedulerBackend.scala

@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList, UUID}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock
@@ -40,7 +41,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient

import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -90,6 +91,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

  // Synchronization protected by stateLock
  private[this] var stopCalled: Boolean = false
  private[this] var offersSuppressed: Boolean = false

  private val launcherBackend = new LauncherBackend() {
    override protected def conf: SparkConf = sc.conf
@@ -187,6 +189,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

  private val schedulerUuid: String = UUID.randomUUID().toString
  private val nextExecutorNumber = new AtomicLong()

  private val reviveOffersExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
  private val reviveIntervalMs = conf.get(REVIVE_OFFERS_INTERVAL)

  override def start() {
    super.start()
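The `REVIVE_OFFERS_INTERVAL` config entry itself is not part of this diff. Based on the `spark.mesos.scheduler.revive.interval` key used in the test suite below, its definition in the Mesos config object presumably looks roughly like the sketch here; the 10s default is an assumption, not something stated in the PR.

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical sketch of the entry backing REVIVE_OFFERS_INTERVAL (key taken from
// the test suite below; the default value is assumed, not from this diff).
private[spark] val REVIVE_OFFERS_INTERVAL =
  ConfigBuilder("spark.mesos.scheduler.revive.interval")
    .doc("Interval between periodic checks that revive suppressed Mesos offers.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10s")
```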
@@ -218,6 +224,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    startScheduler(driver)

    // Periodic check if there is a need to revive mesos offers
    reviveOffersExecutorService.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        stateLock.synchronized {
          if (!offersSuppressed) {
            logDebug("scheduled mesos offers revive")
            schedulerDriver.reviveOffers
          }
        }
      }
    }, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)
  }

  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
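Since the initial delay and the period are both `reviveIntervalMs`, the first revive check only fires one full interval after `start()`. A minimal, self-contained sketch of this scheduling pattern (names and values here are illustrative, not from the PR):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object PeriodicReviveSketch {
  def main(args: Array[String]): Unit = {
    // With initialDelay == period, the first run fires one full interval after
    // scheduling, then repeats at a fixed rate until the executor is shut down.
    val exec = Executors.newSingleThreadScheduledExecutor()
    exec.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("periodic revive check")
    }, 1000L, 1000L, TimeUnit.MILLISECONDS)
    Thread.sleep(3500)  // observe roughly three runs
    exec.shutdownNow()  // mirrors stop() shutting the revive thread down
  }
}
```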
@@ -360,6 +378,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

  override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
    stateLock.synchronized {
      metricsSource.recordOffers(offers.size)
      logInfo(s"Received ${offers.size} resource offers.")

      if (stopCalled) {
        logDebug("Ignoring offers during shutdown")
        // Driver should simply return a stopped status on race

@@ -370,9 +390,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

      }

      if (numExecutors >= executorLimit) {
        logDebug("Executor limit reached. numExecutors: " + numExecutors +
          " executorLimit: " + executorLimit)
        offers.asScala.map(_.getId).foreach(d.declineOffer)
        logInfo("Executor limit reached. numExecutors: " + numExecutors +
          " executorLimit: " + executorLimit + " . Suppressing further offers.")
        suppressOffers(Option(d))
        launchingExecutors = false
        return
      } else {

@@ -382,8 +403,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

        }
      }

      logDebug(s"Received ${offers.size} resource offers.")

      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
        val offerAttributes = toAttributeMap(offer.getAttributesList)
        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
@@ -416,6 +435,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

  private def handleMatchedOffers(
      driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
    val tasks = buildMesosTasks(offers)
    var suppressionRequired = false
    for (offer <- offers) {
      val offerAttributes = toAttributeMap(offer.getAttributesList)
      val offerMem = getResource(offer.getResourcesList, "mem")

@@ -455,6 +475,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

          Collections.singleton(offer.getId),
          offerTasks.asJava)
      } else if (totalCoresAcquired >= maxCores) {
        suppressionRequired = true
        // Reject an offer for a configurable amount of time to avoid starving other frameworks
        metricsSource.recordDeclineFinished
        declineOffer(driver,

@@ -469,6 +490,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

          Some("Offer was declined due to unmet task launch constraints."))
      }
    }

    if (suppressionRequired) {
      logInfo("Max core number is reached. Suppressing further offers.")
      suppressOffers(Option.empty)
    }
  }

  private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
@@ -696,18 +722,36 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

        }
        executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
        // In case we'd rejected everything before but have now lost a node
        metricsSource.recordRevive
        d.reviveOffers
        if (state != TaskState.FINISHED) {
          logInfo("Reviving offers due to a failed executor task.")
          reviveOffers(Option(d))
        }
      }
    }
  }

  private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
    stateLock.synchronized {
      metricsSource.recordRevive
      offersSuppressed = false
      driver.getOrElse(schedulerDriver).reviveOffers
    }
  }

  private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
    stateLock.synchronized {
      offersSuppressed = true
      driver.getOrElse(schedulerDriver).suppressOffers
    }
  }
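For context on what these helpers do on the Mesos side: `declineOffer` rejects a single offer and may attach a time-bounded filter, after which the master offers those resources again, whereas `suppressOffers` stops the master from sending this framework any offers until `reviveOffers` is called, which also clears previously set filters. A minimal illustrative sketch, not part of this diff:

```scala
import org.apache.mesos.SchedulerDriver

// Illustrative only: the offer-flow protocol the helpers above rely on.
object OfferFlowSketch {
  // reviveOffers() re-enables offer allocation for this framework and clears any
  // decline filters; suppressOffers() halts offers entirely until the next revive.
  def setWantsOffers(driver: SchedulerDriver, wantsOffers: Boolean): Unit = {
    if (wantsOffers) driver.reviveOffers() else driver.suppressOffers()
  }
}
```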
  override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
    logError(s"Mesos error: $message")
    scheduler.error(message)
  }

  override def stop() {
    reviveOffersExecutorService.shutdownNow()
    stopSchedulerBackend()
    launcherBackend.setState(SparkAppHandle.State.FINISHED)
    launcherBackend.close()
@@ -791,7 +835,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

      // We don't truly know if we can fulfill the full amount of executors
      // since at coarse grain it depends on the amount of slaves available.
      logInfo("Capping the total amount of executors to " + requestedTotal)
      val reviveNeeded = executorLimit < requestedTotal
      executorLimitOption = Some(requestedTotal)
      if (reviveNeeded && schedulerDriver != null) {

Reviewer: I'm wondering if this is safe to rely on.

Author: I think all cases are covered actually. There are two conditions when offers are suppressed: when the executor limit is reached in resourceOffers, and when the max core count is reached in handleMatchedOffers.

Reviewer: See my comment above regarding

        logInfo("The executor limit increased. Reviving offers.")
        reviveOffers(Option.empty)
      }
      // Update the locality wait start time to continue trying for locality.
      localityWaitStartTime = System.currentTimeMillis()
      true
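The guard above only revives when the cap is being raised; lowering it leaves offers suppressed until an executor is lost or the cap grows again. A tiny illustrative sketch of that condition (names here are mine, not the PR's):

```scala
// Illustrative only: doRequestTotalExecutors revives offers when the executor cap
// grows, never when it shrinks.
object ReviveConditionSketch {
  def reviveNeeded(currentLimit: Int, requestedTotal: Int): Boolean =
    currentLimit < requestedTotal

  def main(args: Array[String]): Unit = {
    assert(reviveNeeded(currentLimit = 2, requestedTotal = 5))   // scale up   -> revive
    assert(!reviveNeeded(currentLimit = 5, requestedTotal = 2))  // scale down -> stay suppressed
  }
}
```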
MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -339,6 +339,90 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

    assert(taskInfos.length == 2)
  }
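The new tests below lean on helpers that already exist in this suite and are not shown in the diff; their assumed shape is sketched here for readability only.

```scala
// Assumed shape of the suite's pre-existing helpers (not part of this diff):
// Resources bundles the memory/CPU of one fake Mesos offer, and offerResources
// feeds one such offer per entry into backend.resourceOffers on the mocked driver.
case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
```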
| test("scheduler backend suppresses mesos offers when max core count reached") { | ||
| val executorCores = 2 | ||
| val executors = 2 | ||
| setBackend(Map( | ||
| "spark.executor.cores" -> executorCores.toString(), | ||
| "spark.cores.max" -> (executorCores * executors).toString())) | ||
|
|
||
| val executorMemory = backend.executorMemory(sc) | ||
| offerResources(List( | ||
| Resources(executorMemory, executorCores), | ||
| Resources(executorMemory, executorCores), | ||
| Resources(executorMemory, executorCores))) | ||
|
|
||
| assert(backend.getTaskCount() == 2) | ||
| verify(driver, times(1)).suppressOffers() | ||
|
|
||
| // Finishing at least one task should trigger a revive | ||
| val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED) | ||
| backend.statusUpdate(driver, status) | ||
| verify(driver, times(1)).reviveOffers() | ||
|
|
||
| offerResources(List( | ||
| Resources(executorMemory, executorCores))) | ||
| verify(driver, times(2)).suppressOffers() | ||
| } | ||
|
|
||
| test("scheduler backend suppresses mesos offers when the executor cap is reached") { | ||
| val executorCores = 1 | ||
| val executors = 10 | ||
| setBackend(Map( | ||
| "spark.executor.cores" -> executorCores.toString(), | ||
| "spark.cores.max" -> (executorCores * executors).toString())) | ||
|
|
||
| val executorMemory = backend.executorMemory(sc) | ||
| offerResources(List( | ||
| Resources(executorMemory, executorCores), | ||
| Resources(executorMemory, executorCores), | ||
| Resources(executorMemory, executorCores))) | ||
|
|
||
| assert(backend.getTaskCount() == 3) | ||
| verify(driver, times(0)).suppressOffers() | ||
|
|
||
| assert(backend.doRequestTotalExecutors(3).futureValue) | ||
| offerResources(List( | ||
| Resources(executorMemory, executorCores))) | ||
| verify(driver, times(1)).suppressOffers() | ||
|
|
||
| assert(backend.doRequestTotalExecutors(2).futureValue) | ||
| verify(driver, times(0)).reviveOffers() | ||
|
|
||
| // Finishing at least one task should trigger a revive | ||
| val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED) | ||
| backend.statusUpdate(driver, status) | ||
| verify(driver, times(1)).reviveOffers() | ||
|
|
||
| offerResources(List( | ||
| Resources(executorMemory, executorCores))) | ||
| verify(driver, times(2)).suppressOffers() | ||
| } | ||
|
|
||
| test("scheduler periodically revives mesos offers if needed") { | ||
| val executorCores = 1 | ||
| val executors = 3 | ||
| setBackend(Map( | ||
| "spark.mesos.scheduler.revive.interval" -> "1s", | ||
| "spark.executor.cores" -> executorCores.toString(), | ||
| "spark.cores.max" -> (executorCores * executors).toString())) | ||
|
|
||
| val executorMemory = backend.executorMemory(sc) | ||
| offerResources(List( | ||
| Resources(executorMemory, executorCores))) | ||
|
|
||
| assert(backend.getTaskCount() == 1) | ||
| verify(driver, times(0)).suppressOffers() | ||
|
|
||
| // Verify that offers are revived every second | ||
| Thread.sleep(1500) | ||
| verify(driver, times(1)).reviveOffers() | ||
|
|
||
| Thread.sleep(1000) | ||
| verify(driver, times(2)).reviveOffers() | ||
|
|
||
| } | ||
|
|
||
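The sleep-based assertions above depend on exact call counts inside wall-clock windows, which can be flaky on slow CI machines. A bounded-polling alternative using ScalaTest's Eventually (an assumption about the suite's imports, not part of this PR, and a deliberately looser check on the exact count) might look like:

```scala
import org.mockito.Mockito.{atLeast, verify}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

// Poll until at least one scheduled revive has been observed, instead of sleeping
// for a fixed amount of time and asserting an exact count.
eventually(timeout(Span(5, Seconds))) {
  verify(driver, atLeast(1)).reviveOffers()
}
```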
|
Reviewer: It would be great to have a test imitating executor failure, where the driver first needs to revive offers and then suppress them again once the total number of cores/executors is reached.

Author: Makes sense. Added a check for that in both tests.
| test("mesos doesn't register twice with the same shuffle service") { | ||
| setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true")) | ||
| val (mem, cpu) = (backend.executorMemory(sc), 4) | ||
|
|
||
Reviewer: Add a log line here. Would help debugging in the future.

Author: done