[SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host #25299
Conversation
...n/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlocksRemoved.java
Test build #108396 has finished for PR 25299 at commit
...ork-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
Test build #108403 has finished for PR 25299 at commit
Test build #108406 has finished for PR 25299 at commit
Test build #108407 has finished for PR 25299 at commit
I need a more careful read.
It really sucks how many changes are needed just to add a new metric... at some point I hope someone rewrites that code. :-/
In the meantime I see the benefit of the new metric, but some more tests are needed, especially because of the disk caching that the SHS does (see comments). In the worst case, there's a hammer we can use (increase the version of the disk store, which will cause the SHS to discard old cached data).
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
val readRecords: IndexedSeq[Double],
val remoteBlocksFetched: IndexedSeq[Double],
val localBlocksFetched: IndexedSeq[Double],
val hostLocalBlocksFetched: IndexedSeq[Double],
Hmmm.
I wonder what happens if:
- you run the current version of the SHS with the disk enabled
- look at an app's stage in the UI, which will cache this information
- shut down the SHS, and bring up the version with this change
- the data in the disk store doesn't have this field, so what will happen?
My feeling is that this field will be null, which may cause some problems.
I will test this. But didn't we have this problem already before this PR? The API already contained changes compared to Spark v2.4.3.
And I guess for the above test I should use an SHS from version 2.4 (for the first step), right?
It's possible, but we should still check and fix it if the problem exists.
Regarding this PR, the SHS disk caching problem is not relevant anymore since the metric changes are reverted, but I would still like to check whether the problem already exists between 2.4 and 3.0 and fix it if there is any issue.
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
does this work with external shuffle service?

Yes. The external shuffle service basically does the same: it reads the local disks of the host-local executors directly (it has its own mapping from app ID and executor ID tuples to local disk dirs). With this feature a regular executor will be able to do the same (here the mapping from executor to local disk is kept at the block manager master).
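For illustration, a minimal Scala sketch of the idea (the helper and its parameters are hypothetical; the hashing mirrors the scheme Spark's disk block manager uses to spread block files across sub-directories):

```scala
import java.io.File

// Hypothetical helper: given another executor's local dirs (obtained from the
// block manager master), locate a shuffle block file on this host and read it
// directly instead of fetching it over the network.
def hostLocalBlockFile(
    localDirs: Array[String],    // local dirs of the executor that wrote the block
    subDirsPerLocalDir: Int,     // e.g. spark.diskStore.subDirectories
    blockFileName: String): File = {
  // Same hash-based placement the disk block manager uses when writing files.
  val hash = math.abs(blockFileName.hashCode)
  val localDir = localDirs(hash % localDirs.length)
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDir, "%02x".format(subDirId)), blockFileName)
}
```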
Do you mean people can already avoid the unnecessary network if they turn on the external shuffle service?

No. To fetch a block from the external shuffle service the network must still be used.
Test build #108468 has finished for PR 25299 at commit
I mentioned this to Attila offline -- I actually do not see much value in adding the extra metric. It doesn't seem particularly actionable to the end user. I think the host-local blocks should just get counted as local blocks. But I don't feel so strongly about it, so if others think the metric is useful ...
Does this really need to be configurable? Wouldn't you want this always on? There is no issue with having a legacy shuffle service here, right? (comment inline about removing the only minor performance penalty I see)
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@tgravescs I think we talked about this a long time ago, and you thought spark already had this optimization in place, so you might be interested in this change
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Yes, we had talked about this and I looked briefly, but as mentioned in the description you have to have a way for each executor to know about the other executors' locations. @attilapiros I'm curious, did you run any performance numbers to see if it makes a difference?
@tgravescs Unfortunately I haven't run any, as I have no good performance tests for this case.

I'm not totally sold on it either - I can see some value, but maybe not enough to justify all the hassle around actually adding it...
I can revert the metric changes and count the host-local bytes and number of blocks as local ones. So should I go ahead and do the revert?
Sounds good to me.
Test build #108748 has finished for PR 25299 at commit
Force-pushed 7dded2c to 50bca42
Test build #108769 has finished for PR 25299 at commit
Test build #108896 has finished for PR 25299 at commit
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
Just FYI: I remember there was similar work on this improvement a few years ago: SPARK-6521 (#9478). We closed the ticket because we didn't get actual performance gains. Anyway, I added a link to it in the jira.
@maropu I think the performance gains would be significant if the block size is over "spark.maxRemoteBlockSizeFetchToMem" (whose default is 2GB): without this change such a shuffle block has to be fetched over the network (and, being over the limit, spilled to disk) during fetching, while with this change it would just be read from disk directly.
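For concreteness, a hedged sketch of the relevant settings (the exact keys and values are illustrative; the follow-up work later refers to the flag simply as `spark.shuffle.readHostLocalDisk`):

```scala
import org.apache.spark.SparkConf

// Illustrative settings only: enable host-local disk reads (name as used in
// this PR) alongside the external shuffle service, and keep the fetch-to-disk
// threshold at the value mentioned above.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.readHostLocalDisk.enabled", "true")
  .set("spark.maxRemoteBlockSizeFetchToMem", "2g")
```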
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
jenkins retest this please

jenkins retest this please
Test build #114310 has finished for PR 25299 at commit
I only skimmed this but overall it looks good; I like this approach. When fetching the host-local blocks do we have any limit on how many can happen concurrently? I didn't see any config for it on just skimming it. If we have a lot of other executors on the same node we end up bypassing the remote fetch configs as to the number in parallel, so now we are doing more work at once here, which could potentially cause more local disk activity on both the read and write sides.
@tgravescs The block content reading will occur at the client side of the
Ok, that makes sense, thanks. So it seems we fetch all the local blocks first and then the host-local blocks, and all of that happens in the current thread, while the remote block fetching happens in separate threads.
"Host local" blocks are basically local blocks after you fill in the local dirs cache. And "fetch" is a misnomer in this case; you'll get a pointer to the data (which mostly involves metadata operations like finding the file, but not actually opening it), and it will only be actually opened when the task starts running. So not really any waiting involved. Sounds like a gain to me - while before you'd have RPC + open on the remote side + transfer data over socket + cache it in memory in executor (or, worst, write it to disk again + open another file), now you only do an "open file". |
Definitely agree, sounds like a gain, and I've been wanting this for a while, so thanks @attilapiros for working on it. And you're right, I wasn't thinking about this not even opening the file, so it should be very fast.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Test build #114429 has finished for PR 25299 at commit
…cached dirs not one for each
Force-pushed f8e2967 to da93837
jenkins retest this please

test this please
The error is: Meanwhile this link is working: janino 3.0.15. Could it be that the local Maven repo on the Jenkins instance which runs the PR builder should be purged?
Test build #114464 has finished for PR 25299 at commit
Test build #114468 has finished for PR 25299 at commit
Test build #114470 has finished for PR 25299 at commit
Merging to master.
    .createWithDefault(false)

  private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
    ConfigBuilder("spark.shuffle.readHostLocalDisk.enabled")
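For context, a hedged sketch of how such a flag is typically defined in Spark's internal config package; the doc text and the default below are illustrative, not necessarily the patch's exact values:

```scala
import org.apache.spark.internal.config.ConfigBuilder

private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
  ConfigBuilder("spark.shuffle.readHostLocalDisk.enabled")
    .doc("If enabled, shuffle blocks requested from block managers running on the " +
      "same host are read directly from disk instead of being fetched over the network.")
    .booleanConf
    .createWithDefault(false)
```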
@attilapiros @cloud-fan @squito @tgravescs @vanzin,
I think we need to mention in the migration guide that this option must be disabled for an old shuffle service.
What about disabling this feature when spark.shuffle.useOldFetchProtocol is true (and updating the error message along with the corresponding docs; see below)?
As spark.shuffle.useOldFetchProtocol already exists and is documented in one of the migration guides (although I do not know why it is in sql-migration-guide.md).
https://github.com/apache/spark/blob/master/docs/sql-migration-guide.md:
Since Spark 3.0, we use a new protocol for fetching shuffle blocks, for external shuffle service users, we need to upgrade the server correspondingly. Otherwise, we'll get the error message UnsupportedOperationException: Unexpected message: FetchShuffleBlocks. If it is hard to upgrade the shuffle service right now, you can still use the old protocol by setting spark.shuffle.useOldFetchProtocol to true.
I have created a new jira for this: https://issues.apache.org/jira/browse/SPARK-30235
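A minimal sketch of the proposed guard (the helper name is hypothetical; the config keys are the ones discussed above):

```scala
import org.apache.spark.SparkConf

// Hypothetical check: only honour host-local disk reads when the new fetch
// protocol is in use, so an old external shuffle service is never bypassed.
def hostLocalDiskReadEnabled(conf: SparkConf): Boolean =
  conf.getBoolean("spark.shuffle.readHostLocalDisk.enabled", defaultValue = false) &&
    !conf.getBoolean("spark.shuffle.useOldFetchProtocol", defaultValue = false)
```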
The new PR is #26869.
…rnal shuffle service is disabled

### What changes were proposed in this pull request?

This PR adds support to read host-local shuffle data from disk directly when external shuffle service is disabled. Similar to #25299, we first try to get local disk directories for the shuffle data, which is located at the same host as the current executor. The only difference is, in #25299, it gets the directories from the external shuffle service, while in this PR, it gets the directories from the executors. To implement the feature, this PR extends the `HostLocalDirManager` for both `ExternalBlockStoreClient` and `NettyBlockTransferService`. Also, this PR adds `getHostLocalDirs` for `NettyBlockTransferService` as `ExternalBlockStoreClient` does, in order to send the get-dir request to the corresponding executor. And this PR reused the request message `GetLocalDirsForExecutors` for simplicity.

### Why are the changes needed?

After SPARK-27651 / #25299, Spark can read host-local shuffle data directly from disk when external shuffle service is enabled. To extend the feature, we can also support it when the external shuffle service is disabled.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, to use the host-local shuffle reading feature, users should not only enable `spark.shuffle.readHostLocalDisk` but also `spark.shuffle.service.enabled`. After this PR, enabling `spark.shuffle.readHostLocalDisk` should be enough, and external shuffle service is no longer a prerequisite.

### How was this patch tested?

Added test and tested manually.

Closes #28911 from Ngone51/support_node_local_shuffle.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…anager fetches shuffle blocks from the same host
[SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host
## What changes were proposed in this pull request?
Before this PR `ShuffleBlockFetcherIterator` was partitioning the block fetches into two distinct sets: local reads and remote fetches. Within this PR (when the feature is enabled by "spark.shuffle.readHostLocalDisk.enabled") a new category is introduced: host-local reads. They are shuffle block fetches where, although the block manager is different, it runs on the same host as the requester.
Moreover, to get the local directories of the other executors/block managers, a new RPC message is introduced, `GetLocalDirs`, which is sent to the block manager master and answered with `BlockManagerLocalDirs`. In `BlockManagerMasterEndpoint`, to answer this request the `localDirs` is extracted from the `BlockManagerInfo` and stored separately in a hash map called `executorIdLocalDirs`, because the previously used `blockManagerInfo` only contains data for the alive block managers (see `org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager`).
Now `executorIdLocalDirs` knows all the local dirs back to the application start (like the external shuffle service does), so in case of an RDD recalculation both host-local shuffle blocks and disk-persisted RDD blocks on the same host can be served by reading the files behind the blocks directly.
## How was this patch tested?
### Unit tests
`ExternalShuffleServiceSuite`:
- "SPARK-27651: host local disk reading avoids external shuffle service on the same node"
`ShuffleBlockFetcherIteratorSuite`:
- "successful 3 local reads + 4 host local reads + 2 remote reads"
And with extending existing suites where shuffle metrics was tested.
### Manual tests
Running Spark on YARN in a 4-node cluster with 6 executors and having 12 shuffle blocks.
```
$ grep host-local experiment.log
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_2_1, shuffle_0_6_1
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 38 ms
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_0_0, shuffle_0_8_0
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 35 ms
```
Closes apache#25299 from attilapiros/SPARK-27651.
Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
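To make the RPC round-trip described above concrete, a hedged sketch (names follow the description; all signatures and details are illustrative, not the actual Spark classes):

```scala
import scala.collection.mutable

// Illustrative message shapes, named after the description above.
case class GetLocalDirs(executorIds: Set[String])
case class BlockManagerLocalDirs(localDirs: Map[String, Array[String]])

class LocalDirsRegistrySketch {
  // Unlike blockManagerInfo, entries survive block manager removal, so
  // host-local shuffle blocks and disk-persisted RDD blocks can still be
  // served after an executor dies.
  private val executorIdLocalDirs = mutable.HashMap.empty[String, Array[String]]

  def registerBlockManager(executorId: String, localDirs: Array[String]): Unit =
    executorIdLocalDirs(executorId) = localDirs

  def getLocalDirs(request: GetLocalDirs): BlockManagerLocalDirs =
    BlockManagerLocalDirs(
      request.executorIds.flatMap(id => executorIdLocalDirs.get(id).map(id -> _)).toMap)
}
```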