
Commit 1b4f06f

samvantran authored and Anton Kirillov committed
[DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent, even if they had already been relaunched and were running elsewhere. In those scenarios, the previous logic would leave two identical jobs running, and ZK state would only have a record of the latest one, effectively orphaning the first job.
1 parent a9d7bba commit 1b4f06f
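
The gist of the fix, as a minimal self-contained Scala sketch (not the Spark source; DriverState, handleStatusUpdate, and shouldRelaunch here are illustrative stand-ins): when a supervised driver is eligible for relaunch, a status update that still carries the task id of an older attempt is dropped instead of triggering a second relaunch. The actual change is in the MesosClusterScheduler.scala diff below.

// Hedged sketch, not the Spark source: all names below are illustrative.
object SupervisedRetrySketch {
  final case class DriverState(submissionId: String, currentTaskId: String, supervise: Boolean)

  // The only launched driver; it has already been relaunched once ("-retry-1").
  private val launched = Map("sub1" -> DriverState("sub1", "sub1-retry-1", supervise = true))
  private var relaunchCount = 0

  private def shouldRelaunch(taskState: String): Boolean =
    Set("TASK_LOST", "TASK_FAILED", "TASK_KILLED").contains(taskState)

  def handleStatusUpdate(submissionId: String, taskId: String, taskState: String): Unit =
    launched.get(submissionId).foreach { state =>
      if (state.supervise && shouldRelaunch(taskState)) {
        // The fix: only act on updates whose task id matches the latest attempt.
        if (taskId == state.currentTaskId) {
          relaunchCount += 1 // a real scheduler would enqueue the driver for retry here
        }
      }
    }

  def main(args: Array[String]): Unit = {
    // Late TASK_FAILED from the agent that ran the original attempt: dropped.
    handleStatusUpdate("sub1", taskId = "sub1", taskState = "TASK_FAILED")
    println(s"relaunches triggered: $relaunchCount") // 0
  }
}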

4 files changed (+95 -4 lines)

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 2 additions & 2 deletions

@@ -340,8 +340,8 @@ private[spark] class SecurityManager(
   }
 
   /**
-  * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
-  */
+   * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
+   */
   def getFileBasedSecret(): Option[String] = {
     sparkConf
       .getOption(SPARK_AUTH_SECRET_CONF)

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 14 additions & 1 deletion

@@ -345,7 +345,7 @@ private[spark] class MesosClusterScheduler(
     this.masterInfo = Some(masterInfo)
     this.schedulerDriver = driver
 
-    if (!pendingRecover.isEmpty) {
+    if (pendingRecover.nonEmpty) {
      // Start task reconciliation if we need to recover.
      val statuses = pendingRecover.collect {
        case (taskId, slaveId) =>
@@ -788,6 +788,10 @@ private[spark] class MesosClusterScheduler(
       val state = launchedDrivers(subId)
       // Check if the driver is supervise enabled and can be relaunched.
       if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+        if (taskIsOutdated(taskId, state)) {
+          // Prevent outdated task from overwriting a more recent status
+          return
+        }
         removeFromLaunchedDrivers(subId)
         state.finishDate = Some(new Date())
         val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -810,6 +814,15 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
+  /**
+   * Check if the task has already been launched or is pending
+   * If neither, the taskId is outdated and should be ignored
+   * This is to avoid scenarios where an outdated status update arrives
+   * after a supervised driver has already been relaunched
+   */
+  private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+    taskId != state.taskId.getValue && !pendingRetryDrivers.contains(state.driverDescription)
+
   private def retireDriver(
       submissionId: String,
       state: MesosClusterSubmissionState) = {
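
For illustration, a hedged, standalone sketch of how the new taskIsOutdated predicate behaves (SubmissionState and the plain string ids are stand-ins for MesosClusterSubmissionState and real Mesos task ids): a status update is treated as outdated when its task id no longer matches the latest launched attempt and the driver is not waiting in the pending-retry queue.

// Hedged illustration of the predicate above; SubmissionState stands in for
// MesosClusterSubmissionState and plain strings stand in for Mesos task ids.
object TaskOutdatedCheck {
  final case class SubmissionState(submissionId: String, currentTaskId: String)

  def taskIsOutdated(
      incomingTaskId: String,
      state: SubmissionState,
      pendingRetrySubmissions: Set[String]): Boolean =
    incomingTaskId != state.currentTaskId &&
      !pendingRetrySubmissions.contains(state.submissionId)

  def main(args: Array[String]): Unit = {
    // The driver was relaunched, so the scheduler now tracks task id "sub1-retry-1".
    val state = SubmissionState(submissionId = "sub1", currentTaskId = "sub1-retry-1")
    println(taskIsOutdated("sub1", state, Set.empty))         // true: stale update, ignored
    println(taskIsOutdated("sub1-retry-1", state, Set.empty)) // false: current attempt, processed
  }
}

With this guard in place, a late TASK_FAILED from the original agent is ignored rather than enqueuing a duplicate retry, which is exactly the scenario exercised by the new test below.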

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, UUID, List => JList}
+import java.util.{Collections, List => JList, UUID}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 78 additions & 0 deletions

@@ -421,6 +421,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("does not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent lost with status with TASK_LOST
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.launchedDrivers.isEmpty)
+
+    // Offer new resource to retry driver on a new agent
+    Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+    scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+    val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent2)
+      .setState(MesosTaskState.TASK_RUNNING)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+
+    // Agent1 comes back online and sends status update: TASK_FAILED
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setState(MesosTaskState.TASK_FAILED)
+      .setMessage("Abnormal executor termination")
+      .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+    // Assert driver does not restart 2nd time
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+  }
+
   test("Declines offer with refuse seconds = 120.") {
     setScheduler()