@@ -140,4 +140,11 @@ package object config {
"when launching drivers. Default is to accept all offers with sufficient resources.")
.stringConf
.createWithDefault("")

private[spark] val REVIVE_OFFERS_INTERVAL =
ConfigBuilder("spark.mesos.scheduler.revive.interval")
.doc("Amount of milliseconds between periodic revive calls to Mesos, when the job" +
"driver is not suppressing resource offers.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
}
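For context, the new spark.mesos.scheduler.revive.interval option is a time config that defaults to 10s. A purely illustrative sketch (not part of this patch) of overriding it when building a job's SparkConf:

```scala
import org.apache.spark.SparkConf

// Illustrative only: shorten the periodic revive interval from its 10s default.
// A time config accepts any Spark time string ("500ms", "5s", "1min", ...).
val conf = new SparkConf()
  .setAppName("mesos-revive-interval-example") // hypothetical app name
  .set("spark.mesos.scheduler.revive.interval", "5s")
```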
@@ -20,6 +20,8 @@ package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{Collections, Date, List => JList}

import org.apache.commons.lang3.time.DateUtils

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Environment.Variable
@@ -779,19 +781,20 @@ private[spark] class MesosClusterScheduler(
}

private def getNewRetryState(
retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {

val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, rs.waitTime) }
.getOrElse{ (1, 1) }

// if a node is draining, the driver should be relaunched without backoff
if (isNodeDraining(status)) {
new MesosClusterRetryState(status, retries, new Date(), waitTimeSec)
} else {
val newWaitTime = waitTimeSec * 2
val nextRetry = new Date(new Date().getTime + newWaitTime * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, newWaitTime)
retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
val now = new Date()
retryState.map { rs =>
val newRetries = rs.retries + 1
// if a node is draining, the driver should be relaunched without backoff
if (isNodeDraining(status)) {
new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
} else {
new MesosClusterRetryState(
status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
}
}.getOrElse {
// this is the first retry which should happen without backoff
new MesosClusterRetryState(status, 1, now, 1)
}
}
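To make the new retry logic concrete, here is a small self-contained sketch (illustrative names only; the Mesos types are replaced by a plain case class) of the progression getNewRetryState produces: the first retry is scheduled immediately with a 1-second base wait, each later non-draining retry waits waitTime seconds and then doubles it, and a draining node always relaunches immediately without touching the backoff.

```scala
import java.util.Date
import org.apache.commons.lang3.time.DateUtils

// Simplified stand-in for MesosClusterRetryState, for illustration only.
case class RetryState(retries: Int, nextRetry: Date, waitTime: Int)

def nextRetryState(current: Option[RetryState], nodeDraining: Boolean): RetryState = {
  val now = new Date()
  current.map { rs =>
    if (nodeDraining) {
      // Relaunch without backoff and keep the accumulated wait time.
      RetryState(rs.retries + 1, now, rs.waitTime)
    } else {
      // Wait rs.waitTime seconds before the next attempt, then double it.
      RetryState(rs.retries + 1, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
    }
  }.getOrElse(RetryState(1, now, 1)) // first retry, no backoff
}
```

After the immediate first retry, successive non-draining failures wait 1s, 2s, 4s, and so on, which is presumably why the test suite change further down adds a short sleep before re-offering resources.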

@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList, UUID}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

@@ -40,7 +41,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -90,6 +91,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

// Synchronization protected by stateLock
private[this] var stopCalled: Boolean = false
private[this] var offersSuppressed: Boolean = false

private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf
@@ -187,6 +189,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val schedulerUuid: String = UUID.randomUUID().toString
private val nextExecutorNumber = new AtomicLong()

private val reviveOffersExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val reviveIntervalMs = conf.get(REVIVE_OFFERS_INTERVAL)

override def start() {
super.start()

@@ -218,6 +224,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
startScheduler(driver)

// Periodically check whether Mesos offers need to be revived
reviveOffersExecutorService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
stateLock.synchronized {
if (!offersSuppressed) {
logDebug("scheduled mesos offers revive")
schedulerDriver.reviveOffers
}
}
}
}, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)

Reviewer: Add a log line here. Would help debugging in the future.

Author: done
}

def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
@@ -360,6 +378,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
stateLock.synchronized {
metricsSource.recordOffers(offers.size)
logInfo(s"Received ${offers.size} resource offers.")

if (stopCalled) {
logDebug("Ignoring offers during shutdown")
// Driver should simply return a stopped status on race
@@ -370,9 +390,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

if (numExecutors >= executorLimit) {
logDebug("Executor limit reached. numExecutors: " + numExecutors +
" executorLimit: " + executorLimit)
offers.asScala.map(_.getId).foreach(d.declineOffer)
logInfo("Executor limit reached. numExecutors: " + numExecutors +
" executorLimit: " + executorLimit + " . Suppressing further offers.")
suppressOffers(Option(d))
launchingExecutors = false
return
} else {
@@ -382,8 +403,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
}

logDebug(s"Received ${offers.size} resource offers.")

val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
val offerAttributes = toAttributeMap(offer.getAttributesList)
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
@@ -416,6 +435,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private def handleMatchedOffers(
driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
var suppressionRequired = false
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val offerMem = getResource(offer.getResourcesList, "mem")
@@ -455,6 +475,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
Collections.singleton(offer.getId),
offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
suppressionRequired = true
// Reject an offer for a configurable amount of time to avoid starving other frameworks
metricsSource.recordDeclineFinished
declineOffer(driver,
@@ -469,6 +490,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
Some("Offer was declined due to unmet task launch constraints."))
}
}

if (suppressionRequired) {
logInfo("Max core number is reached. Suppressing further offers.")
suppressOffers(Option.empty)
}
}

private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
@@ -696,18 +722,36 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
metricsSource.recordRevive
d.reviveOffers
if (state != TaskState.FINISHED) {
logInfo("Reviving offers due to a failed executor task.")
reviveOffers(Option(d))
}
}
}
}

private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
stateLock.synchronized {
metricsSource.recordRevive
offersSuppressed = false
driver.getOrElse(schedulerDriver).reviveOffers
}
}

private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
stateLock.synchronized {
offersSuppressed = true
driver.getOrElse(schedulerDriver).suppressOffers
}
}

override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
logError(s"Mesos error: $message")
scheduler.error(message)
}

override def stop() {
reviveOffersExecutorService.shutdownNow()
stopSchedulerBackend()
launcherBackend.setState(SparkAppHandle.State.FINISHED)
launcherBackend.close()
@@ -791,7 +835,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// We don't truly know if we can fulfill the full amount of executors
// since at coarse grain it depends on the amount of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
val reviveNeeded = executorLimit < requestedTotal
executorLimitOption = Some(requestedTotal)
if (reviveNeeded && schedulerDriver != null) {
logInfo("The executor limit increased. Reviving offers.")
reviveOffers(Option.empty)
}
// Update the locality wait start time to continue trying for locality.
localityWaitStartTime = System.currentTimeMillis()
true

Reviewer: I'm wondering if this is safe to rely on doRequestTotalExecutors invocations to revive offers. In addition to this check, I'd suggest running another check in a background thread and revive offers if the conditions apply.

Author: I think all cases are covered actually. There are two conditions under which offers are suppressed:

  1. numExecutors >= executorLimit

    • numExecutors decreases only when executorTerminated() is called, which happens only in statusUpdate(), right before offers are revived

    • executorLimit changes only in doRequestTotalExecutors()

  2. totalCoresAcquired >= maxCores

    • Similarly to numExecutors, totalCoresAcquired decreases only in statusUpdate() when a task has finished, which is always followed by a revive.

    • maxCores is a pre-configured value and doesn't change

Reviewer: See my comment above regarding revives potentially being dropped. So we need to hook into some kind of periodic thread to check if there are executors/tasks to launch and do a revive to be safe.
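Pulling the pieces of this change together, the discussion above boils down to the suppress/revive flow sketched below. This is a simplified, illustrative model rather than the backend's actual API: the Mesos driver is hidden behind a minimal trait and all names are made up. The idea is to suppress once the backend is saturated, revive when an executor is lost or the executor limit is raised, and run a periodic task that keeps reviving while offers are not suppressed so a dropped revive cannot starve the framework.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Minimal stand-ins for illustration only, not the Spark/Mesos API.
trait OfferDriver {
  def reviveOffers(): Unit
  def suppressOffers(): Unit
}

class SuppressReviveSketch(driver: OfferDriver, reviveIntervalMs: Long) {
  private val stateLock = new Object
  private var offersSuppressed = false

  private val executor = Executors.newSingleThreadScheduledExecutor()

  // Periodic safety net: while offers are not suppressed, keep re-issuing
  // revive calls so a lost revive cannot leave the framework without offers.
  executor.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = stateLock.synchronized {
      if (!offersSuppressed) driver.reviveOffers()
    }
  }, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)

  // Called once the executor limit or the max core count is reached.
  def suppress(): Unit = stateLock.synchronized {
    offersSuppressed = true
    driver.suppressOffers()
  }

  // Called when an executor is lost or the executor limit is raised.
  def revive(): Unit = stateLock.synchronized {
    offersSuppressed = false
    driver.reviveOffers()
  }

  def shutdown(): Unit = executor.shutdownNow()
}
```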
@@ -593,6 +593,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

// Offer new resource to retry driver on a new agent
val agent2 = SlaveID.newBuilder().setValue("s2").build()
// Allow the driver's retry wait time to pass before offering new resources
Thread.sleep(1500)
scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
@@ -339,6 +339,90 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(taskInfos.length == 2)
}

test("scheduler backend suppresses mesos offers when max core count reached") {
val executorCores = 2
val executors = 2
setBackend(Map(
"spark.executor.cores" -> executorCores.toString(),
"spark.cores.max" -> (executorCores * executors).toString()))

val executorMemory = backend.executorMemory(sc)
offerResources(List(
Resources(executorMemory, executorCores),
Resources(executorMemory, executorCores),
Resources(executorMemory, executorCores)))

assert(backend.getTaskCount() == 2)
verify(driver, times(1)).suppressOffers()

// Finishing at least one task should trigger a revive
val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
backend.statusUpdate(driver, status)
verify(driver, times(1)).reviveOffers()

offerResources(List(
Resources(executorMemory, executorCores)))
verify(driver, times(2)).suppressOffers()
}

test("scheduler backend suppresses mesos offers when the executor cap is reached") {
val executorCores = 1
val executors = 10
setBackend(Map(
"spark.executor.cores" -> executorCores.toString(),
"spark.cores.max" -> (executorCores * executors).toString()))

val executorMemory = backend.executorMemory(sc)
offerResources(List(
Resources(executorMemory, executorCores),
Resources(executorMemory, executorCores),
Resources(executorMemory, executorCores)))

assert(backend.getTaskCount() == 3)
verify(driver, times(0)).suppressOffers()

assert(backend.doRequestTotalExecutors(3).futureValue)
offerResources(List(
Resources(executorMemory, executorCores)))
verify(driver, times(1)).suppressOffers()

assert(backend.doRequestTotalExecutors(2).futureValue)
verify(driver, times(0)).reviveOffers()

// Finishing at least one task should trigger a revive
val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
backend.statusUpdate(driver, status)
verify(driver, times(1)).reviveOffers()

offerResources(List(
Resources(executorMemory, executorCores)))
verify(driver, times(2)).suppressOffers()
}

test("scheduler periodically revives mesos offers if needed") {
val executorCores = 1
val executors = 3
setBackend(Map(
"spark.mesos.scheduler.revive.interval" -> "1s",
"spark.executor.cores" -> executorCores.toString(),
"spark.cores.max" -> (executorCores * executors).toString()))

val executorMemory = backend.executorMemory(sc)
offerResources(List(
Resources(executorMemory, executorCores)))

assert(backend.getTaskCount() == 1)
verify(driver, times(0)).suppressOffers()

// Verify that offers are revived every second
Thread.sleep(1500)
verify(driver, times(1)).reviveOffers()

Thread.sleep(1000)
verify(driver, times(2)).reviveOffers()
}

Reviewer: It would be great to have a test imitating executor failure, where the driver first needs to revive offers and then suppress them once the total number of cores/executors is reached.

Author: Makes sense. Added a check for that in both tests.

test("mesos doesn't register twice with the same shuffle service") {
setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
val (mem, cpu) = (backend.executorMemory(sc), 4)