
Conversation

@attilapiros
Contributor

@attilapiros attilapiros commented Jul 30, 2019

What changes were proposed in this pull request?

Before this PR, ShuffleBlockFetcherIterator partitioned the block fetches into two distinct sets: local reads and remote fetches. With this PR (when the feature is enabled by "spark.shuffle.readHostLocalDisk.enabled"), a new category is introduced: host-local reads. These are shuffle block fetches where the block manager is different but runs on the same host as the requester.

Moreover, to get the local directories of the other executors/block managers, a new RPC message called GetLocalDirs is introduced; it is sent to the block manager master, which answers it with BlockManagerLocalDirs. In BlockManagerMasterEndpoint, to answer this request the localDirs is extracted from the BlockManagerInfo and stored separately in a hash map called executorIdLocalDirs, because the previously used blockManagerInfo only contains data for the alive block managers (see org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager).

Now executorIdLocalDirs knows all the local dirs registered since the application start (like the external shuffle service does), so in case of an RDD recalculation both host-local shuffle blocks and disk-persisted RDD blocks on the same host can be served by reading the files behind the blocks directly.
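
The following minimal Scala sketch (not the exact code of this PR) illustrates the new RPC pair and a simplified stand-in for the master-side executorIdLocalDirs bookkeeping. The message shapes follow those reported by the test builds below (the later Map-based variant); the LocalDirsRegistry class and its method names are illustrative only:

```scala
import scala.collection.mutable

// Message shapes as reported by the test builds in this thread; the merged code may differ.
case class GetLocalDirs(executorIds: Array[String])
case class BlockManagerLocalDirs(localDirs: Map[String, Array[String]])

// Simplified stand-in for the executorIdLocalDirs bookkeeping in BlockManagerMasterEndpoint.
class LocalDirsRegistry {
  // Filled on block manager registration and, unlike blockManagerInfo, never cleaned up
  // on removeBlockManager, so directories of already-removed executors stay resolvable.
  private val executorIdLocalDirs = mutable.HashMap.empty[String, Array[String]]

  def register(executorId: String, localDirs: Array[String]): Unit =
    executorIdLocalDirs(executorId) = localDirs

  def answer(msg: GetLocalDirs): BlockManagerLocalDirs =
    BlockManagerLocalDirs(
      msg.executorIds.flatMap(id => executorIdLocalDirs.get(id).map(id -> _)).toMap)
}
```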

How was this patch tested?

Unit tests

ExternalShuffleServiceSuite:

  • "SPARK-27651: host local disk reading avoids external shuffle service on the same node"

ShuffleBlockFetcherIteratorSuite:

  • "successful 3 local reads + 4 host local reads + 2 remote reads"

And by extending existing suites where shuffle metrics were tested.

Manual tests

Running Spark on YARN on a 4-node cluster with 6 executors and 12 shuffle blocks.

```
$ grep host-local experiment.log
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_2_1, shuffle_0_6_1
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 38 ms
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_0_0, shuffle_0_8_0
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 35 ms
```

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108396 has finished for PR 25299 at commit 7210742.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class GetLocalDirs(executorIds: Array[String]) extends ToBlockManagerMaster
  • case class BlockManagerLocalDirs(localDirs: Seq[Array[String]])

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108403 has finished for PR 25299 at commit d509278.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108406 has finished for PR 25299 at commit 894e772.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108407 has finished for PR 25299 at commit 17552ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment


I need a more careful read.

It really sucks how many changes are needed just to add a new metric... at some point I hope someone rewrites that code. :-/

In the meantime I see the benefit of the new metric, but some more tests are needed, especially because of the disk caching that the SHS does (see comments). In the worst case, there's a hammer we can use (increase the version of the disk store, which will cause the SHS to discard old cached data).

```scala
val readRecords: IndexedSeq[Double],
val remoteBlocksFetched: IndexedSeq[Double],
val localBlocksFetched: IndexedSeq[Double],
val hostLocalBlocksFetched: IndexedSeq[Double],
```
Contributor


Hmmm.

I wonder what happens if:

  • you run the current version of the SHS with the disk enabled
  • look at an app's stage in the UI, which will cache this information
  • shut down the SHS, and bring up the version with this change
  • the data in the disk store doesn't have this field, so what will happen?

My feeling is that this field will be null, which may cause some problems.

Contributor Author

@attilapiros attilapiros Jul 31, 2019


I will test this. But didn't we have this problem before this PR? The API already contained changes compared to Spark v2.4.3.

And I guess for the above test I should use a SHS from version 2.4 (for the first step), right?

Contributor


It's possible, but we should still check and fix it if the problem exists.

Contributor Author


Regarding this PR, the SHS disk caching problem is not relevant anymore as the metric changes have been reverted, but I would still like to check whether the problem already exists between 2.4 and 3.0 and fix it if there is any issue.

@cloud-fan
Contributor

does this work with external shuffle service?

@attilapiros
Contributor Author

Yes.

Basically, the external shuffle service does the same: it reads the local disks of the host-local executors directly (it has its own mapping from app ID and executor ID tuples to local dirs). With this feature a regular executor will be able to do the same (here the mapping from executor to local dirs lives at the block manager master).

@cloud-fan
Contributor

Do you mean people can already avoid the unnecessary network traffic if they turn on the external shuffle service?

@attilapiros
Contributor Author

attilapiros commented Jul 31, 2019

No. To fetch a block from the external shuffle service the network must still be used.
Here the executor can read the block from disk directly by itself, so fetching via the network (and writing the content to a file) is really avoided. What I meant is that the mechanism behind it (reading the disks of the other executors) is the very same.

@SparkQA

SparkQA commented Jul 31, 2019

Test build #108468 has finished for PR 25299 at commit 193a0eb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class BlockManagerLocalDirs(localDirs: Map[String, Array[String]])

Contributor

@squito squito left a comment


i mentioned this to attila offline -- I actually do not see much value in adding the extra metric. it doesn't seem particularly actionable to the end user. I think the host-local blocks should just get counted as local blocks. But, I don't feel so strongly about it, so if others think the metric is useful ...

does this really need to be configurable? wouldn't you want this always on? There is no issue w/ having a legacy shuffle service here, right? (comment inline about removing the only minor performance penalty I see)

@squito
Contributor

squito commented Aug 1, 2019

@tgravescs I think we talked about this a long time ago, and you thought spark already had this optimization in place, so you might be interested in this change

@tgravescs
Contributor

yes, we had talked about this and I looked briefly, but as mentioned in the description you have to have a way for each executor to know about the other executors' locations.

@attilapiros I'm curious did you run any performance numbers to see if it makes a difference?

@attilapiros
Contributor Author

@tgravescs unfortunately I haven't run any, as I have no good performance tests for this case.
But I am positive it makes a difference, especially for small clusters.

@vanzin
Contributor

vanzin commented Aug 2, 2019

I actually do not see much value in adding the extra metric

I'm not totally sold on it either - I can see some value, but maybe not enough to justify all the hassle around actually adding it...

@attilapiros
Contributor Author

I can revert the metric changes and count the host-local bytes and block counts into the local ones. So should I go ahead and do the revert?

@vanzin
Contributor

vanzin commented Aug 5, 2019

Sounds good to me.

@SparkQA

SparkQA commented Aug 7, 2019

Test build #108748 has finished for PR 25299 at commit 7dded2c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 7, 2019

Test build #108769 has finished for PR 25299 at commit 50bca42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 9, 2019

Test build #108896 has finished for PR 25299 at commit c51f107.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Aug 9, 2019

just FYI: I remember there was similar work for this improvement a few years ago: SPARK-6521 (#9478). We closed the ticket because we didn't get actual performance gains. Anyway, I added a link to it in the jira.

@attilapiros
Contributor Author

@maropu I think the performance gains would be significant if the block size is over "spark.maxRemoteBlockSizeFetchToMem" (whose default is 2GB). In that case, without this change, during fetching the shuffle block:

  1. would be read from disk
  2. sent via the network
  3. streamed to disk
  4. and finally re-read from disk when it is used.

With this change it would just be read from disk directly.
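
For illustration, a hedged configuration sketch of the scenario above; the key "spark.shuffle.readHostLocalDisk.enabled" is the one used in this PR (it is referred to later in this thread without the ".enabled" suffix), the external shuffle service line reflects that it was still a prerequisite at this point, and the 1g threshold is purely illustrative, not a recommendation:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")            // still a prerequisite at the time of this PR
  .set("spark.shuffle.readHostLocalDisk.enabled", "true")  // read host-local shuffle blocks from disk directly
  .set("spark.maxRemoteBlockSizeFetchToMem", "1g")         // remote blocks above this size are streamed to disk
```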

@attilapiros
Contributor Author

jenkins retest this please

@attilapiros
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Nov 22, 2019

Test build #114310 has finished for PR 25299 at commit 52037b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

I only skimmed this, but overall it looks good; I like this approach.

When fetching the host-local blocks, do we have any limit on how many can happen concurrently? I didn't see any config for it on just skimming. If we have a lot of other executors on the same node, we end up bypassing the remote fetch configs that limit the number of parallel fetches, so now we are doing more work at once here, which could potentially cause more local disk activity on both the read and write sides.

@attilapiros
Contributor Author

attilapiros commented Nov 25, 2019

@tgravescs The block content reading will occur at the client side of the ShuffleBlockFetcherIterator, as the blocks are wrapped into FileSegmentManagedBuffer.
Over and above, on one specific executor the host-local blocks with cached executor directories are handled synchronously on a single thread (just like local blocks), and the host-local blocks for which the directories are not cached are handled on a separate, but still single, thread (the thread which was used to get all the missing directories via one RPC call).
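
A rough Scala sketch of the two paths described above, with simplified types; blockFile() is a hypothetical placeholder for the real index/data file lookup, and in the actual code the resolved file is wrapped into a FileSegmentManagedBuffer so no data is read until the block is consumed:

```scala
import java.io.File
import scala.concurrent.{ExecutionContext, Future}

case class HostLocalBlock(executorId: String, blockId: String)

class HostLocalFetchSketch(
    cachedDirs: Map[String, Array[String]],
    fetchMissingDirs: Array[String] => Future[Map[String, Array[String]]])(
    implicit ec: ExecutionContext) {

  // Hypothetical helper: map a block id to a file under one of the executor's local dirs.
  private def blockFile(dirs: Array[String], blockId: String): File =
    new File(dirs((blockId.hashCode & Int.MaxValue) % dirs.length), blockId)

  def fetch(blocks: Seq[HostLocalBlock])(onBlock: (HostLocalBlock, File) => Unit): Unit = {
    val (cached, missing) = blocks.partition(b => cachedDirs.contains(b.executorId))

    // Path 1: directories already cached -> resolved synchronously on this thread,
    // just like local blocks.
    cached.foreach(b => onBlock(b, blockFile(cachedDirs(b.executorId), b.blockId)))

    // Path 2: one RPC for all missing directories; the remaining blocks are resolved
    // when that single call completes.
    if (missing.nonEmpty) {
      fetchMissingDirs(missing.map(_.executorId).distinct.toArray).foreach { dirs =>
        missing.foreach(b => onBlock(b, blockFile(dirs(b.executorId), b.blockId)))
      }
    }
  }
}
```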

@tgravescs
Contributor

ok, that makes sense, thanks. So it seems we fetch all the local blocks first and then the host-local blocks, and all of that happens in the current thread while the remote block fetching happens separately.
My concern could go both ways then. Now we have to wait for the local blocks to be fetched before the host-local blocks, whereas before those could have happened in parallel with the local blocks. Or if we end up with a lot of host-local blocks, those fetches now happen serially whereas they would have happened in parallel before.
Mostly just an observation about something we may want to enhance later if we see it as an issue.

@vanzin
Contributor

vanzin commented Nov 25, 2019

Now we have to wait for the local blocks to be fetched before the host-local blocks, whereas before those could have happened in parallel with the local blocks.

"Host local" blocks are basically local blocks after you fill in the local dirs cache. And "fetch" is a misnomer in this case; you'll get a pointer to the data (which mostly involves metadata operations like finding the file, but not actually opening it), and it will only be actually opened when the task starts running. So not really any waiting involved.

Sounds like a gain to me - while before you'd have RPC + open on the remote side + transfer data over socket + cache it in memory in the executor (or, worse, write it to disk again + open another file), now you only do an "open file".
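
A small illustration (my own sketch, not Spark's actual classes) of that point: the host-local "fetch" only records where the data lives, and no file is opened until the consuming task asks for the stream:

```scala
import java.io.{File, FileInputStream, InputStream}

final case class FileSegment(file: File, offset: Long, length: Long) {
  // No I/O happens until open() is called by the consuming task.
  def open(): InputStream = {
    val in = new FileInputStream(file)
    in.skip(offset)  // position the stream at the start of the segment
    in
  }
}
```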

@tgravescs
Contributor

Definitely agree, it sounds like a gain, and I have been wanting this for a while, so thanks @attilapiros for working on it. And you're right, I wasn't thinking about this not even opening the file, so it should be very fast.

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114429 has finished for PR 25299 at commit 22aa383.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

jenkins retest this please

@tgravescs
Contributor

test this please

@attilapiros
Contributor Author

attilapiros commented Nov 26, 2019

The error is:

```
[ERROR] Failed to execute goal on project spark-catalyst_2.12: Could not resolve dependencies for project org.apache.spark:spark-catalyst_2.12:jar:3.0.0-SNAPSHOT: The following artifacts could not be resolved: org.codehaus.janino:commons-compiler:jar:3.0.15, com.univocity:univocity-parsers:jar:2.8.3, org.apache.arrow:arrow-vector:jar:0.15.1: Could not transfer artifact org.codehaus.janino:commons-compiler:jar:3.0.15 from/to central (https://repo.maven.apache.org/maven2): Connection timed out (Read failed) -> [Help 1]
```

Meanwhile this link is working: janino 3.0.15.

Could it be that the local Maven repo on the Jenkins instance which runs the PR builder should be purged?

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114464 has finished for PR 25299 at commit da93837.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114468 has finished for PR 25299 at commit da93837.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 26, 2019

Test build #114470 has finished for PR 25299 at commit da93837.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Nov 26, 2019

Merging to master.

@vanzin vanzin closed this in fd2bf55 Nov 26, 2019
attilapiros added a commit to attilapiros/spark that referenced this pull request Dec 6, 2019
… from the same host

Closes apache#25299 from attilapiros/SPARK-27651.

Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
```scala
  .createWithDefault(false)

private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
  ConfigBuilder("spark.shuffle.readHostLocalDisk.enabled")
```
Member


@attilapiros @cloud-fan @squito @tgravescs @vanzin,
I think we need to mention in the migration guide that this option must be disabled for an old shuffle service.

Contributor Author

@attilapiros attilapiros Dec 12, 2019


What about disabling this feature when spark.shuffle.useOldFetchProtocol is true (and updating the error message along with the corresponding docs; see below)?

spark.shuffle.useOldFetchProtocol already exists and is documented in one of the migration guides (although I do not know why it is in sql-migration-guide.md).

https://github.com/apache/spark/blob/master/docs/sql-migration-guide.md:

Since Spark 3.0, we use a new protocol for fetching shuffle blocks, for external shuffle service users, we need to upgrade the server correspondingly. Otherwise, we'll get the error message UnsupportedOperationException: Unexpected message: FetchShuffleBlocks. If it is hard to upgrade the shuffle service right now, you can still use the old protocol by setting spark.shuffle.useOldFetchProtocol to true.

I have created a new jira for this: https://issues.apache.org/jira/browse/SPARK-30235
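
A minimal sketch of the guard suggested above, assuming the config keys mentioned in this thread; the actual check implemented under SPARK-30235 / #26869 may look different:

```scala
import org.apache.spark.SparkConf

def hostLocalDiskReadingEnabled(conf: SparkConf): Boolean = {
  val requested = conf.getBoolean("spark.shuffle.readHostLocalDisk.enabled", false)
  val oldProtocol = conf.getBoolean("spark.shuffle.useOldFetchProtocol", false)
  if (requested && oldProtocol) {
    // An old external shuffle service does not understand the new fetch messages,
    // so fall back and tell the user why instead of failing at runtime.
    println("spark.shuffle.readHostLocalDisk.enabled is ignored because " +
      "spark.shuffle.useOldFetchProtocol is true")
    false
  } else {
    requested
  }
}
```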

Contributor Author


The new PR is #26869.

dongjoon-hyun pushed a commit that referenced this pull request Sep 2, 2020
…rnal shuffle service is disabled

### What changes were proposed in this pull request?

This PR adds support to read host-local shuffle data from disk directly when external shuffle service is disabled.

Similar to #25299, we first try to get the local disk directories for the shuffle data, which is located on the same host as the current executor. The only difference is that in #25299 it gets the directories from the external shuffle service, while in this PR it gets the directories from the executors.

To implement the feature, this PR extends `HostLocalDirManager` for both `ExternalBlockStoreClient` and `NettyBlockTransferService`. Also, this PR adds `getHostLocalDirs` to `NettyBlockTransferService`, as `ExternalBlockStoreClient` has, in order to send the get-dirs request to the corresponding executor. And this PR reuses the request message `GetLocalDirsForExecutors` for simplicity.

### Why are the changes needed?

After SPARK-27651 / #25299, Spark can read host-local shuffle data directly from disk when the external shuffle service is enabled. To extend the feature, we can also support it when the external shuffle service is disabled.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, to use the host-local shuffle reading feature, users had to enable not only `spark.shuffle.readHostLocalDisk` but also `spark.shuffle.service.enabled`. After this PR, enabling `spark.shuffle.readHostLocalDisk` should be enough, and the external shuffle service is no longer a prerequisite.

### How was this patch tested?

Added test and tested manually.

Closes #28911 from Ngone51/support_node_local_shuffle.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…anager fetches shuffle blocks from the same host

    [SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host

    Closes apache#25299 from attilapiros/SPARK-27651.

    Authored-by: “attilapiros” <[email protected]>
    Signed-off-by: Marcelo Vanzin <[email protected]>

RB=2244102
BUG=LIHADOOP-55020
G=spark-reviewers
R=chsingh,mshen,vsowrira
A=chsingh
wangyum pushed a commit that referenced this pull request May 26, 2023
…rnal shuffle service is disabled

Closes #28911 from Ngone51/support_node_local_shuffle.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>