
Commit 60e0868

[DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)
This commit fixes a bug where `--supervised` drivers would be relaunched after receiving an outdated status update from a restarted or crashed agent, even if they had already been relaunched and were running elsewhere. In that scenario, the previous logic would leave two identical jobs running, while ZK state kept a record of only the latest one, effectively orphaning the first job.
1 parent 86a0075 commit 60e0868
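
To make the failure mode concrete, below is a hypothetical mini-simulation of the scenario described above; it is not the scheduler's real API (the actual change is in MesosClusterScheduler.scala in the diff below). The Driver case class, the id literals, and the guardEnabled flag are illustrative stand-ins; only the idea of comparing the incoming task id against the currently tracked one reflects the fix.

object SupervisedRetryScenario {
  // Hypothetical stand-in for the per-submission record the scheduler keeps in ZK.
  final case class Driver(submissionId: String, taskId: String)

  def main(args: Array[String]): Unit = {
    // Initial launch of supervised driver "sub1" on agent s1.
    var zkState = Map("sub1" -> Driver("sub1", "driver-sub1"))
    var runningTasks = Set("driver-sub1")

    // Agent s1 is lost: the supervised driver is relaunched on agent s2.
    runningTasks -= "driver-sub1"
    zkState += ("sub1" -> Driver("sub1", "driver-sub1-retry-1"))
    runningTasks += "driver-sub1-retry-1"

    // Agent s1 re-registers and replays an outdated TASK_FAILED for the old task id.
    val outdatedTaskId = "driver-sub1"
    val guardEnabled = true // behaviour after this commit; set to false for the old behaviour
    val isOutdated = outdatedTaskId != zkState("sub1").taskId

    if (!guardEnabled || !isOutdated) {
      // Old behaviour: a second retry is launched even though retry-1 is still running,
      // and the ZK record now points at retry-2, orphaning retry-1.
      zkState += ("sub1" -> Driver("sub1", "driver-sub1-retry-2"))
      runningTasks += "driver-sub1-retry-2"
    }

    println(s"running tasks: $runningTasks")                  // with the guard: Set(driver-sub1-retry-1)
    println(s"ZK record for sub1: ${zkState("sub1").taskId}") // with the guard: driver-sub1-retry-1
  }
}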

File tree: 4 files changed, +95 / -4 lines


core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 2 additions & 2 deletions
@@ -523,8 +523,8 @@ private[spark] class SecurityManager(
   }
 
   /**
-   * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
-   */
+   * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
+   */
   def getFileBasedSecret(): Option[String] = {
     sparkConf
       .getOption(SPARK_AUTH_SECRET_CONF)

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 14 additions & 1 deletion
@@ -346,7 +346,7 @@ private[spark] class MesosClusterScheduler(
       this.masterInfo = Some(masterInfo)
       this.schedulerDriver = driver
 
-      if (!pendingRecover.isEmpty) {
+      if (pendingRecover.nonEmpty) {
         // Start task reconciliation if we need to recover.
         val statuses = pendingRecover.collect {
           case (taskId, slaveId) =>
@@ -766,6 +766,10 @@ private[spark] class MesosClusterScheduler(
       val state = launchedDrivers(subId)
       // Check if the driver is supervise enabled and can be relaunched.
      if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+        if (taskIsOutdated(taskId, state)) {
+          // Prevent outdated task from overwriting a more recent status
+          return
+        }
         removeFromLaunchedDrivers(subId)
         state.finishDate = Some(new Date())
         val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -788,6 +792,15 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
+  /**
+   * Check if the task has already been launched or is pending
+   * If neither, the taskId is outdated and should be ignored
+   * This is to avoid scenarios where an outdated status update arrives
+   * after a supervised driver has already been relaunched
+   */
+  private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+    taskId != state.taskId.getValue && !pendingRetryDrivers.contains(state.driverDescription)
+
   private def retireDriver(
       submissionId: String,
       state: MesosClusterSubmissionState) = {
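
For a quick sanity check of the predicate above, here is a self-contained sketch of its behaviour in the scenario from the commit message. SubmissionState and the id literals are hypothetical stand-ins for MesosClusterSubmissionState and the real Mesos task ids (the new test below shows that a relaunched driver's task id ends in "-retry-1"); only the boolean expression mirrors taskIsOutdated.

object TaskIsOutdatedSketch {
  // Stand-in for MesosClusterSubmissionState: all we need is the currently
  // tracked task id and the driver description it belongs to.
  final case class SubmissionState(currentTaskId: String, driverDesc: String)

  // Mirrors the predicate added in this commit: a status update is outdated when
  // its task id is not the one currently tracked for the submission AND the
  // driver is not sitting in the pending-retry queue.
  def taskIsOutdated(
      incomingTaskId: String,
      state: SubmissionState,
      pendingRetryDrivers: Set[String]): Boolean =
    incomingTaskId != state.currentTaskId && !pendingRetryDrivers.contains(state.driverDesc)

  def main(args: Array[String]): Unit = {
    // After the relaunch, the scheduler tracks the retried task id.
    val state = SubmissionState(currentTaskId = "driver-sub1-retry-1", driverDesc = "sub1")

    // A late TASK_FAILED from the first agent still carries the original task id:
    // it is outdated and must be ignored (no second relaunch).
    println(taskIsOutdated("driver-sub1", state, pendingRetryDrivers = Set.empty))         // true

    // A status update for the task the scheduler actually launched is not outdated.
    println(taskIsOutdated("driver-sub1-retry-1", state, pendingRetryDrivers = Set.empty)) // false
  }
}

In other words, a status update is acted on only if it refers to the task the scheduler currently tracks, or if the driver is still queued for retry; anything else is treated as stale and dropped before the relaunch path runs.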

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, UUID, List => JList}
+import java.util.{Collections, List => JList, UUID}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 78 additions & 0 deletions
@@ -421,6 +421,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("does not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent lost with status with TASK_LOST
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.launchedDrivers.isEmpty)
+
+    // Offer new resource to retry driver on a new agent
+    Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+    scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+    val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent2)
+      .setState(MesosTaskState.TASK_RUNNING)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+
+    // Agent1 comes back online and sends status update: TASK_FAILED
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setState(MesosTaskState.TASK_FAILED)
+      .setMessage("Abnormal executor termination")
+      .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+    // Assert driver does not restart 2nd time
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+  }
+
   test("Declines offer with refuse seconds = 120.") {
     setScheduler()
 
