DCOS-57560: Suppress/revive support for Mesos #67
Conversation
- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the max core number is utilized
akirillov
left a comment
thanks, @rpalaznik. Overall LGTM, please check my comments.
| logInfo("Capping the total amount of executors to " + requestedTotal) | ||
| val reviveNeeded = executorLimit < requestedTotal | ||
| executorLimitOption = Some(requestedTotal) | ||
| if (reviveNeeded && schedulerDriver != null) { |
I'm wondering whether it's safe to rely on doRequestTotalExecutors invocations alone to revive offers. In addition to this check, I'd suggest running another check in a background thread and reviving offers if the conditions apply.
I think all cases are covered actually. There are two conditions when offers are suppressed:
- numExecutors >= executorLimit
  - numExecutors decreases only when executorTerminated() is called, which happens only in statusUpdate() right before reviving offers
  - executorLimit changes only in doRequestTotalExecutors()
- totalCoresAcquired >= maxCores
  - Similarly to numExecutors, totalCoresAcquired decreases only in statusUpdate() when a task has finished, which is always followed by a revive.
  - maxCores is a pre-configured value and doesn't change
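For illustration only, the two conditions above can be expressed as a single predicate, as in the sketch below. The parameter names mirror the fields discussed in this thread; the helper itself is hypothetical and not part of the PR.

// Hypothetical helper, not actual PR code: both suppression conditions in one place.
def shouldSuppressOffers(
    numExecutors: Int,
    executorLimit: Int,
    totalCoresAcquired: Int,
    maxCores: Int): Boolean = {
  val executorCapReached = numExecutors >= executorLimit    // dynamic allocation cap reached
  val coreCapReached = totalCoresAcquired >= maxCores       // all requested cores already in use
  executorCapReached || coreCapReached
}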
See my comment above regarding revives potentially being dropped. So we need to hook into some kind of periodic thread to check if there are executors/tasks to launch and do a revive to be safe.
executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
metricsSource.recordRevive
logInfo("Reviving offers due to a finished executor task.")
This log message looks a little bit misleading. Do we address the situation with a failed node specifically? From the message body, it looks like we start reviving offers because one of the tasks completed successfully.
That's right, we start reviving offers when one of the tasks completes successfully as well as when it fails. That's how it was already implemented; I didn't change this behavior, just added a log message.
We should only revive if we are going to use offers to launch a task/executor. For Spark, that sounds like when an executor fails but not when it succeeds? If yes, let's fix that as part of this effort. I'm not sure if this is the right place to revive; is there another place in the code where the intent to launch is recorded? Also, note that a revive can get dropped, so this should be done in some kind of periodic manner as long as there are executors/tasks to be launched.
assert(backend.doRequestTotalExecutors(2).futureValue)
verify(driver, times(0)).reviveOffers()
}
It would be great to have a test imitating executor failure, where the driver needs to first revive offers and then suppress them once the total number of cores/executors is reached.
Makes sense. Added a check for that in both tests.
thanks, @rpalaznik. Overall LGTM, I'd suggest cross-checking with @vinodkone to make sure we didn't miss anything.
Also, did you run the full unit test suite to make sure this change doesn't break other tests?
offers.asScala.map(_.getId).foreach(d.declineOffer)
logInfo("Executor limit reached. numExecutors: " + numExecutors +
  " executorLimit: " + executorLimit + " . Suppressing further offers.")
d.suppressOffers
You should still decline these offers. Suppress doesn't cause these offers to be declined automatically.
The offers are actually declined just before this. Line 375.
  offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
  logInfo("Max core number is reached. Suppressing further offers.")
  schedulerDriver.suppressOffers()
I would suppress after the decline just for consistency.
But, more importantly, I don't think it makes sense to call suppressOffers once per offer when maxCores is reached. We should only suppress once. So maybe do this check and suppress before the for loop at line:420, or just move it to line:374 to keep the suppression logic in one place? Moving it to line:374 probably makes sense because we need to check this condition on offer reception in case this suppress call gets dropped.
I've moved the suppress call to the end of this function, so that it happens once and after offer declines.
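For illustration, the resulting shape is roughly the sketch below, built on the Mesos SchedulerDriver API: decline every offer we don't use inside the loop, then suppress at most once at the end. The helper name and the canLaunchMore flag are hypothetical; the real method builds and launches tasks where the comment indicates.

import scala.collection.JavaConverters._
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.Offer

// Sketch only: decline each unused offer, then suppress at most once at the end,
// because suppressOffers() does not decline offers that are already outstanding.
def handleOffers(driver: SchedulerDriver, offers: java.util.List[Offer], canLaunchMore: Boolean): Unit = {
  offers.asScala.foreach { offer =>
    if (canLaunchMore) {
      // ... build and launch tasks for this offer ...
    } else {
      driver.declineOffer(offer.getId)
    }
  }
  if (!canLaunchMore) {
    driver.suppressOffers()
  }
}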
There is a thread that executes reviveOffers periodically, driven by "spark.scheduler.revive.interval".
@rpalaznik Can you point me to the periodic thread that's calling revive? I couldn't see it in the PR above?
@vinodkone, my bad, I got confused and thought that this code would call reviveOffers on the Mesos scheduler driver. But it actually calls DriverEndpoint's reviveOffers. I've added a new periodic thread in the Mesos scheduler backend.
private val mesosReviveThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1000ms")
1s seems too aggressive. Since this is mainly to guard against the edge case of a revive being dropped, this could be changed to something higher, maybe 10s. Also, I would suggest using a new config option rather than piggybacking on spark.scheduler.revive.interval, which seems to be for a different thing.
Changed to a new config option spark.mesos.scheduler.revive.interval. Default: 10 seconds.
metricsSource.recordRevive
d.reviveOffers
reviveMesosOffers(Option(d))
logInfo("Reviving offers due to a finished executor task.")
Put the log line before the reviveMesosOffers call. Also, as mentioned above, let's just revive if the executor failed and not if it's completed successfully.
done
override def run(): Unit = {
  stateLock.synchronized {
    if (!offersSuppressed) {
      schedulerDriver.reviveOffers
Add a log line here. Would help debugging in the future.
done
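Roughly, the periodic safety-net revive discussed above can be wired up as in the sketch below. It assumes the backend's stateLock, offersSuppressed flag, schedulerDriver, and reviveIntervalMs members from the surrounding class, and is not the verbatim PR code.

import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

// Sketch only: schedule a periodic task that re-sends reviveOffers unless offers
// are intentionally suppressed, guarding against an earlier revive being dropped.
private val reviveOffersExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")

reviveOffersExecutorService.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = stateLock.synchronized {
    if (!offersSuppressed && schedulerDriver != null) {
      logInfo("Periodic revive: re-sending reviveOffers to Mesos")
      schedulerDriver.reviveOffers()
    }
  }
}, reviveIntervalMs, reviveIntervalMs, TimeUnit.MILLISECONDS)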
- separate configuration parameter for the revive interval "spark.mesos.scheduler.revive.interval"
- default interval 10s
- additional debug logging
- don't revive if a task finished successfully
vinodkone
left a comment
LGTM. Would be nice to have @akirillov take a final look.
metricsSource.recordRevive
d.reviveOffers
if (state != TaskState.FINISHED) {
  logInfo("Reviving offers due to a finished executor task.")
s/finished/failed/
fixed
- Minor wording change
akirillov
left a comment
Thanks, @rpalaznik! The logic looks good, left a few comments - please take a look.
private val mesosReviveThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")
Let's define this property in deploy/mesos/config.scala instead of an ad-hoc approach. This will help a lot with upgrades in the future.
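For reference, an entry in deploy/mesos/config.scala would presumably look something like the sketch below, following Spark's ConfigBuilder pattern; the constant name and doc text are hypothetical, not the exact definition that landed.

import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder

// Sketch only: a centrally defined config entry instead of an ad-hoc string key.
private[spark] val SCHEDULER_REVIVE_INTERVAL =
  ConfigBuilder("spark.mesos.scheduler.revive.interval")
    .doc("Interval at which the driver periodically re-sends a revive call to Mesos, " +
      "as a safety net in case a previous revive was dropped.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10s")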
done
private val schedulerUuid: String = UUID.randomUUID().toString
private val nextExecutorNumber = new AtomicLong()

private val mesosReviveThread =
Nit here and everywhere in this class: style-wise, I don't think we should use "mesos" in variable and function names in this class at all, because the class already assumes that we're dealing with Mesos.
Suggestion: mesosReviveThread -> reviveOffersExecutorService
Sure, I renamed this instance and those from the comments below too.
private val mesosReviveThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-revive-thread")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")
private val mesosReviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")
private val reviveIntervalMs = conf.getTimeAsMs("spark.mesos.scheduler.revive.interval", "10s")
  }
}

private def reviveMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
private def reviveMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
private def reviveOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
  }
}

private def suppressMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
private def suppressMesosOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
private def suppressOffers(driver: Option[org.apache.mesos.SchedulerDriver]): Unit = {
- Moved property to config.scala
- Renamed a few variables and methods
- Fix for broken unit tests
if (isNodeDraining(status)) {
  new MesosClusterRetryState(status, retries, new Date(), waitTimeSec)
  new MesosClusterRetryState(status, retries, new Date(),
    if (waitTimeSec > 1) waitTimeSec / 2 else 1) // Keep waitTime the same
Essentially, it looks like we roll back the change we applied on line 785 here so that we don't change the value on line 792. I can't see what the benefits of this change are. Also, reverting a previously calculated exponential backoff doesn't seem like a sound design decision.
I dropped the revert-division part and rewrote it to be clearer.
The bug was that in the case of an empty retryState (i.e., it's the first try), the initial waitTime should be kept at 1 second. An already existing test expected that and had a 1.5-second delay. The node-draining logic that was added accidentally doubled the initial waitTime to 2 seconds, causing that test to fail.
got it, thanks for the explanation!
val nextRetry = new Date(new Date().getTime + newWaitTime * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, newWaitTime)
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)
While this is a correct approach to solve the problem, this function still looks pretty complicated and contains two calls to isNodeDraining, which is suboptimal. WDYT about refactoring it in this or a similar way?
private def getNewRetryState(
    retryState: Option[MesosClusterRetryState], status: TaskStatus): MesosClusterRetryState = {
  retryState.map { rs =>
    // if a node is draining, the driver should be relaunched without backoff
    if (isNodeDraining(status)) {
      new MesosClusterRetryState(status, rs.retries + 1, new Date(), rs.waitTime)
    } else {
      val nextRetry = new Date(new Date().getTime + rs.waitTime * 1000L)
      new MesosClusterRetryState(status, rs.retries + 1, nextRetry, rs.waitTime * 2)
    }
  }.getOrElse {
    // this is the first retry which should happen without backoff
    new MesosClusterRetryState(status, 1, new Date(), 1)
  }
}
I have a minor concern that this again changes the original behavior a bit by making the first retry delay zero instead of 1 second for non-node-draining cases.
To avoid the extra call, I'd rather simply save the value, like this: val noBackoff = isNodeDraining(status), and use it. What do you say?
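A minimal sketch of that variant, saving the flag once and keeping the original 1-second first-retry delay; it is illustrative only and reuses the names quoted in this thread rather than the code that was merged.

// Sketch only: evaluate isNodeDraining once and keep the 1-second initial delay.
val now = new Date()
val noBackoff = isNodeDraining(status)
retryState.map { rs =>
  val newRetries = rs.retries + 1
  if (noBackoff) {
    // draining node: relaunch immediately, keep the current wait time
    new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
  } else {
    // regular retry: wait the current backoff, then double it for next time
    new MesosClusterRetryState(status, newRetries,
      new Date(now.getTime + rs.waitTime * 1000L), rs.waitTime * 2)
  }
}.getOrElse {
  // first retry keeps the original 1-second delay
  new MesosClusterRetryState(status, 1, new Date(now.getTime + 1000L), 1)
}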
My main point here is to get rid of this construct altogether due to the newly discovered issue:
val (retries, waitTimeSec) = retryState
  .map { rs => (rs.retries + 1, rs.waitTime) }
  .map { rs => (rs.retries + 1, if (isNodeDraining(status)) rs.waitTime else rs.waitTime * 2) }
  .getOrElse { (1, 1) }
I believe that the JIT compiler will inline a call to this function, so getting rid of the double call is not a matter of performance optimization but more of readability. I'm not insisting on the code piece posted above, but IMO its readability is better because it can express the whole function as a nested conditional statement:
// pseudo-code
if (this is not the first retry) {
  if (draining) {
    return draining-specific retry
  } else {
    return retry state with applied backoff
  }
} else {
  return first retry state
}
As for the zero delay, we can refactor the function even further and use the DateUtils class from Apache Commons (which comes as a Spark dependency) to manipulate the time increments:
val now = new Date()
retryState.map { rs =>
  val newRetries = rs.retries + 1
  // if a node is draining, the driver should be relaunched without backoff
  if (isNodeDraining(status)) {
    new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
  } else {
    new MesosClusterRetryState(status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
  }
}.getOrElse {
  // this is the first retry which should happen without backoff
  new MesosClusterRetryState(status, 1, DateUtils.addSeconds(now, 1), 1)
}
akirillov
left a comment
Thanks, @rpalaznik. LGTM 👍 Let's get the CI green before merging, and please check the comment regarding scalastyle.
if (isNodeDraining(status)) {
  new MesosClusterRetryState(status, newRetries, now, rs.waitTime)
} else {
  new MesosClusterRetryState(status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
One last thing: this line will cause the scalastyle check to fail when we upstream this change (line length > 100). This should pass:
new MesosClusterRetryState(
  status, newRetries, DateUtils.addSeconds(now, rs.waitTime), rs.waitTime * 2)
DCOS-57560: Suppress/revive support for Mesos
- Suppresses Mesos offers when the executor cap is reached (dynamic allocation enabled)
- Revives Mesos offers when the executor cap is increased (dynamic allocation enabled)
- Suppresses Mesos offers when the max core number is utilized
- Doesn't revive if a task finished successfully
- Periodically revives Mesos offers if needed using "spark.mesos.scheduler.revive.interval"
* [DCOS-54813] Base tech update from 2.4.0 to 2.4.3 (#62)
* [DCOS-52207][Spark] Make Mesos Agent Blacklisting behavior configurable and more tolerant of failures. (#63)
* [DCOS-58386] Node draining support for supervised drivers; Mesos Java bump to 1.9.0 (#65)
* [DCOS-58389] Role propagation and enforcement support for Mesos Dispatcher (#66)
* DCOS-57560: Suppress/revive support for Mesos (#67)
* D2IQ-64778: Unregister progress listeners before stopping executor threads.
* [SPARK-32675][MESOS] --py-files option is appended without passing value for it

### What changes were proposed in this pull request?
The PR checks for the emptiness of `--py-files` value and uses it only if it is not empty.

### Why are the changes needed?
There is a bug in Mesos cluster mode REST Submission API. It is using `--py-files` option without specifying any value for the conf `spark.submit.pyFiles` by the user.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
* Submitting an application to a Mesos cluster:
`curl -X POST http://localhost:7077/v1/submissions/create --header "Content-Type:application/json" --data '{ "action": "CreateSubmissionRequest", "appResource": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar", "clientSparkVersion": "3.0.0", "appArgs": ["30"], "environmentVariables": {}, "mainClass": "org.apache.spark.examples.SparkPi", "sparkProperties": { "spark.jars": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar", "spark.driver.supervise": "false", "spark.executor.memory": "512m", "spark.driver.memory": "512m", "spark.submit.deployMode": "cluster", "spark.app.name": "SparkPi", "spark.master": "mesos://localhost:5050" }}'`
* It should be able to pick the correct class and run the job successfully.

Closes apache#29499 from farhan5900/SPARK-32675.
Authored-by: farhan5900 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 8749f2e)

* Applied all the remaining custom commits
* Removes duplicate org.scala-lang.modules dependency
* Removes duplicate gpus method
* Adds second parameter of deprecated annotation
* Fix typemismatch
* Removes extra shellEscape function calls
* Adds env variable for file based auth
* Remove generic shell escape and put it in specific places
* fix ambiguous import
* fix shell escaping for `--conf` options
* fix option type in test
* Fixes scalastyle and unit test issues

Co-authored-by: Alexander Lembiewski <[email protected]>
Co-authored-by: Farhan <[email protected]>
Co-authored-by: Anton Kirillov <[email protected]>
Co-authored-by: Anton Kirillov <[email protected]>
Co-authored-by: Roman Palaznik <[email protected]>
Co-authored-by: Roman Palaznik <[email protected]>
What changes were proposed in this pull request?
This PR adds support in Spark drivers for suppressing Mesos resource offers when the maximum number of executors has been started. Currently, the offers are declined with a configurable delay (2 minutes by default).
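For context, "declined with a configurable delay" refers to passing a refusal filter when declining offers. The sketch below shows roughly how that looks with the Mesos Java API; the helper name is hypothetical, and the 120-second value is the default quoted above.

import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{Filters, OfferID}

// Sketch only: declining an offer with refuse_seconds asks Mesos not to re-offer
// those resources for a while; suppression (this PR) stops offers entirely.
def declineWithDelay(driver: SchedulerDriver, offerId: OfferID, refuseSeconds: Double = 120.0): Unit = {
  val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds).build()
  driver.declineOffer(offerId, filters)
}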
How was this patch tested?
Integration tests will be added to mesosphere/spark-build later in a separate PR
Release notes
Added support for resource offer suppression when run in a Mesos cluster