core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,10 +1101,12 @@ private[spark] class DAGScheduler(
     // event.
     stage match {
       case s: ShuffleMapStage =>
-        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+        outputCommitCoordinator.stageStart(
+          stage = s.id, stage.latestInfo.attemptNumber(), maxPartitionId = s.numPartitions - 1)
       case s: ResultStage =>
         outputCommitCoordinator.stageStart(
-          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
+          stage = s.id, stage.latestInfo.attemptNumber(),
+          maxPartitionId = s.rdd.partitions.length - 1)
     }
     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
       stage match {

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -58,6 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private case class StageState(numPartitions: Int) {
     val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
     val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
+    var latestStageAttempt: Int = -1
   }
 
   /**
@@ -114,13 +115,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * yet been initialized.
    *
    * @param stage the stage id.
+   * @param stageAttemptNumber the stage attempt number.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(
+      stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {

Review comment (Contributor): nit: one parameter per line, double indented
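
For reference, a sketch of the new signature with that convention applied (same names and types as in this diff, nothing added):

    private[scheduler] def stageStart(
        stage: Int,
        stageAttemptNumber: Int,
        maxPartitionId: Int): Unit = synchronized {
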
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber

Review comment (Member): Don't we need to check whether the current latestStageAttempt is less than stageAttemptNumber?
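
One shape that guard could take (a sketch of the reviewer's suggestion, not part of this PR; it keeps latestStageAttempt monotonic even if stageStart were ever invoked with an older attempt number):

        // Only move latestStageAttempt forward; never roll the coordinator's
        // view of the stage back to an earlier attempt.
        state.latestStageAttempt = math.max(state.latestStageAttempt, stageAttemptNumber)
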
         logInfo(s"Reusing state from previous attempt of stage $stage.")
 
       case _ =>

Review comment (Member): It is better to assign stageAttemptNumber to latestStageAttempt of the newly created StageState too.
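
A sketch of the `case _ =>` branch with that suggestion applied (assuming the surrounding stageStart method and the private StageState case class shown above; the assignment into stageStates assumes it is the coordinator's mutable map keyed by stage id):

      case _ =>
        val state = StageState(maxPartitionId + 1)
        // Record the attempt number on the fresh state too, so a task from an
        // earlier attempt of this stage can later be rejected as outdated.
        state.latestStageAttempt = stageAttemptNumber
        stageStates(stage) = state
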
@@ -177,9 +181,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: Int,
       attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+      case Some(state) if attemptOutdatedOrFailed(state, stageAttempt, partition, attemptNumber) =>
         logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
-          s"task attempt $attemptNumber already marked as failed.")
+          s"task attempt $attemptNumber already outdated or marked as failed.")
         false
       case Some(state) =>
         val existing = state.authorizedCommitters(partition)
@@ -200,13 +204,14 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }
 
-  private def attemptFailed(
+  private def attemptOutdatedOrFailed(
       stageState: StageState,
       stageAttempt: Int,
       partition: Int,
       attempt: Int): Boolean = synchronized {
     val failInfo = TaskIdentifier(stageAttempt, attempt)
-    stageState.failures.get(partition).exists(_.contains(failInfo))
+    stageAttempt < stageState.latestStageAttempt ||
+      stageState.failures.get(partition).exists(_.contains(failInfo))
   }
 }
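
Taken together, the effect of the new short-circuit can be sketched against the coordinator's API (illustrative values, driving the coordinator directly the way the test suite below does; the SPARK-26634 test exercises the same path):

    val coordinator = new OutputCommitCoordinator(new SparkConf, isDriver = true)
    coordinator.stageStart(stage = 1, stageAttemptNumber = 1, maxPartitionId = 0)
    // The stage fails and is retried, so stageStart runs again with attempt 2.
    coordinator.stageStart(stage = 1, stageAttemptNumber = 2, maxPartitionId = 0)
    // A straggler task from stage attempt 1 is now denied (1 < latestStageAttempt),
    // even though no failure was ever recorded for that task attempt.
    assert(!coordinator.canCommit(1, 1, 0, 0))
    // Tasks of the latest stage attempt can still be authorized.
    assert(coordinator.canCommit(1, 2, 0, 0))
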
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -176,7 +176,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 2)
 
     assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
@@ -203,7 +203,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val stageAttempt: Int = 1
     val partition: Int = 1
     val failedAttempt: Int = 0
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = failedAttempt,
       reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -213,16 +213,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("SPARK-24589: Differentiate tasks from different stage attempts") {
     var stage = 1
+    val stageAttempt: Int = 1
     val taskAttempt = 1
     val partition = 1
 
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     assert(!outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Fail the task in the first attempt, the task in the second attempt should succeed.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
     assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -231,7 +232,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
     // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -270,9 +271,26 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(retriedStage.size === 1)
     assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
     verify(sc.env.outputCommitCoordinator, times(2))
-      .stageStart(meq(retriedStage.head), any())
+      .stageStart(meq(retriedStage.head), any(), any())
     verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
   }
 
+  test("SPARK-26634: Do not allow attempts of failed stage to commit") {
+    val stage: Int = 1
+    var stageAttempt: Int = 1
+    val partition: Int = 1
+    val taskAttempt: Int = 0
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    stageAttempt += 1
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    // Task attempts from the failed (earlier) stage attempt are not authorized to commit.
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt - 1, partition, taskAttempt))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt - 1, partition,
+      attemptNumber = taskAttempt,
+      reason = Success)
+    // Task attempts from the latest stage attempt are authorized to commit.
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, taskAttempt))
+  }
+
 }
 
 /**