@@ -1049,27 +1049,25 @@ class DAGScheduler(
10491049 val mapStage = shuffleToMapStage(shuffleId)
10501050 // It is likely that we receive multiple FetchFailed for a single stage (because we have
10511051 // multiple tasks running concurrently on different executors). In that case, it is possible
1052- // the fetch failure has already been handled by the executor .
1052+ // the fetch failure has already been handled by the scheduler .
10531053 if (runningStages.contains(failedStage)) {
10541054 markStageAsFinished(failedStage, Some (" Fetch failure" ))
10551055 runningStages -= failedStage
10561056 // TODO: Cancel running tasks in the stage
1057- logInfo(" Marking " + failedStage + " (" + failedStage.name +
1058- " ) for resubmision due to a fetch failure" )
1059-
1060- logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1061- " ); marking it for resubmission" )
1062- if (failedStages.isEmpty && eventProcessActor != null ) {
1063- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1064- // in that case the event will already have been scheduled. eventProcessActor may be
1065- // null during unit tests.
1066- import env .actorSystem .dispatcher
1067- env.actorSystem.scheduler.scheduleOnce(
1068- RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1069- }
1070- failedStages += failedStage
1071- failedStages += mapStage
1057+ logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1058+ s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
1059+ }
1060+
1061+ if (failedStages.isEmpty && eventProcessActor != null ) {
1062+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1063+ // in that case the event will already have been scheduled. eventProcessActor may be
1064+ // null during unit tests.
1065+ import env .actorSystem .dispatcher
1066+ env.actorSystem.scheduler.scheduleOnce(
1067+ RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
10721068 }
1069+ failedStages += failedStage
1070+ failedStages += mapStage
10731071
10741072 // Mark the map whose fetch failed as broken in the map stage
10751073 if (mapId != - 1 ) {
0 commit comments