diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f6ade180ee25..74440c1620b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,10 +1101,12 @@ private[spark] class DAGScheduler(
     // event.
     stage match {
       case s: ShuffleMapStage =>
-        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+        outputCommitCoordinator.stageStart(
+          stage = s.id, stage.latestInfo.attemptNumber(), maxPartitionId = s.numPartitions - 1)
       case s: ResultStage =>
         outputCommitCoordinator.stageStart(
-          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
+          stage = s.id, stage.latestInfo.attemptNumber(),
+          maxPartitionId = s.rdd.partitions.length - 1)
     }
     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
       stage match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index b382d623806e..a6538df3972f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -58,6 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private case class StageState(numPartitions: Int) {
     val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
     val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
+    var latestStageAttempt: Int = -1
   }
 
   /**
@@ -114,13 +115,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * yet been initialized.
    *
    * @param stage the stage id.
+   * @param stageAttemptNumber the stage attempt number.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(
+      stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber
         logInfo(s"Reusing state from previous attempt of stage $stage.")
 
       case _ =>
@@ -177,9 +181,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: Int,
       attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+      case Some(state) if attemptOutdatedOrFailed(state, stageAttempt, partition, attemptNumber) =>
         logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
-          s"task attempt $attemptNumber already marked as failed.")
+          s"task attempt $attemptNumber outdated or already marked as failed.")
         false
       case Some(state) =>
         val existing = state.authorizedCommitters(partition)
@@ -200,13 +204,14 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }
 
-  private def attemptFailed(
+  private def attemptOutdatedOrFailed(
       stageState: StageState,
       stageAttempt: Int,
       partition: Int,
       attempt: Int): Boolean = synchronized {
     val failInfo = TaskIdentifier(stageAttempt, attempt)
-    stageState.failures.get(partition).exists(_.contains(failInfo))
+    stageAttempt < stageState.latestStageAttempt ||
+      stageState.failures.get(partition).exists(_.contains(failInfo))
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a560013dba96..a8641b8ab226 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -176,7 +176,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 2)
 
     assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
@@ -203,7 +203,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val stageAttempt: Int = 1
     val partition: Int = 1
     val failedAttempt: Int = 0
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = failedAttempt,
       reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -213,16 +213,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("SPARK-24589: Differentiate tasks from different stage attempts") {
     var stage = 1
+    val stageAttempt: Int = 1
     val taskAttempt = 1
     val partition = 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     assert(!outputCommitCoordinator.canCommit(stage, 2, partition,
       taskAttempt))
 
     // Fail the task in the first attempt, the task in the second attempt should succeed.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
     assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -231,7 +232,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
     // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -270,9 +271,26 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(retriedStage.size === 1)
     assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
     verify(sc.env.outputCommitCoordinator, times(2))
-      .stageStart(meq(retriedStage.head), any())
+      .stageStart(meq(retriedStage.head), any(), any())
     verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
   }
+
+  test("SPARK-26634: Do not allow attempts of failed stage to commit") {
+    val stage: Int = 1
+    var stageAttempt: Int = 1
+    val partition: Int = 1
+    val taskAttempt: Int = 0
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    stageAttempt += 1
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    // Tasks from an earlier, failed stage attempt are not authorized to commit.
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt - 1, partition, taskAttempt))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt - 1, partition,
+      attemptNumber = taskAttempt,
+      reason = Success)
+    // Tasks from the latest stage attempt are authorized to commit.
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, taskAttempt))
+  }
 }
 
 /**
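For reviewers who want to see the new rule in isolation: the sketch below is a hypothetical, self-contained mirror of the arbitration logic this patch changes, not Spark source (it drops RPC, locking, and the task-failure bookkeeping, and unlike the real stageStart it records the attempt number unconditionally rather than only when stage state is reused). It condenses the one behavioral change: once a newer stage attempt has started, canCommit denies every task from older attempts, even if no failure was ever recorded for them.

// Hypothetical standalone sketch of the patched commit-arbitration rule.
import scala.collection.mutable

case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

class MiniCoordinator {
  private case class StageState(numPartitions: Int) {
    val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
    var latestStageAttempt: Int = -1  // mirrors the field this patch adds
  }

  private val stageStates = mutable.Map[Int, StageState]()

  def stageStart(stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = {
    val state = stageStates.getOrElseUpdate(stage, StageState(maxPartitionId + 1))
    state.latestStageAttempt = stageAttemptNumber
  }

  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean =
    stageStates.get(stage) match {
      // The new guard: tasks from an outdated stage attempt are denied outright.
      case Some(state) if stageAttempt < state.latestStageAttempt => false
      case Some(state) if state.authorizedCommitters(partition) == null =>
        state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
        true
      case _ => false  // partition already claimed, or stage unknown
    }
}

object Demo extends App {
  val c = new MiniCoordinator
  c.stageStart(stage = 1, stageAttemptNumber = 0, maxPartitionId = 1)
  c.stageStart(stage = 1, stageAttemptNumber = 1, maxPartitionId = 1)  // retry after a fetch failure
  println(c.canCommit(1, 0, 0, 0))  // false: stage attempt 0 is outdated
  println(c.canCommit(1, 1, 0, 0))  // true: the latest attempt may claim the partition
}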