
Commit 53da40a

DCOS-57560: Suppress/revive support for Mesos (#67)
- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the maximum core count (spark.cores.max) is fully utilized
- Does not revive offers when a task finishes successfully
- Periodically revives Mesos offers if needed, using "spark.mesos.scheduler.revive.interval"
1 parent 29c397c commit 53da40a
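As context for the diffs below, the sketch that follows shows a driver configuration that would exercise both the executor-cap path (dynamic allocation) and the periodic-revive path. It is illustrative only: every key except the new spark.mesos.scheduler.revive.interval is a pre-existing Spark setting, and the master URL, application name, and values are made up.

import org.apache.spark.SparkConf

object SuppressReviveExample {
  def main(args: Array[String]): Unit = {
    // Illustrative configuration; values are examples, not recommendations.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181/mesos")            // hypothetical Mesos master URL
      .setAppName("suppress-revive-example")
      .set("spark.dynamicAllocation.enabled", "true")      // executor cap can change at runtime
      .set("spark.shuffle.service.enabled", "true")        // typically required with dynamic allocation
      .set("spark.cores.max", "8")                         // offers are suppressed once all 8 cores are acquired
      .set("spark.mesos.scheduler.revive.interval", "10s") // periodic revive cadence added by this commit
    println(conf.toDebugString)
  }
}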

File tree: 5 files changed, +164 / -20 lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

Lines changed: 7 additions & 0 deletions
@@ -140,4 +140,11 @@ package object config {
       "when launching drivers. Default is to accept all offers with sufficient resources.")
     .stringConf
     .createWithDefault("")
+
+  private[spark] val REVIVE_OFFERS_INTERVAL =
+    ConfigBuilder("spark.mesos.scheduler.revive.interval")
+      .doc("Amount of milliseconds between periodic revive calls to Mesos, when the job " +
+        "driver is not suppressing resource offers.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
 }
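Because REVIVE_OFFERS_INTERVAL is declared with timeConf(TimeUnit.MILLISECONDS), the value should be accepted either as a bare number of milliseconds or as a suffixed duration string; the default is "10s". A hedged sketch of overriding it (the spark-submit invocation is illustrative):

// Illustrative overrides of the default; both forms should resolve to the same duration.
val conf = new org.apache.spark.SparkConf()
  .set("spark.mesos.scheduler.revive.interval", "30s")    // duration string with unit suffix
// .set("spark.mesos.scheduler.revive.interval", "30000") // bare number, interpreted as milliseconds

// Or on the command line:
//   spark-submit --conf spark.mesos.scheduler.revive.interval=30s ...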

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 16 additions & 13 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.File
 import java.util.{Collections, Date, List => JList}
 
+import org.apache.commons.lang3.time.DateUtils
+
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
@@ -779,19 +781,20 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getNewRetryState(
-      retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
-
-    val (retries, waitTimeSec) = retryState
-      .map { rs => (rs.retries + 1, rs.waitTime) }
-      .getOrElse{ (1, 1) }
-
-    // if a node is draining, the driver should be relaunched without backoff
-    if (isNodeDraining(status)) {
-      new MesosClusterRetryState(status, retries, new Date(), waitTimeSec)
-    } else {
-      val newWaitTime = waitTimeSec * 2
-      val nextRetry = new Date(new Date().getTime + newWaitTime * 1000L)
-      new MesosClusterRetryState(status, retries, nextRetry, newWaitTime)
+      retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
+    val now = new Date()
+    retryState.map { rs =>
+      val newRetries = rs.retries + 1
+      // if a node is draining, the driver should be relaunched without backoff
+      if (isNodeDraining(status)) {
+        new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
+      } else {
+        new MesosClusterRetryState(
+          status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
+      }
+    }.getOrElse {
+      // this is the first retry which should happen without backoff
+      new MesosClusterRetryState(status, 1, now, 1)
     }
   }
 
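For illustration, the reworked getNewRetryState keeps the first retry immediate and doubles the stored wait only on non-draining failures, so the retry delays grow exponentially: 0s, 1s, 2s, 4s, 8s, and so on, while a draining node always retries immediately. A standalone sketch of that sequence, separate from the scheduler code:

// Standalone sketch (not scheduler code) of the retry delays produced by the new logic:
// each pair is (delay before this retry in seconds, stored wait used for the next retry).
val delaysSeconds = Iterator.iterate((0, 1)) { case (_, wait) => (wait, wait * 2) }
  .map(_._1)
  .take(6)
  .toList
println(delaysSeconds.mkString(", "))  // 0, 1, 2, 4, 8, 16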
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 56 additions & 7 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList, UUID}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -40,7 +41,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -90,6 +91,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Synchronization protected by stateLock
   private[this] var stopCalled: Boolean = false
+  private[this] var offersSuppressed: Boolean = false
 
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sc.conf
@@ -187,6 +189,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val schedulerUuid: String = UUID.randomUUID().toString
   private val nextExecutorNumber = new AtomicLong()
 
+  private val reviveOffersExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
+  private val reviveIntervalMs = conf.get(REVIVE_OFFERS_INTERVAL)
+
   override def start() {
     super.start()
 
@@ -218,6 +224,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
     startScheduler(driver)
+
+    // Periodically check whether Mesos offers need to be revived
+    reviveOffersExecutorService.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = {
+        stateLock.synchronized {
+          if (!offersSuppressed) {
+            logDebug("scheduled mesos offers revive")
+            schedulerDriver.reviveOffers
+          }
+        }
+      }
+    }, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
@@ -360,6 +378,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
       metricsSource.recordOffers(offers.size)
+      logInfo(s"Received ${offers.size} resource offers.")
+
       if (stopCalled) {
         logDebug("Ignoring offers during shutdown")
         // Driver should simply return a stopped status on race
@@ -370,9 +390,10 @@
       }
 
       if (numExecutors >= executorLimit) {
-        logDebug("Executor limit reached. numExecutors: " + numExecutors +
-          " executorLimit: " + executorLimit)
         offers.asScala.map(_.getId).foreach(d.declineOffer)
+        logInfo("Executor limit reached. numExecutors: " + numExecutors +
+          " executorLimit: " + executorLimit + ". Suppressing further offers.")
+        suppressOffers(Option(d))
         launchingExecutors = false
         return
       } else {
@@ -382,8 +403,6 @@
         }
       }
 
-      logDebug(s"Received ${offers.size} resource offers.")
-
       val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
         matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
@@ -416,6 +435,7 @@
   private def handleMatchedOffers(
       driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
     val tasks = buildMesosTasks(offers)
+    var suppressionRequired = false
     for (offer <- offers) {
       val offerAttributes = toAttributeMap(offer.getAttributesList)
       val offerMem = getResource(offer.getResourcesList, "mem")
@@ -455,6 +475,7 @@
           Collections.singleton(offer.getId),
           offerTasks.asJava)
       } else if (totalCoresAcquired >= maxCores) {
+        suppressionRequired = true
         // Reject an offer for a configurable amount of time to avoid starving other frameworks
         metricsSource.recordDeclineFinished
         declineOffer(driver,
@@ -469,6 +490,11 @@
           Some("Offer was declined due to unmet task launch constraints."))
       }
     }
+
+    if (suppressionRequired) {
+      logInfo("Max core number is reached. Suppressing further offers.")
+      suppressOffers(Option.empty)
+    }
   }
 
   private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
@@ -696,18 +722,36 @@
         }
         executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
-        metricsSource.recordRevive
-        d.reviveOffers
+        if (state != TaskState.FINISHED) {
+          logInfo("Reviving offers due to a failed executor task.")
+          reviveOffers(Option(d))
+        }
       }
     }
   }
 
+  private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+    stateLock.synchronized {
+      metricsSource.recordRevive
+      offersSuppressed = false
+      driver.getOrElse(schedulerDriver).reviveOffers
+    }
+  }
+
+  private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+    stateLock.synchronized {
+      offersSuppressed = true
+      driver.getOrElse(schedulerDriver).suppressOffers
+    }
+  }
+
   override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
     logError(s"Mesos error: $message")
     scheduler.error(message)
   }
 
   override def stop() {
+    reviveOffersExecutorService.shutdownNow()
     stopSchedulerBackend()
     launcherBackend.setState(SparkAppHandle.State.FINISHED)
     launcherBackend.close()
@@ -791,7 +835,12 @@
       // We don't truly know if we can fulfill the full amount of executors
       // since at coarse grain it depends on the amount of slaves available.
       logInfo("Capping the total amount of executors to " + requestedTotal)
+      val reviveNeeded = executorLimit < requestedTotal
       executorLimitOption = Some(requestedTotal)
+      if (reviveNeeded && schedulerDriver != null) {
+        logInfo("The executor limit increased. Reviving offers.")
+        reviveOffers(Option.empty)
+      }
       // Update the locality wait start time to continue trying for locality.
       localityWaitStartTime = System.currentTimeMillis()
       true
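Taken together, the changes in this file suppress offers under stateLock whenever the executor limit or spark.cores.max is hit, and revive them again on a failed executor, on an increased executor limit, or from the periodic task started in start(). A minimal standalone sketch of that guard-flag plus scheduled-revive pattern (the class and member names here are illustrative, not part of the backend's API):

import java.util.concurrent.{Executors, TimeUnit}

// Minimal illustration of the suppress/revive guard used by the backend.
class ReviveLoopSketch(reviveIntervalMs: Long, revive: () => Unit) {
  private val lock = new Object
  private var suppressed = false
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = lock.synchronized {
        if (!suppressed) revive()   // only revive while offers are not suppressed
      }
    }, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)
  }

  def suppress(): Unit = lock.synchronized { suppressed = true }
  def unsuppress(): Unit = lock.synchronized { suppressed = false; revive() }
  def stop(): Unit = scheduler.shutdownNow()
}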

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -593,6 +593,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
 
     // Offer new resource to retry driver on a new agent
     val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    Thread.sleep(1500)
     scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
     taskStatus = TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 84 additions & 0 deletions
@@ -339,6 +339,90 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(taskInfos.length == 2)
   }
 
+  test("scheduler backend suppresses mesos offers when max core count reached") {
+    val executorCores = 2
+    val executors = 2
+    setBackend(Map(
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 2)
+    verify(driver, times(1)).suppressOffers()
+
+    // Finishing at least one task should trigger a revive
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(2)).suppressOffers()
+  }
+
+  test("scheduler backend suppresses mesos offers when the executor cap is reached") {
+    val executorCores = 1
+    val executors = 10
+    setBackend(Map(
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores),
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 3)
+    verify(driver, times(0)).suppressOffers()
+
+    assert(backend.doRequestTotalExecutors(3).futureValue)
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(1)).suppressOffers()
+
+    assert(backend.doRequestTotalExecutors(2).futureValue)
+    verify(driver, times(0)).reviveOffers()
+
+    // Finishing at least one task should trigger a revive
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+    verify(driver, times(2)).suppressOffers()
+  }
+
+  test("scheduler periodically revives mesos offers if needed") {
+    val executorCores = 1
+    val executors = 3
+    setBackend(Map(
+      "spark.mesos.scheduler.revive.interval" -> "1s",
+      "spark.executor.cores" -> executorCores.toString(),
+      "spark.cores.max" -> (executorCores * executors).toString()))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      Resources(executorMemory, executorCores)))
+
+    assert(backend.getTaskCount() == 1)
+    verify(driver, times(0)).suppressOffers()
+
+    // Verify that offers are revived every second
+    Thread.sleep(1500)
+    verify(driver, times(1)).reviveOffers()
+
+    Thread.sleep(1000)
+    verify(driver, times(2)).reviveOffers()
+  }
+
   test("mesos doesn't register twice with the same shuffle service") {
     setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
     val (mem, cpu) = (backend.executorMemory(sc), 4)
