@@ -1121,7 +1121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1133,9 +1133,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
-    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
-    // in between each attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three active task sets for one stage. (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
    val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1152,51 +1152,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
-    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
-    // attempt exists in taskScheduler by now.
-
-    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
-    // This is possible since the behaviour of submitting new attempt and handling successful task
-    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
-    // separately.
-    (0 until 2).foreach { i =>
-      completeTaskSuccessfully(zombieAttempts(i), i + 1)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
-    }
 
-    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
-    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
-    // already completed tasks. And this time with insufficient resources so not all tasks are
-    // active.
+    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
-    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
-    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
-    // any duplicate tasks later (SPARK-25250).
-    (0 until 2).map(_ + 1).foreach { partitionId =>
-      val index = finalTsm.partitionToIndex(partitionId)
-      assert(finalTsm.successful(index))
-    }
-
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We continually simulate late completions from our zombie tasksets(but this time, there's one
-    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
-    // final attempt. This means we're only waiting on the tasks we've already launched.
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt. This means we're only waiting on the tasks we've already
+    // launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1244,7 +1223,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
     }
-    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
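As context for the behavior this test pins down, here is a minimal, self-contained Scala sketch of the invariant: a completion reported by a task from a zombie (superseded) attempt still marks that partition finished in the non-zombie attempt for the same stage, so the active attempt never relaunches it. The Attempt and Scheduler classes below are hypothetical stand-ins, not Spark's TaskSetManager or TaskSchedulerImpl API.

// Hypothetical stand-in for TaskSetManager: tracks per-partition completion and zombie status.
final class Attempt(val stageId: Int, val attemptId: Int, numPartitions: Int) {
  var isZombie: Boolean = false
  private val done = Array.fill(numPartitions)(false)
  def markPartitionCompleted(p: Int): Unit = done(p) = true
  def pendingPartitions: Seq[Int] = done.indices.filterNot(i => done(i))
}

// Hypothetical stand-in for the scheduler: routes a completion to every attempt of the stage.
final class Scheduler {
  private var attempts = Map.empty[Int, List[Attempt]].withDefaultValue(Nil)

  def submit(a: Attempt): Unit = {
    // A newly submitted attempt supersedes the earlier ones, which become zombies.
    attempts(a.stageId).foreach(_.isZombie = true)
    attempts += a.stageId -> (a :: attempts(a.stageId))
  }

  // The key behavior under test: a success reported by a zombie attempt's task still
  // updates the non-zombie attempt for the same stage.
  def taskSucceeded(stageId: Int, partition: Int): Unit =
    attempts(stageId).foreach(_.markPartitionCompleted(partition))
}

object ZombieCompletionSketch {
  def main(args: Array[String]): Unit = {
    val sched = new Scheduler
    val zombie = new Attempt(stageId = 0, attemptId = 0, numPartitions = 10)
    sched.submit(zombie)
    val active = new Attempt(stageId = 0, attemptId = 1, numPartitions = 10)
    sched.submit(active) // zombie.isZombie is now true

    sched.taskSucceeded(stageId = 0, partition = 3) // late completion from the zombie's task
    assert(zombie.isZombie && !active.isZombie)
    assert(!active.pendingPartitions.contains(3)) // the active attempt won't relaunch partition 3
    println(s"active attempt still pending partitions: ${active.pendingPartitions}")
  }
}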