Skip to content

Commit ab8d13e

Browse files
author
jinxing
committed
fix
1 parent 46ef5a3 commit ab8d13e

File tree

1 file changed

+27
-9
lines changed

1 file changed

+27
-9
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2162,7 +2162,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
21622162
}
21632163

21642164
test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
2165-
" even with late completions from earlier stage attempts") {
2165+
" even with late completions from earlier stage attempts") {
2166+
// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
21662167
val rddA = new MyRDD(sc, 2, Nil)
21672168
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
21682169
val shuffleIdA = shuffleDepA.shuffleId
@@ -2174,39 +2175,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
21742175

21752176
submit(rddC, Array(0, 1))
21762177

2178+
// Complete both tasks in rddA.
21772179
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
21782180
complete(taskSets(0), Seq(
21792181
(Success, makeMapStatus("hostA", 2)),
21802182
(Success, makeMapStatus("hostA", 2))))
21812183

2182-
// Fetch failed on hostA.
2184+
// Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
2185+
// and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
2186+
assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
21832187
runEvent(makeCompletionEvent(
21842188
taskSets(1).tasks(0),
21852189
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
21862190
"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
2187-
null))
2191+
result = null))
21882192

2193+
// Both original tasks in rddA should be marked as failed, because they ran on the
2194+
// failed hostA, so both should be resubmitted. Complete them on hostB successfully.
21892195
scheduler.resubmitFailedStages()
2190-
2191-
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
2196+
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
2197+
&& taskSets(2).tasks.size === 2)
21922198
complete(taskSets(2), Seq(
21932199
(Success, makeMapStatus("hostB", 2)),
21942200
(Success, makeMapStatus("hostB", 2))))
21952201

2196-
// Task succeeds on a failed executor. The success is bogus.
2202+
// Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
2203+
// successfully. The success should be ignored because the task started before the
2204+
// executor failed, so the output may have been lost.
21972205
runEvent(makeCompletionEvent(
21982206
taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
21992207

2200-
assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
2208+
// Both tasks in rddB should be resubmitted, because none of them has succeeded truly.
2209+
// Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
2210+
// Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
2211+
// is still running.
2212+
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
2213+
&& taskSets(3).tasks.size === 2)
22012214
runEvent(makeCompletionEvent(
22022215
taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
22032216

2204-
// There should be no new attempt of stage submitted.
2217+
// There should be no new attempt of stage submitted,
2218+
// because task(stageId=1, stageAttemptId=1, partitionId=1) is still running in
2219+
// the current attempt (and hasn't completed successfully in any earlier attempts).
22052220
assert(taskSets.size === 4)
2221+
2222+
// Complete task(stageId=1, stageAttemptId=1, partitionId=1) successfully.
22062223
runEvent(makeCompletionEvent(
22072224
taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
22082225

2209-
// ResultStage submitted.
2226+
// Now the ResultStage should be submitted, because all of the tasks of rddB have
2227+
// completed successfully on alive executors.
22102228
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
22112229
complete(taskSets(4), Seq(
22122230
(Success, 1),

0 commit comments

Comments
 (0)