@@ -1045,31 +1045,37 @@ class DAGScheduler(
10451045        stage.pendingTasks +=  task
10461046
10471047      case  FetchFailed (bmAddress, shuffleId, mapId, reduceId) => 
1048-         //  Mark the stage that the reducer was in as unrunnable
10491048        val  failedStage  =  stageIdToStage(task.stageId)
1050-         markStageAsFinished(failedStage, Some (" Fetch failure"  ))
1051-         runningStages -=  failedStage
1052-         //  TODO: Cancel running tasks in the stage
1053-         logInfo(" Marking "   +  failedStage +  "  ("   +  failedStage.name + 
1054-           " ) for resubmision due to a fetch failure"  )
1055-         //  Mark the map whose fetch failed as broken in the map stage
1056-         val  mapStage  =  shuffleToMapStage(shuffleId)
1057-         if  (mapId !=  - 1 ) {
1058-           mapStage.removeOutputLoc(mapId, bmAddress)
1059-           mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1060-         }
1061-         logInfo(" The failed fetch was from "   +  mapStage +  "  ("   +  mapStage.name + 
1062-           " ); marking it for resubmission"  )
1063-         if  (failedStages.isEmpty &&  eventProcessActor !=  null ) {
1064-           //  Don't schedule an event to resubmit failed stages if failed isn't empty, because
1065-           //  in that case the event will already have been scheduled. eventProcessActor may be
1066-           //  null during unit tests.
1067-           import  env .actorSystem .dispatcher 
1068-           env.actorSystem.scheduler.scheduleOnce(
1069-             RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1049+         //  It is likely that we receive multiple FetchFailed for a single stage (because we have
1050+         //  multiple tasks running concurrently on different executors). In that case, it is possible
1051+         //  the fetch failure has already been handled by the executor.
1052+         if  (runningStages.contains(failedStage)) {
1053+           markStageAsFinished(failedStage, Some (" Fetch failure"  ))
1054+           runningStages -=  failedStage
1055+           //  TODO: Cancel running tasks in the stage
1056+           logInfo(" Marking "   +  failedStage +  "  ("   +  failedStage.name + 
1057+             " ) for resubmision due to a fetch failure"  )
1058+ 
1059+           //  Mark the map whose fetch failed as broken in the map stage
1060+           val  mapStage  =  shuffleToMapStage(shuffleId)
1061+           if  (mapId !=  - 1 ) {
1062+             mapStage.removeOutputLoc(mapId, bmAddress)
1063+             mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1064+           }
1065+ 
1066+           logInfo(" The failed fetch was from "   +  mapStage +  "  ("   +  mapStage.name + 
1067+             " ); marking it for resubmission"  )
1068+           if  (failedStages.isEmpty &&  eventProcessActor !=  null ) {
1069+             //  Don't schedule an event to resubmit failed stages if failed isn't empty, because
1070+             //  in that case the event will already have been scheduled. eventProcessActor may be
1071+             //  null during unit tests.
1072+             import  env .actorSystem .dispatcher 
1073+             env.actorSystem.scheduler.scheduleOnce(
1074+               RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1075+           }
1076+           failedStages +=  failedStage
1077+           failedStages +=  mapStage
10701078        }
1071-         failedStages +=  failedStage
1072-         failedStages +=  mapStage
10731079        //  TODO: mark the executor as failed only if there were lots of fetch failures on it
10741080        if  (bmAddress !=  null ) {
10751081          handleExecutorLost(bmAddress.executorId, Some (task.epoch))
0 commit comments