-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-26634]Do not allow task of FetchFailureStage commit in OutputCommitCoordinator #23563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
| private case class StageState(numPartitions: Int) { | ||
| val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null) | ||
| val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]() | ||
| var latestStageAttempt: Int = -1 | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -114,13 +115,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
| * yet been initialized. | ||
| * | ||
| * @param stage the stage id. | ||
| * @param stageAttemptNumber the stage attempt number. | ||
| * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. | ||
| * the maximum possible value of `context.partitionId`). | ||
| */ | ||
| private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized { | ||
| private[scheduler] def stageStart( | ||
| stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized { | ||
| stageStates.get(stage) match { | ||
| case Some(state) => | ||
| require(state.authorizedCommitters.length == maxPartitionId + 1) | ||
| state.latestStageAttempt = stageAttemptNumber | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need to check if current |
||
| logInfo(s"Reusing state from previous attempt of stage $stage.") | ||
|
|
||
| case _ => | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is better to assign |
||
|
|
@@ -177,9 +181,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
| partition: Int, | ||
| attemptNumber: Int): Boolean = synchronized { | ||
| stageStates.get(stage) match { | ||
| case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => | ||
| case Some(state) if attemptOutdatedOrFailed(state, stageAttempt, partition, attemptNumber) => | ||
| logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + | ||
| s"task attempt $attemptNumber already marked as failed.") | ||
| s"task attempt $attemptNumber already outdated or marked as failed.") | ||
| false | ||
| case Some(state) => | ||
| val existing = state.authorizedCommitters(partition) | ||
|
|
@@ -200,13 +204,14 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
| } | ||
| } | ||
|
|
||
| private def attemptFailed( | ||
| private def attemptOutdatedOrFailed( | ||
| stageState: StageState, | ||
| stageAttempt: Int, | ||
| partition: Int, | ||
| attempt: Int): Boolean = synchronized { | ||
| val failInfo = TaskIdentifier(stageAttempt, attempt) | ||
| stageState.failures.get(partition).exists(_.contains(failInfo)) | ||
| stageAttempt < stageState.latestStageAttempt || | ||
| stageState.failures.get(partition).exists(_.contains(failInfo)) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: one parameter per line, double indented