Commit 743a1e6

refine case

1 parent c544356 commit 743a1e6

6 files changed: +143 additions, -18 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 39 additions & 5 deletions

@@ -1157,9 +1157,9 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
+            stage.pendingPartitions -= task.partitionId
             // Cast to ResultStage here because it's part of the ResultTask
             // TODO Refactor this out to a function that accepts a ResultStage
             val resultStage = stage.asInstanceOf[ResultStage]

@@ -1200,6 +1200,7 @@ class DAGScheduler(
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
+              stage.pendingPartitions -= task.partitionId
               shuffleStage.addOutputLoc(smt.partitionId, status)
             }

@@ -1339,19 +1340,51 @@ class DAGScheduler(
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)

+      val resubmitStages: HashSet[Int] = HashSet.empty
       if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleToMapStage) {
           stage.removeOutputsOnExecutor(execId)
-          mapOutputTracker.registerMapOutputs(
-            shuffleId,
-            stage.outputLocInMapOutputTrackerFormat(),
-            changeEpoch = true)
+          val locs = stage.outputLocInMapOutputTrackerFormat()
+          if (runningStages.contains(stage)) {
+            // Assumption: 1) this is not a FetchFailed-driven executor loss; 2) a running
+            // shuffle map stage can have several task sets: one active, some zombie, some
+            // already removed as finished. The lost executor may hold outputs produced only
+            // by the removed or zombie task sets, so check whether every missing output
+            // partition is still pending in the active task set; if not, mark the active
+            // task set as zombie and resubmit the stage.
+            if (!fetchFailed && stage.findMissingPartitions()
+              .exists(!stage.pendingPartitions.contains(_))) {
+              resubmitStages += stage.id
+            }
+            mapOutputTracker.incrementEpoch()
+          } else {
+            mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+          }
         }
+
         if (shuffleToMapStage.isEmpty) {
           mapOutputTracker.incrementEpoch()
         }
+
         clearCacheLocs()
+
+        if (!fetchFailed) {
+          // If failedStages is not empty, it implies that a ResubmitFailedStages
+          // event has already been scheduled.
+          if (failedStages.isEmpty) {
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          resubmitStages.foreach {
+            case stageId =>
+              val stage = stageIdToStage(stageId)
+              logWarning(s"Executor $execId caused stage $stageId to lose partitions; resubmitting")
+              markStageAsFinished(stage, Some(s"Executor $execId lost"))
+              failedStages += stage
+          }
+        }
       }
     } else {
       logDebug("Additional executor lost message for " + execId +

@@ -1416,6 +1449,7 @@ class DAGScheduler(

     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    taskScheduler.zombieTasks(stage.id)
     runningStages -= stage
   }
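The heart of the DAGScheduler change is the new resubmit predicate: after a non-fetch-failure executor loss, a running shuffle map stage needs a resubmit iff some partition whose output location was just lost is not in the active task set's pendingPartitions, i.e. it was produced by a zombie or removed task set, so the active attempt can never restore it. Below is a minimal, self-contained sketch of that predicate; StageState and needsResubmit are hypothetical stand-ins, not Spark's classes.

object ResubmitDecisionSketch {
  // Hypothetical stand-in for the bits of ShuffleMapStage state the check needs.
  final case class StageState(
      missingOutputPartitions: Set[Int], // partitions left with no map output location
      pendingPartitions: Set[Int])       // partitions the active task set will still run

  // Mirrors: !fetchFailed && stage.findMissingPartitions()
  //            .exists(!stage.pendingPartitions.contains(_))
  def needsResubmit(fetchFailed: Boolean, stage: StageState): Boolean =
    !fetchFailed &&
      stage.missingOutputPartitions.exists(p => !stage.pendingPartitions.contains(p))

  def main(args: Array[String]): Unit = {
    // Partition 1's output came from an earlier (now zombie) attempt and is gone,
    // while the active attempt only runs partitions 0 and 2: resubmit.
    assert(needsResubmit(fetchFailed = false, StageState(Set(1), Set(0, 2))))
    // Every missing partition is still pending in the active attempt: no resubmit.
    assert(!needsResubmit(fetchFailed = false, StageState(Set(0, 2), Set(0, 2))))
    // Fetch-failure-driven losses are handled by the existing failed-stage path.
    assert(!needsResubmit(fetchFailed = true, StageState(Set(1), Set(0, 2))))
    println("resubmit decision sketch OK")
  }
}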

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
Lines changed: 2 additions & 0 deletions

@@ -53,6 +53,8 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

+  def zombieTasks(stageId: Int): Unit
+
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
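Every TaskScheduler implementation now has to provide zombieTasks. Schedulers that do not track per-stage task sets (such as the test doubles later in this commit) can satisfy the contract with a no-op. A hedged sketch of the minimal surface, using a trimmed-down trait rather than Spark's full one:

trait TaskSchedulerSketch {
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def zombieTasks(stageId: Int): Unit // new in this commit
}

// A do-nothing implementation, in the spirit of the DAGSchedulerSuite doubles below.
class NoOpTaskScheduler extends TaskSchedulerSketch {
  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
  override def zombieTasks(stageId: Int): Unit = {} // no task sets to mark
}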

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
Lines changed: 11 additions & 0 deletions

@@ -224,6 +224,17 @@ private[spark] class TaskSchedulerImpl(
     }
   }

+  override def zombieTasks(stageId: Int): Unit = synchronized {
+    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
+      attempts.foreach { case (stageAttemptId, tsm) =>
+        if (!tsm.isZombie) {
+          logInfo(s"Mark stage($stageId) taskset ${tsm.taskSet.id} as Zombie")
+          tsm.isZombie = true
+        }
+      }
+    }
+  }
+
   /**
    * Called to indicate that all task attempts (including speculated tasks) associated with the
    * given TaskSetManager have completed, so state associated with the TaskSetManager should be
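In the real implementation, zombieTasks flips every task set attempt still registered for the stage to zombie, so it stops launching new tasks and is cleaned up once its running tasks drain. A self-contained sketch of that mechanic, with simplified stand-ins (TaskSetManagerSketch is hypothetical, not Spark's TaskSetManager):

import scala.collection.mutable

object ZombieTasksSketch {
  final class TaskSetManagerSketch(val id: String) {
    var isZombie: Boolean = false
  }

  // stageId -> (stageAttemptId -> manager), like taskSetsByStageIdAndAttempt.
  val taskSetsByStageIdAndAttempt =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, TaskSetManagerSketch]]

  def zombieTasks(stageId: Int): Unit = synchronized {
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) =>
        if (!tsm.isZombie) {
          println(s"Mark stage($stageId) taskset ${tsm.id} as Zombie")
          tsm.isZombie = true
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    taskSetsByStageIdAndAttempt(1) = mutable.HashMap(
      0 -> new TaskSetManagerSketch("1.0"),
      1 -> new TaskSetManagerSketch("1.1"))
    zombieTasks(1)
    assert(taskSetsByStageIdAndAttempt(1).values.forall(_.isZombie))
  }
}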

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Lines changed: 6 additions & 2 deletions

@@ -787,8 +787,12 @@ private[spark] class TaskSetManager(
       addPendingTask(index)
       // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
       // stage finishes when a total of tasks.size tasks finish.
-      sched.dagScheduler.taskEnded(
-        tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+      // Zombie task sets do not report Resubmitted, so that the DAGScheduler
+      // knows whether the lost partition can be re-run by the active task set.
+      if (!isZombie) {
+        sched.dagScheduler.taskEnded(
+          tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+      }
     }
   }

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Lines changed: 84 additions & 11 deletions

@@ -122,6 +122,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
+    override def zombieTasks(stageId: Int): Unit = {}
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}

@@ -480,6 +481,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       throw new UnsupportedOperationException
     }
+    override def zombieTasks(stageId: Int): Unit = {}
     override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
     override def defaultParallelism(): Int = 2
     override def executorHeartbeatReceived(

@@ -1272,13 +1274,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       Success,
       makeMapStatus("hostA", reduceRdd.partitions.length)))

-    // now that host goes down
-    runEvent(ExecutorLost("exec-hostA"))
-
     // so we resubmit those tasks
+    // note these resubmit events arrive earlier than the ExecutorLost event
     runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))

+    // now that host goes down
+    runEvent(ExecutorLost("exec-hostA"))
+
     // now complete everything on a different host
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),

@@ -1304,6 +1307,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(stage1TaskSet.stageAttemptId == 0)
   }

+  test("Resubmit stage while lost partition in ZombieTasksets or RemovedTaskSets") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // things start out smoothly, stage 0 completes with no issues
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+    ))
+
+    // then start running stage 1
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      Success,
+      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
+
+    // simulate a resubmit of stage 1; note that for attempt 1.0,
+    // partition 1 already finished on hostD, so if we resubmit stage 1,
+    // attempt 1.1 only resubmits tasks for partitions 0 and 2
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
+    scheduler.resubmitFailedStages()
+
+    val stage1Resubmit1 = taskSets(2)
+    assert(stage1Resubmit1.stageId == 1)
+    assert(stage1Resubmit1.tasks.size == 2)
+
+    // now exec-hostD is lost, so the output loc of stage 1 partition 1 is lost.
+    // runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
+    runEvent(ExecutorLost("exec-hostD"))
+    scheduler.resubmitFailedStages()
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    assert(taskSets(3).tasks.size == 3) // all of stage 1's partitions 0/1/2
+
+    // let stage1Resubmit1 complete
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
+    ))
+
+    // and complete the still-running tasks of attempt 1.0
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(2),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
+  }
+
   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *

@@ -1467,16 +1536,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
     runEvent(ExecutorLost("exec-hostA"))
+
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // Previously, pendingPartitions -= expiredTask.partitionId caused the stage
+    // to be resubmitted; now the expired task's partition is ignored.
     // have hostC complete the resubmitted task
-    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-    complete(taskSets(2), Seq((Success, 42)))
+    complete(taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
   }

@@ -1927,8 +2000,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
     assert(results.size === 0) // Map stage job should not be complete yet

+
     // Pretend host A was lost
     val oldEpoch = mapOutputTracker.getEpoch
+    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)

@@ -1941,20 +2016,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 0) // Map stage job should not be complete yet

-    // Now complete tasks in the second task set
-    val newTaskSet = taskSets(1)
-    assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA
-    runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
+    assert(scheduler.runningStages.head.pendingPartitions.size === 2) // Both tasks 0 and 1
+    runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 0) // Map stage job should not be complete yet
-    runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
+    runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 1) // Map stage job should now finally be complete
     assertDataStructuresEmpty()

     // Also test that a reduce stage using this shuffled data can immediately run
     val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
     results.clear()
     submit(reduceRDD, Array(0, 1))
-    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
+    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
     results.clear()
     assertDataStructuresEmpty()
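To make the taskSets(n) indices in the new zombie-task-set test above easier to follow, here is a hedged, plain-Scala restatement of its timeline; the data is illustrative only, and no Spark classes are involved:

object ZombieResubmitTimelineSketch {
  final case class Step(taskSetIndex: Int, attempt: String, event: String)

  val timeline = Seq(
    Step(0, "0.0", "stage 0 runs and completes on hostB/hostB/hostA"),
    Step(1, "1.0", "stage 1 starts; one partition finishes on hostD"),
    Step(2, "1.1", "FetchFailed on attempt 1.0 -> resubmit with the 2 unfinished partitions"),
    Step(3, "1.2", "exec-hostD lost -> the output owned by zombie attempt 1.0 is gone, " +
      "so the stage is resubmitted with all 3 partitions"))

  def main(args: Array[String]): Unit =
    timeline.foreach(s => println(s"taskSets(${s.taskSetIndex}) = attempt ${s.attempt}: ${s.event}"))
}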

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
Lines changed: 1 addition & 0 deletions

@@ -61,6 +61,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def zombieTasks(stage: Int): Unit = {}
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
