This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Conversation

@rpalaznik rpalaznik commented Dec 18, 2019

What changes were proposed in this pull request?

This PR adds support in Spark drivers for suppressing Mesos resource offers once the maximum number of executors has been started. Currently, the offers are only declined, with a configurable delay (2 minutes by default). A simplified sketch of the suppress/revive decision is included after the list below.

  • Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
  • Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
  • Suppresses Mesos offers when the maximum number of cores is utilized
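
A simplified sketch of the suppress/revive decision described above, assuming the driver tracks numExecutors, executorLimit, totalCoresAcquired and maxCores as discussed in the review below (illustrative only, not the exact PR code):

import org.apache.mesos.SchedulerDriver
import org.slf4j.LoggerFactory

// Illustrative sketch only; names and structure are assumptions, not the PR diff.
class OfferSubscriptionHelper(driver: SchedulerDriver) {
  private val log = LoggerFactory.getLogger(getClass)
  @volatile private var offersSuppressed = false

  // Suppress offers once the executor/core cap is hit; revive once capacity frees up.
  def update(numExecutors: Int, executorLimit: Int,
             totalCoresAcquired: Int, maxCores: Int): Unit = {
    val capReached = numExecutors >= executorLimit || totalCoresAcquired >= maxCores
    if (capReached && !offersSuppressed) {
      log.info("Executor/core cap reached, suppressing further Mesos offers")
      driver.suppressOffers()   // stop the stream of offers instead of declining each one
      offersSuppressed = true
    } else if (!capReached && offersSuppressed) {
      log.info("Capacity available again, reviving Mesos offers")
      driver.reviveOffers()     // ask Mesos to resume sending offers
      offersSuppressed = false
    }
  }
}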

How was this patch tested?

  • Manually in a DC/OS cluster
  • Unit tests

Integration tests will be added to mesosphere/spark-build later in a separate PR

Release notes

Added support for resource offer suppression when run in a Mesos cluster

- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the maximum number of cores is utilized

@akirillov akirillov left a comment

thanks, @rpalaznik. Overall LGTM, please check my comments.

logInfo("Capping the total amount of executors to " + requestedTotal)
val reviveNeeded = executorLimit < requestedTotal
executorLimitOption = Some(requestedTotal)
if (reviveNeeded && schedulerDriver != null) {

I'm wondering whether it's safe to rely on doRequestTotalExecutors invocations to revive offers. In addition to this check, I'd suggest running another check in a background thread and reviving offers if the conditions apply.

Author

I think all cases are covered actually. There are two conditions when offers are suppressed:

  1. numExecutors >= executorLimit

    • numExecutors decreases only when executorTerminated() is called, which happens only in statusUpdate(), right before reviving offers

    • executorLimit changes only in doRequestTotalExecutors()

  2. totalCoresAcquired >= maxCores

    • Similarly to numExecutors, totalCoresAcquired decreases only in statusUpdate() when a task has finished, which is always followed by a revive.

    • maxCores is a pre-configured value and doesn't change

See my comment above regarding revives potentially being dropped. So we need to hook into some kind of periodic thread to check if there are executors/tasks to launch and do a revive to be safe.

executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
metricsSource.recordRevive
logInfo("Reviving offers due to a finished executor task.")

This log message looks a little bit misleading. Do we address the situation with a failed node specifically? From the message body, it looks like we start reviving offers because one of the tasks completed successfully.

Author

That's right, we start reviving offers when one of the tasks completes successfully as well as when it fails. That's how it was already implemented; I didn't change this behavior, just added a log message.

We should only revive if we are going to use offers to launch a task/executor. For Spark, that sounds like when an executor fails but not when it succeeds? If yes, let's fix that as part of this effort. I'm not sure if this is the right place to revive; is there another place in the code where the intent to launch is recorded? Also, note that revive can get dropped, so this should be done in some kind of periodic manner as long as there are executors/tasks to be launched.

assert(backend.doRequestTotalExecutors(2).futureValue)
verify(driver, times(0)).reviveOffers()
}

It would be great to have a test imitating executor failure, where the Driver needs to first revive offers and then suppress them once the total number of cores/executors is reached.

Author

Makes sense. Added a check for that in both tests.
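
For reference, a sketch of the kind of check described, in the Mockito style of the surrounding suite (the mocked driver setup and the simplified TaskStatus are assumptions, not the actual test code):

import org.apache.mesos.Protos.{TaskID, TaskState, TaskStatus}
import org.mockito.Mockito.{times, verify}

// Sketch only: assumes `backend` and the mocked `driver` are set up as in the existing tests.
// Simulate an executor failure reported by Mesos...
val failedStatus = TaskStatus.newBuilder()
  .setTaskId(TaskID.newBuilder().setValue("0").build())
  .setState(TaskState.TASK_FAILED)
  .build()
backend.statusUpdate(driver, failedStatus)

// ...the driver should revive offers to replace the lost executor.
verify(driver, times(1)).reviveOffers()
// A companion assertion would then verify suppressOffers() once the
// executor/core cap is reached again after relaunching on new offers.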

@akirillov akirillov left a comment

thanks, @rpalaznik. Overall LGTM, I'd suggest cross-checking with @vinodkone to make sure we didn't miss anything.

Also, did you run the full unit test suite to make sure this change doesn't break other tests?

offers.asScala.map(_.getId).foreach(d.declineOffer)
logInfo("Executor limit reached. numExecutors: " + numExecutors +
" executorLimit: " + executorLimit + " . Suppressing further offers.")
d.suppressOffers

You should still decline these offers. Suppress doesn't cause these offers to be declined automatically.

Author

The offers are actually declined just before this. Line 375.

offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
logInfo("Max core number is reached. Suppressing further offers.")
schedulerDriver.suppressOffers()

I would suppress after the decline just for consistency.

But, more importantly, I don't think it makes sense to call suppressOffers once per offer when maxCores is reached. We should only suppress once. So, maybe do this check and suppress before the for loop in line:420, or just move it to line:374 to keep the suppression logic in one place? Moving it to line:374 probably makes sense because we need to check this condition on offer reception in case this suppress call gets dropped.

Author

I've moved the suppress call to the end of this function, so that it happens only once and after the offer declines.
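
For illustration, the resulting shape could be roughly the following (a sketch of the "decline each unused offer, then suppress at most once" idea; the helper name and the capReached flag are assumptions, not the PR diff):

import scala.collection.JavaConverters._
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.Offer

object OfferHandlingSketch {
  // Decline every unused offer individually; suppression alone does not decline
  // offers that are already outstanding.
  def handleUnusedOffers(driver: SchedulerDriver,
                         unusedOffers: java.util.List[Offer],
                         capReached: Boolean): Unit = {
    unusedOffers.asScala.foreach(offer => driver.declineOffer(offer.getId))
    // Suppress at most once per resourceOffers() call, and only after the declines.
    if (capReached) {
      driver.suppressOffers()
    }
  }
}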

executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
metricsSource.recordRevive
logInfo("Reviving offers due to a finished executor task.")

We should only revive if we are going to use offers to launch a task/executor. For Spark, that sounds like when an executor fails but not when it succeeds? If yes, let's fix that as part of this effort. I'm not sure if this is the right place to revive; is there another place in the code where the intent to launch is recorded? Also, note that revive can get dropped, so this should be done in some kind of periodic manner as long as there are executors/tasks to be launched.

logInfo("Capping the total amount of executors to " + requestedTotal)
val reviveNeeded = executorLimit < requestedTotal
executorLimitOption = Some(requestedTotal)
if (reviveNeeded && schedulerDriver != null) {

See my comment above regarding revives potentially being dropped. So we need to hook into some kind of periodic thread to check if there are executors/tasks to launch and do a revive to be safe.

@rpalaznik

Also, note that revive can get dropped, so this should be done in some kind of periodic manner as long as there are executors/tasks to be launched.

There is a thread that executes the reviveOffers method periodically, controlled by the property spark.scheduler.revive.interval (default 1 second). It looks very suitable for this purpose, so I've put the revive calls there.

@rpalaznik rpalaznik requested review from akirillov and vinodkone and removed request for VladimirKravtsov February 6, 2020 16:34
@vinodkone

@rpalaznik Can you point me to the periodic thread that's calling revive? I couldn't see it in the PR above?

@rpalaznik

@rpalaznik Can you point me to the periodic thread that's calling revive? I couldn't see it in the PR above?

@vinodkone, my bad, I got confused and thought that this code would call reviveOffers()

https://github.com/mesosphere/spark/blob/ef57f026e18bf326c24559bde7bc4874a4a18d2d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L110-L114

But it actually calls DriverEndpoint's makeOffers().

I've added a new periodic thread in MesosCoarseGrainedSchedulerBackend and a test to make sure it works as expected this time. Tested it manually too.


private val mesosReviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1000ms")

1s seems too aggressive. Since this is mainly to guard against the edge case of a revive being dropped, this could be changed to something higher, maybe 10s. Also, I would suggest using a new, dedicated config option rather than piggybacking on spark.scheduler.revive.interval, which seems to be intended for a different thing.

Author

Changed to a new config option, spark.mesos.scheduler.revive.interval. Default: 10 seconds.

metricsSource.recordRevive
d.reviveOffers
reviveMesosOffers(Option(d))
logInfo("Reviving offers due to a finished executor task.")

Put the log line before the reviveMesosOffers call. Also, as mentioned above, let's only revive if the executor failed and not if it completed successfully.

Author

done

override def run(): Unit = {
stateLock.synchronized {
if (!offersSuppressed) {
schedulerDriver.reviveOffers

Add a log line here. Would help debugging in the future.

Author

done
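
Putting the quoted pieces together, the periodic check might be wired up roughly like this (a sketch that assumes it lives inside MesosCoarseGrainedSchedulerBackend, where conf, stateLock, offersSuppressed, schedulerDriver and logInfo are available; not the exact PR diff):

import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

// Sketch: periodically revive offers while they are not suppressed, so that a
// dropped revive call is eventually retried.
private val mesosReviveThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs =
  conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")

mesosReviveThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = stateLock.synchronized {
    if (!offersSuppressed) {
      logInfo("Periodically reviving Mesos offers")  // the log line requested in the review above
      schedulerDriver.reviveOffers()
    }
  }
}, mesosReviveIntervalMs, mesosReviveIntervalMs, TimeUnit.MILLISECONDS)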

- separate configuration parameter for the revive interval "spark.mesos.scheduler.revive.interval"
- default interval 10s
- additional debug logging
- don't revive if a task finished successfully
@rpalaznik rpalaznik requested a review from vinodkone February 11, 2020 11:38

@vinodkone vinodkone left a comment

LGTM. Would be nice to have @akirillov take a final look.

metricsSource.recordRevive
d.reviveOffers
if (state != TaskState.FINISHED) {
logInfo("Reviving offers due to a finished executor task.")

s/finished/failed/

Author

fixed

- Minor wording change

@akirillov akirillov left a comment

Thanks, @rpalaznik! The logic looks good, left a few comments - please take a look.


private val mesosReviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")

Let's define this property in deploy/mesos/config.scala instead of an ad-hoc approach. This will help a lot with upgrades in the future.

@rpalaznik rpalaznik Feb 16, 2020

done
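
For reference, the entry in deploy/mesos/config.scala could look roughly like this (a sketch assuming Spark's internal ConfigBuilder API; the val name is hypothetical):

import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder

// Sketch: defined alongside the other Mesos properties in the config object,
// instead of an ad-hoc conf.getTimeAsMs lookup.
private[spark] val SCHEDULER_REVIVE_INTERVAL =
  ConfigBuilder("spark.mesos.scheduler.revive.interval")
    .doc("Interval between periodic revive calls while offers are not suppressed.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10s")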

private val schedulerUuid: String = UUID.randomUUID().toString
private val nextExecutorNumber = new AtomicLong()

private val mesosReviveThread =

Nit, here and everywhere in this class: style-wise, I don't think we should use mesos in variable and function names in this class at all, because the class already assumes that we're dealing with Mesos.

Suggestion: mesosReviveThread -> reviveOffersExecutorService

Author

Sure, I renamed this instance and the ones from the comments below too.


private val mesosReviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")

Suggested change
- private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")
+ private val reviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")

}
}

private def reviveMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {

Suggested change
- private def reviveMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+ private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {

}
}

private def suppressMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {

Suggested change
- private def suppressMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
+ private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {

- Moved property to config.scala
- Renamed a few variables and methods
@rpalaznik

left a few comments
@akirillov, I've addressed your suggestions. Could you take a look again? Thanks!

@rpalaznik rpalaznik requested a review from akirillov February 16, 2020 16:56
- Fix for broken unit tests
if (isNodeDraining(status)) {
new MesosClusterRetryState(status, retries, new Date(), waitTimeSec)
new MesosClusterRetryState(status, retries, new Date(),
if (waitTimeSec > 1) waitTimeSec / 2 else 1) // Keep waitTime the same

Essentially, it looks like we roll back the change we applied on line 785 here so that we don't change the value on line 792. I can't see what the benefits of this change are. Also, reverting a previously calculated exponential backoff doesn't seem to be a sound design decision.

@rpalaznik rpalaznik Feb 20, 2020

I dropped the revert-division part and rewrote it to be clearer.

The bug was that, in the case of an empty retryState (i.e. it's the first try), the initial waitTime should be kept at 1 second. An existing test expected that and had a 1.5 second delay. The node-draining logic that was added accidentally doubled the initial waitTime to 2 seconds, causing that test to fail.

got it, thanks for the explanation!

val nextRetry = new Date(new Date().getTime + newWaitTime * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, newWaitTime)
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)

While this is a correct approach to solving the problem, this function still looks pretty complicated and contains two calls to isNodeDraining, which is suboptimal. WDYT about refactoring it in this or a similar way?

private def getNewRetryState(
  retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
  retryState.map { rs =>
    // if a node is draining, the driver should be relaunched without backoff
    if (isNodeDraining(status)) {
      new MesosClusterRetryState(status, rs.retries + 1, new Date(), rs.waitTime)
    } else {
      val nextRetry = new Date(new Date().getTime + rs.waitTime * 1000L)
      new MesosClusterRetryState(status, rs.retries + 1, nextRetry, rs.waitTime * 2)
    }
  }.getOrElse {
    // this is the first retry which should happen without backoff
    new MesosClusterRetryState(status, 1, new Date(), 1)
  }
}

Author

I have a minor concern that this again changes the original behavior a bit by making the first retry delay zero instead of 1 second for non-node-draining cases.

To avoid the extra call, I'd rather simply save the value, e.g. val noBackoff = isNodeDraining(status), and use it. What do you say?
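
For illustration, the variant with a saved flag might look like this (a sketch of the idea applied to the construct quoted below; the draining semantics follow the earlier diff, not a final decision):

// Sketch: compute the draining flag once and reuse it, keeping the original
// "first retry waits 1 second" behavior.
val noBackoff = isNodeDraining(status)
val (retries, waitTimeSec) = retryState
  .map { rs => (rs.retries + 1, if (noBackoff) rs.waitTime else rs.waitTime * 2) }
  .getOrElse((1, 1))
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)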

@akirillov akirillov Feb 21, 2020

My main point here is to get rid of this construct altogether due to the newly discovered issue:

val (retries, waitTimeSec) = retryState
      .map { rs => (rs.retries + 1, rs.waitTime) }
      .map { rs => (rs.retries + 1, if (isNodeDraining(status)) rs.waitTime else rs.waitTime * 2) }
      .getOrElse{ (1, 1) }

I believe the JIT compiler will inline a call to this function, so getting rid of the double call is not a matter of performance optimization but more of readability. I'm not insisting on the code piece posted above, but IMO its readability is better because it can express the whole function as a nested conditional statement:

//pseudo-code
if (this is not the first retry) {
  if (draining) {
    return draining-specific retry
  } else {
    return retry state with applied backoff
  }
} else {
  return first retry state
}

As for the zero delay, we can refactor the function even further and use the DateUtils class from Apache Commons (which comes as a Spark dependency) to manipulate the time increments:

val now = new Date()
retryState.map { rs =>
  val newRetries = rs.retries + 1
  // if a node is draining, the driver should be relaunched without backoff
  if (isNodeDraining(status)) {
    new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
  } else {
    new MesosClusterRetryState(status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
  }
}.getOrElse {
  // this is the first retry which should happen without backoff
  new MesosClusterRetryState(status, 1, DateUtils.addSeconds(now, 1), 1)
}

@akirillov akirillov left a comment

Thanks, @rpalaznik. LGTM 👍 Let's get the CI green before merging and please check the comment regarding scalastyle

if (isNodeDraining(status)) {
new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
} else {
new MesosClusterRetryState(status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)

One last thing: this line will cause the scalastyle check to fail when we upstream this change (line length > 100); this should pass:

new MesosClusterRetryState(
          status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)

@rpalaznik rpalaznik merged commit 53da40a into custom-branch-2.4.3 Feb 24, 2020
@rpalaznik rpalaznik deleted the dcos-57560-suppress-mesos-offers branch February 24, 2020 09:15
rpalaznik added a commit that referenced this pull request Feb 24, 2020
DCOS-57560: Suppress/revive support for Mesos

- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the maximum number of cores is utilized
- Doesn't revive if a task finished successfully
- Periodically revives Mesos offers if needed, using "spark.mesos.scheduler.revive.interval"
farhan5900 pushed a commit that referenced this pull request Aug 7, 2020
DCOS-57560: Suppress/revive support for Mesos

- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the maximum number of cores is utilized
- Doesn't revive if a task finished successfully
- Periodically revives Mesos offers if needed, using "spark.mesos.scheduler.revive.interval"
kaiwalyajoshi pushed a commit that referenced this pull request Oct 23, 2020
* [DCOS-54813] Base tech update from 2.4.0 to 2.4.3 (#62)

* [DCOS-52207][Spark] Make Mesos Agent Blacklisting behavior configurable and more tolerant of failures. (#63)

* [DCOS-58386] Node draining support for supervised drivers; Mesos Java bump to 1.9.0 (#65)

* [DCOS-58389] Role propagation and enforcement support for Mesos Dispatcher (#66)

* DCOS-57560: Suppress/revive support for Mesos (#67)

* D2IQ-64778: Unregister progress listeners before stopping executor threads.

* [SPARK-32675][MESOS] --py-files option is appended without passing value for it

### What changes were proposed in this pull request?
The PR checks whether the `--py-files` value is empty and uses it only if it is not empty.
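
A minimal sketch of the kind of check described, assuming the submission's sparkProperties map and a hypothetical options list (not the actual MesosRestServer diff):

// Sketch: only append --py-files when the user actually supplied a non-empty value.
val pyFiles: Option[String] =
  sparkProperties.get("spark.submit.pyFiles").filter(_.trim.nonEmpty)

val pyFilesOptions: Seq[String] =
  pyFiles.map(files => Seq("--py-files", files)).getOrElse(Seq.empty)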

### Why are the changes needed?
There is a bug in the Mesos cluster mode REST Submission API: it uses the `--py-files` option even when the user has not specified any value for the conf `spark.submit.pyFiles`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
* Submitting an application to a Mesos cluster:
`curl -X POST http://localhost:7077/v1/submissions/create --header "Content-Type:application/json" --data '{
"action": "CreateSubmissionRequest",
"appResource": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
"clientSparkVersion": "3.0.0",
"appArgs": ["30"],
"environmentVariables": {},
"mainClass": "org.apache.spark.examples.SparkPi",
"sparkProperties": {
  "spark.jars": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
  "spark.driver.supervise": "false",
  "spark.executor.memory": "512m",
  "spark.driver.memory": "512m",
  "spark.submit.deployMode": "cluster",
  "spark.app.name": "SparkPi",
  "spark.master": "mesos://localhost:5050"
}}'`
* It should be able to pick the correct class and run the job successfully.

Closes apache#29499 from farhan5900/SPARK-32675.

Authored-by: farhan5900 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 8749f2e)

* Applied all the remaining custom commits

* Removes duplicate org.scala-lang.modules dependency

* Removes duplicate gpus method

* Adds second parameter of deprecated annotation

* Fix typemismatch

* Removes extra shellEscape function calls

* Adds env variable for file based auth

* Remove generic shell escape and put it in specific places

* fix ambiguous import

* fix shell escaping for `--conf` options

* fix option type in test

* Fixes scalastyle and unit test issues

Co-authored-by: Alexander Lembiewski <[email protected]>
Co-authored-by: Farhan <[email protected]>
Co-authored-by: Anton Kirillov <[email protected]>
Co-authored-by: Anton Kirillov <[email protected]>
Co-authored-by: Roman Palaznik <[email protected]>
Co-authored-by: Roman Palaznik <[email protected]>