[SPARK-14293] Improve shuffle load balancing and minimize network data transmission. #12085
What changes were proposed in this pull request?
Based on the map output sizes and locations tracked by MapOutputTracker, we can achieve better load balancing for the reduce stage.
This patch proposes a strategy that sets preferred locations for each reduce task so that, first, each executor processes almost the same amount of intermediate data and, second, network data transmission is minimized. This is beneficial in the following cases:
1. The existing REDUCER_PREF_LOCS_FRACTION strategy places reduce tasks close to the largest map outputs. When the map outputs are skewed, the executors holding large map outputs can become hotspots. Our method avoids this case and minimizes network data transmission.
2. When a job has a large number of reduce tasks, it helps each executor process almost the same amount of data and keeps the load balanced.
The specific steps are as follows (a sketch is given after the list):
Step 1: For each reduce task, calculate the amount of data it will fetch and its distribution across nodes.
Step 2: Divide the tasks into n groups, where n is the number of nodes, so that the total data size of each group is nearly equal.
Step 3: For every (group, node) pair, determine how much of the group's data would be local if the group's tasks ran on that node. This yields an n × n matrix.
Step 4: Choose the largest value in the matrix to decide which group is allocated to which node. Mark the selected row and column so that neither that group nor that node can be chosen again, and repeat Step 4 until every group is assigned.
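Below is a minimal, self-contained Scala sketch of Steps 1–4. All names here (`ReduceTask`, `groupTasks`, `localityMatrix`, `assignGroups`) are illustrative and are not the classes introduced by this patch; plain maps stand in for the statistics that MapOutputTracker tracks, and the equal-size grouping in Step 2 is approximated with a simple greedy bin-packing heuristic since the exact grouping rule is not spelled out above.

```scala
object ShuffleGroupAssignmentSketch {

  // Step 1 input: for each reduce task, the bytes it would fetch from each node.
  final case class ReduceTask(id: Int, bytesByNode: Map[String, Long]) {
    def totalBytes: Long = bytesByNode.values.sum
  }

  // Step 2: split tasks into numNodes groups with roughly equal total bytes.
  // Greedy bin packing: sort by size descending, always add to the smallest group.
  def groupTasks(tasks: Seq[ReduceTask], numNodes: Int): Array[Seq[ReduceTask]] = {
    val groups = Array.fill(numNodes)(Seq.empty[ReduceTask])
    val groupSizes = Array.fill(numNodes)(0L)
    tasks.sortBy(-_.totalBytes).foreach { task =>
      val g = groupSizes.indices.minBy(i => groupSizes(i))
      groups(g) = groups(g) :+ task
      groupSizes(g) += task.totalBytes
    }
    groups
  }

  // Step 3: matrix(i)(j) = bytes that group i could read locally if run on node j.
  def localityMatrix(groups: Array[Seq[ReduceTask]], nodes: Seq[String]): Array[Array[Long]] =
    groups.map { group =>
      nodes.map(node => group.map(_.bytesByNode.getOrElse(node, 0L)).sum).toArray
    }

  // Step 4: repeatedly pick the largest remaining entry, assign that group to
  // that node, then exclude its row and column from further selection.
  def assignGroups(matrix: Array[Array[Long]], nodes: Seq[String]): Map[Int, String] = {
    val usedGroups = scala.collection.mutable.Set.empty[Int]
    val usedNodes = scala.collection.mutable.Set.empty[Int]
    val assignment = scala.collection.mutable.Map.empty[Int, String]
    while (usedGroups.size < matrix.length) {
      val candidates = for {
        i <- matrix.indices if !usedGroups(i)
        j <- nodes.indices if !usedNodes(j)
      } yield (i, j)
      val (gi, nj) = candidates.maxBy { case (i, j) => matrix(i)(j) }
      assignment(gi) = nodes(nj)
      usedGroups += gi
      usedNodes += nj
    }
    assignment.toMap
  }

  def main(args: Array[String]): Unit = {
    val nodes = Seq("host-a", "host-b")
    // Skewed example: task 0 reads mostly from host-a, tasks 1 and 3 from host-b.
    val tasks = Seq(
      ReduceTask(0, Map("host-a" -> 80L, "host-b" -> 20L)),
      ReduceTask(1, Map("host-a" -> 10L, "host-b" -> 60L)),
      ReduceTask(2, Map("host-a" -> 30L, "host-b" -> 30L)),
      ReduceTask(3, Map("host-a" -> 5L,  "host-b" -> 45L))
    )
    val groups = groupTasks(tasks, nodes.size)
    val matrix = localityMatrix(groups, nodes)
    assignGroups(matrix, nodes).foreach { case (g, node) =>
      println(s"group $g (tasks ${groups(g).map(_.id).mkString(",")}) -> $node")
    }
  }
}
```

On the small skewed example above, the two groups end up with nearly equal total bytes (150 vs. 130), and each group is placed on the node that already holds most of its input, which is the intended combination of load balancing and reduced network transfer.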
How was this patch tested?
Unit test suite
Author: Cheng Pei [email protected]