Conversation

@umehrot2

@umehrot2 umehrot2 commented Mar 28, 2017

What changes were proposed in this pull request?

Spark's DAGScheduler currently does not recompute all the lost shuffle blocks on a host when a FetchFailed exception occurs while fetching shuffle blocks with the external shuffle service enabled. Instead, it only recomputes the shuffle blocks produced by the executor for which the FetchFailed exception occurred. This works fine for the internal shuffle scenario, where executors serve their own shuffle blocks, so only the blocks of the failed executor should be considered lost. However, when the external shuffle service is being used, a FetchFailed exception means that the shuffle service running on that host has become unavailable, which is sufficient to assume that all the shuffle blocks managed by the service on that host are lost. Recomputing only the shuffle blocks of the one executor for which the FetchFailed exception occurred is therefore not sufficient: since multiple executors may be running on that host, all the shuffle blocks managed by the service need to be recomputed.

Because not all the shuffle blocks (for all the executors on the host) are recomputed, subsequent attempts of the reduce stage fail as well: the newly scheduled tasks keep trying to fetch the missing shuffle blocks from their old location and keep throwing further FetchFailed exceptions. This ultimately causes the job to fail after the reduce stage has been retried 4 times.

The following changes are proposed to address this issue:

  1. In case of a FetchFailed exception when using the external shuffle service, mark all the shuffle outputs on the host as failed (since the failure of the external shuffle service affects every executor on that host).
  2. Recompute all the lost shuffle blocks on that host, instead of just those of the single failed executor (see the sketch below).
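
To make the proposed behavior concrete, here is a minimal, self-contained Scala model of the invalidation decision. It is only a sketch: `FetchFailureModel`, `outputsToRecompute`, and the simplified `BlockManagerId`/`MapStatus` case classes are illustrative stand-ins, not the actual DAGScheduler or MapOutputTracker code this patch modifies.

```scala
// Self-contained model of the proposed invalidation logic (not actual Spark code).
object FetchFailureModel {
  case class BlockManagerId(executorId: String, host: String)
  case class MapStatus(mapId: Int, location: BlockManagerId)

  /** Returns the map outputs that must be recomputed after a FetchFailed from
    * `failed`. With the external shuffle service, one unreachable service
    * invalidates every output on that host; with internal shuffle, only the
    * failed executor's own outputs are lost. */
  def outputsToRecompute(
      outputs: Seq[MapStatus],
      failed: BlockManagerId,
      externalShuffleService: Boolean): Seq[MapStatus] = {
    if (externalShuffleService) outputs.filter(_.location.host == failed.host)
    else outputs.filter(_.location.executorId == failed.executorId)
  }

  def main(args: Array[String]): Unit = {
    val exec1 = BlockManagerId("1", "hostA")
    val exec2 = BlockManagerId("2", "hostA") // second executor, same host
    val outputs = Seq(MapStatus(0, exec1), MapStatus(1, exec2))
    // External shuffle service: both outputs on hostA are considered lost.
    assert(outputsToRecompute(outputs, exec1, externalShuffleService = true).size == 2)
    // Internal shuffle: only executor 1's output is considered lost.
    assert(outputsToRecompute(outputs, exec1, externalShuffleService = false).size == 1)
  }
}
```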

How was this patch tested?

  1. Added a unit test for the change in functionality.
  2. Tested on a cluster running Spark on YARN with the external shuffle service enabled, by performing the following steps:
  • Start a word count job (a minimal version is sketched below) and wait for the map stage to complete
  • During the reduce stage, stop the external shuffle service on one host
  • Wait for a FetchFailed exception to occur while fetching shuffle blocks from that host
  • Check that in the reattempt of the map stage, Spark recomputes all the lost shuffle blocks for the host on which the shuffle service was stopped
  • Confirm that the job completes successfully, since the reduce stage finds all the shuffle blocks on the next reattempt
    @kayousterhout @mridulm @rxin
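
For reference, a minimal word-count job along the lines of the one used in the steps above. This is a sketch: the class name, app name, and input/output paths are placeholders, while `spark.shuffle.service.enabled` is the standard setting that routes shuffle fetches through the external shuffle service.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fetch-failure-repro")
      // Serve shuffle blocks through the external shuffle service
      // instead of through the executors themselves.
      .set("spark.shuffle.service.enabled", "true")
    val sc = new SparkContext(conf)

    // Map stage: tokenize and emit counts. The reduce stage then fetches
    // the shuffle blocks, which is where FetchFailed is provoked.
    sc.textFile("hdfs:///input/words.txt") // placeholder path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .saveAsTextFile("hdfs:///output/wordcount") // placeholder path

    sc.stop()
  }
}
```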

@kayousterhout
Contributor

Jenkins this is OK to test

@kayousterhout
Contributor

Have you seen #17088? I just glanced at this quickly, but I think it is a duplicate of that (SPARK-19753).

@umehrot2
Author

@kayousterhout Thanks for your response and for that link. It does seem like #17088 addresses the same issue as this PR.

However, I would like you all to review this PR as well, because I think it organizes the code for handling internal and external shuffle failures more clearly. It also removes a lot of the code duplication that is part of the other PR. Further, it adds an epoch check for the host (a sketch of that check follows).
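
For clarity, here is a self-contained sketch of the kind of host-level epoch check meant above. The class and method names are made up for illustration; the real DAGScheduler keeps a similar failedEpoch map keyed by executor ID, and the idea here is an analogous check keyed by host.

```scala
import scala.collection.mutable

// Illustrative only: tracks, per host, the epoch at which a fetch failure
// was last handled, so duplicate failures in the same epoch are ignored.
class HostFailureTracker {
  private val hostFailedEpoch = mutable.HashMap.empty[String, Long]

  /** Returns true if the failure reported for `host` at `failureEpoch` has
    * not been handled yet, in which case the host's shuffle outputs should
    * be unregistered; returns false for stale or duplicate reports. */
  def shouldUnregisterOutputs(host: String, failureEpoch: Long): Boolean = {
    if (!hostFailedEpoch.contains(host) || hostFailedEpoch(host) < failureEpoch) {
      hostFailedEpoch(host) = failureEpoch
      true
    } else {
      false
    }
  }
}
```

The first FetchFailed from a host triggers the invalidation; later failures for the same host in the same epoch are no-ops, avoiding repeated map-stage resubmissions.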

@umehrot2
Author

umehrot2 commented Apr 4, 2017

Jenkins test this please.

@kayousterhout
Contributor

Jenkins this is ok to test

@SparkQA

SparkQA commented Apr 4, 2017

Test build #75513 has finished for PR 17445 at commit 6642de3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@umehrot2
Author

@kayousterhout @mridulm @rxin @lins05 Can you take a look at this PR?

@umehrot2
Author

@kayousterhout @mridulm @rxin @lins05 @markhamstra @tgravescs @squito Can you take a look at this?

@tgravescs
Contributor

There is a large discussion about how to handle fetch failures going on in https://issues.apache.org/jira/browse/SPARK-20178. The fact that you got a fetch failure does not mean that all blocks are invalid or that the external shuffle service is totally down; it could very well be an intermittent problem. There was also a PR to make the number of stage attempts configurable, so you could increase that (example below).

If a lot of people are seeing this issue, the question is whether we need to do something shorter-term to handle it while we discuss SPARK-20178. Certainly, if we are seeing more actual job failures due to it, it would be better to invalidate all the output; the job may then run longer, but at least it doesn't fail.
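
For reference, raising the retry limit would look roughly like this. Note that the key `spark.stage.maxConsecutiveAttempts` and its default of 4 are my recollection of the configuration that work introduced, not something confirmed in this thread.

```scala
import org.apache.spark.SparkConf

object RetryConfig {
  // Allow more consecutive failed attempts of a stage before aborting the
  // job, buying time for intermittent shuffle-service problems to clear.
  val conf = new SparkConf()
    .set("spark.stage.maxConsecutiveAttempts", "8") // assumed default: 4
}
```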

@jiangxb1987
Contributor

Now that #18150 has been merged, maybe we can close this?

@srowen srowen mentioned this pull request Jun 25, 2017
@asfgit asfgit closed this in b32bd00 Jun 27, 2017