
Conversation

@suyanNone (Contributor) commented Sep 28, 2015

We hit this problem in Spark 1.3.0, and I can also reproduce it on the latest version.

Description:

  1. A running ShuffleMapStage may have multiple TaskSets: one active TaskSet, plus multiple zombie TaskSets and multiple removed TaskSets.

  2. A running ShuffleMapStage is considered successful only when all of its partitions have been computed successfully, i.e. every task's MapStatus has been added into outputLocs.

  3. The MapStatuses of a running ShuffleMapStage may be produced by RemovedTaskSet1 / ZombieTaskSet1 / ZombieTaskSet2 / ... / ActiveTaskSetN, so there is a chance that some output is held only by a removed or zombie TaskSet.

  4. If an executor is lost, it can happen that the MapStatuses on that executor were produced only by a zombie TaskSet.

    In the current logic, the way a lost MapStatus is repaired is that each TaskSet re-runs the tasks that succeeded on the lost executor:
    they are re-added into the TaskSet's pendingTasks, and their partitions are re-added into the Stage's pendingPartitions.
    But this is useless if the lost MapStatus belongs only to a zombie/removed TaskSet: a zombie TaskSet will never schedule its pendingTasks again.

  5. A stage is resubmitted only when some task throws a FetchFailedException. But the lost executor may not invalidate any MapStatus of the parent stage of a running stage,
    while that running stage itself loses a MapStatus that belongs only to a zombie or removed TaskSet.
    So once all zombie TaskSets have finished their runningTasks and the active TaskSet has finished its pendingTasks, they are all removed by TaskSchedulerImpl, yet the running stage's pendingPartitions is still non-empty: the stage hangs.

Test case to show the problem:

 test("Resubmit stage while lost partition in ZombieTasksets or RemovedTaskSets") {
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
    val firstShuffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    submit(reduceRdd, Array(0))

    // things start out smoothly, stage 0 completes with no issues
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
    ))

    // then start running stage 1
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    // simulate a stage 1 resubmit; note that for stage 1.0
    // partition 0 already finished on hostD, so if we resubmit stage 1,
    // stage 1.1 only resubmits tasks for partitions 1 and 2
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
    scheduler.resubmitFailedStages()

    val stage1Resubmit1 = taskSets(2)
    assert(stage1Resubmit1.stageId == 1)
    assert(stage1Resubmit1.tasks.size == 2)

    // now exec-hostD is lost, so the output loc of stage 1 partition 0 will be lost.
    runEvent(ExecutorLost("exec-hostD"))
    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))

    // let stage1Resubmit1 complete
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
    ))

    // and let taskSet 1.0's remaining running tasks complete
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    runEvent(makeCompletionEvent(
      taskSets(1).tasks(2),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    // Now all running TaskSets for stage 1 have completed their tasks,
    // but partition 0 is still pending (its only output lived on the lost hostD
    // and belonged only to the zombie TaskSet 1.0), so the stage hangs.
    assert(scheduler.runningStages.head.pendingPartitions.head == 0)
  }

Main changes:

  1. Make DAGScheduler accept Resubmitted task events only from active TaskSets, so that it can compare pendingPartitions with the ShuffleMapStage's missing outputs to detect partitions that cannot be computed by the current TaskSets, and decide whether the ShuffleMapStage needs to be resubmitted (see the sketch below).
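
To make the decision concrete, here is a minimal, self-contained sketch. This is not the actual patch (which lives inside DAGScheduler); every name below is made up for illustration. The idea: a stage must be resubmitted when some pending partition can no longer be computed by any active TaskSet.

    // Sketch only: all types and names here are illustrative, not Spark internals.
    object ResubmitDecisionSketch {
      /** Partitions the running ShuffleMapStage is still waiting for. */
      type PendingPartitions = Set[Int]
      /** Partitions that the currently active (non-zombie) TaskSets can still compute. */
      type ComputablePartitions = Set[Int]

      // If a pending partition's only successful task lived in a zombie/removed
      // TaskSet on a lost executor, no active TaskSet can produce it again, so the
      // stage can never finish on its own and must be resubmitted.
      def needsResubmit(pending: PendingPartitions, computable: ComputablePartitions): Boolean =
        (pending -- computable).nonEmpty

      def main(args: Array[String]): Unit = {
        println(needsResubmit(pending = Set(0), computable = Set(1, 2))) // true: resubmit
        println(needsResubmit(pending = Set(1), computable = Set(1, 2))) // false: keep waiting
      }
    }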

Other changes:

  1. Do not register a running stage's outputLocs when an executor is lost.
  2. Mark a stage's TaskSets as zombie once the stage is marked as finished.
  3. Ignore an expired task's partition output location.
  4. Make TaskSetManager not handle a failed task again when that task has already been marked as failed due to executor loss (sketched below).
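
For change 4, a minimal, self-contained sketch of the intended behaviour (the names are illustrative, not the real TaskSetManager fields): a later failure event for a task attempt that was already marked failed because its executor was lost should be ignored instead of being handled a second time.

    // Sketch only: illustrative types, not Spark's TaskSetManager.
    object SkipAlreadyFailedSketch {
      final case class TaskAttemptInfo(taskId: Long, alreadyFailedByExecutorLoss: Boolean)

      // Handle the failure only if it was not already accounted for when the executor was lost.
      def shouldHandleFailure(info: TaskAttemptInfo): Boolean =
        !info.alreadyFailedByExecutorLoss

      def main(args: Array[String]): Unit = {
        println(shouldHandleFailure(TaskAttemptInfo(1L, alreadyFailedByExecutorLoss = true)))  // false: ignore
        println(shouldHandleFailure(TaskAttemptInfo(2L, alreadyFailedByExecutorLoss = false))) // true: handle
      }
    }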

@suyanNone suyanNone closed this Sep 28, 2015
@suyanNone (Contributor Author)

I will run a test job on the latest code to confirm whether the problem still exists.

@suyanNone suyanNone reopened this Sep 28, 2015
@suyanNone (Contributor Author)

Reproduced it, so reopening.

@SparkQA commented Sep 28, 2015

Test build #43057 has finished for PR 8927 at commit 301da0a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone (Contributor Author)

jenkins retest this please

@SparkQA commented Sep 28, 2015

Test build #43059 has finished for PR 8927 at commit 301da0a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 28, 2015

Test build #43060 has finished for PR 8927 at commit ce83c9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone (Contributor Author)

jenkins retest this please

@SparkQA commented Apr 20, 2016

Test build #56344 has finished for PR 8927 at commit 1be6071.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 20, 2016

Test build #56345 has finished for PR 8927 at commit fb478bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone suyanNone changed the title [SPARK-10796][CORE]Resubmit stage while lost task in Zombie TaskSets [SPARK-10796][CORE]Resubmit stage while lost task in Zombie and removed TaskSetsAttempt Apr 22, 2016

outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
taskScheduler.zombieTasks(stage.id)
@suyanNone (Contributor Author):

Once a stage is finished, it should mark the previous TaskSets for that stage as zombie.
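
The idea behind taskScheduler.zombieTasks(stage.id) could look roughly like the self-contained sketch below. The types and fields are made up for illustration; the real change would live in TaskSchedulerImpl and TaskSetManager. Every TaskSetManager still registered for the finished stage is flipped to zombie so it stops launching new tasks.

    // Sketch only: FakeTaskSetManager stands in for Spark's TaskSetManager.
    object ZombieOnStageFinishSketch {
      final class FakeTaskSetManager(val stageId: Int) {
        var isZombie: Boolean = false
      }

      // Mark every TaskSetManager belonging to the finished stage as zombie.
      def zombieTasks(stageId: Int, managers: Seq[FakeTaskSetManager]): Unit =
        managers.filter(_.stageId == stageId).foreach(_.isZombie = true)

      def main(args: Array[String]): Unit = {
        val managers = Seq(new FakeTaskSetManager(1), new FakeTaskSetManager(1), new FakeTaskSetManager(2))
        zombieTasks(1, managers)
        println(managers.map(_.isZombie)) // List(true, true, false)
      }
    }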

@SparkQA commented Apr 22, 2016

Test build #56691 has finished for PR 8927 at commit 70af484.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Success,
makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 2)
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
@suyanNone (Contributor Author):

For a running stage, an executor loss will not register outputLocs in this PR.
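
One way to read this change, shown as a self-contained sketch with made-up names (this is not the PR's actual DAGScheduler code): when an executor is lost, shuffle map outputs are re-registered with the map output tracker only for stages that are not currently running, so a running stage's partially built output locations are left untouched.

    // Sketch only: FakeStage stands in for a ShuffleMapStage; the real logic would
    // live in the DAGScheduler's executor-lost handling.
    object SkipRunningStageSketch {
      final case class FakeStage(id: Int)

      // Re-register outputs only for stages that are not still running.
      def stagesToReRegister(shuffleStages: Seq[FakeStage], runningStageIds: Set[Int]): Seq[FakeStage] =
        shuffleStages.filterNot(stage => runningStageIds.contains(stage.id))

      def main(args: Array[String]): Unit = {
        val stages = Seq(FakeStage(0), FakeStage(1))
        println(stagesToReRegister(stages, runningStageIds = Set(1))) // List(FakeStage(0))
      }
    }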

@suyanNone (Contributor Author)

@squito @srowen

@SparkQA commented Apr 22, 2016

Test build #56692 has finished for PR 8927 at commit 259698e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 10, 2016

Test build #58235 has finished for PR 8927 at commit 743a1e6.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented May 10, 2016

Test build #58236 has finished for PR 8927 at commit 3bf1eaa.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

@suyanNone, I think the conflicts should at least be resolved so this is in a mergeable state. Would you be able to resolve them?

@gatorsmile (Member)

We are closing it due to inactivity. Please reopen it if you want to push it forward. Thanks!

@asfgit asfgit closed this in b32bd00 Jun 27, 2017