[SPARK-27622][Core] Avoiding the network when block manager fetches disk persisted RDD blocks from the same host #24554
Test build #105248 has finished for PR 24554 at commit
I can see one more possible improvement here: in
Test build #105281 has finished for PR 24554 at commit
Jenkins retest this please
Test build #105284 has finished for PR 24554 at commit
looks good in general.
Have you tried this on a cluster? In particular w/ encryption. The code looks OK but worth an extra check.
actually ... now I'm wondering about the locking of blocks in That is true even when you read a remote block ... but that would look like the block went missing, and you'd try to get it from elsewhere, or compute from lineage. But now you'd probably have some weird failure in the middle of your task (I think).
Thanks @squito! I will look into the problem regarding unpersist and file deletion. On Linux, what I have found so far is promising: https://linux.die.net/man/2/unlink :
Of course I will test this and come back to you with the result. Moreover, as far as I know, on Windows a file cannot be deleted while it is open. But I really have to check the details. I have not tried this PR on a cluster yet, but I will do that too.
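The unlink behaviour mentioned above can be checked with a small standalone program. This is only a sketch, not code from the PR: the class and method names are made up for illustration. It relies on the POSIX rule that an unlinked file stays readable through handles that were opened before the unlink, until the last of them is closed. On other platforms the delete call may behave differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; not part of Spark.
public class ReadAfterUnlink {

    /** Writes the payload to a temp file, opens it, deletes it, then reads it through the open handle. */
    public static String readAfterDelete(String payload) {
        try {
            Path file = Files.createTempFile("block-", ".data");
            Files.write(file, payload.getBytes(StandardCharsets.UTF_8));
            try (InputStream in = Files.newInputStream(file)) {
                // Remove the directory entry while the stream is still open.
                Files.delete(file);
                // The data is read through the handle opened before the delete.
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterDelete("contents of a disk persisted block"));
    }
}
```

If the same holds for the block files, a reader that has already opened the file should not be affected by a concurrent unpersist on Linux.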
Test build #105313 has finished for PR 24554 at commit
Test build #105312 has finished for PR 24554 at commit
Oh, that's a great point about an already open file; that is probably enough. It would also be worth taking a look at what happens if the file gets deleted before it's opened. I think things will just work correctly -- it should be just like the equivalent race that happens when reading a remote block. But I would appreciate you taking a closer look at that part.
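The "deleted before it is opened" case can be pictured as follows. This is a hedged sketch under assumptions, not the PR's implementation: `LocalReadWithFallback`, `tryReadLocal` and `fetch` are hypothetical names, and the remote fetch is stood in for by a plain `Supplier`. The idea is that a missing file is treated like a missing block, so the caller falls back to another source.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical illustration; names are not taken from Spark.
public class LocalReadWithFallback {

    /** Returns the file contents, or empty if the file was removed before it could be opened. */
    public static Optional<byte[]> tryReadLocal(Path blockFile) {
        try {
            return Optional.of(Files.readAllBytes(blockFile));
        } catch (NoSuchFileException e) {
            // The block file is gone (for example after an unpersist): report "not found".
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Tries the local file first and only calls the fallback when the file is missing. */
    public static byte[] fetch(Path blockFile, Supplier<byte[]> remoteFetch) {
        return tryReadLocal(blockFile).orElseGet(remoteFetch);
    }
}
```

With this shape the race looks the same to the caller as a block that disappeared from a remote executor.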
In the previous commit "introduce getAndMapRemoteManagedBuf and open the block file early" the old
Both these usage ( The relevant tests: And with DEBUG mode the relevant output:
I have added a new test to
Test build #105841 has finished for PR 24554 at commit
Test build #105845 has finished for PR 24554 at commit
I have run the test on
Test build #105867 has finished for PR 24554 at commit
Test build #106065 has finished for PR 24554 at commit
Test build #106066 has finished for PR 24554 at commit
test failures look real
Test build #106073 has finished for PR 24554 at commit
Jenkins retest this please
Test build #106102 has finished for PR 24554 at commit
Couple of minor things but want to read this again. Maybe Imran will get to review it before going on vacation...
Looks good to me other than the comments by Marcelo. (Some of my prior comments on BlockManagerSuite were prematurely resolved, but I talked to Attila and he will address them in the next patch.)
Test build #106165 has finished for PR 24554 at commit
Just some minor things.
Test build #106280 has finished for PR 24554 at commit
retest this please
Test build #106849 has finished for PR 24554 at commit
Merging to master.
Catching up on PRs ... this essentially means all executors on the same host have effectively the same preferred locality (modulo concurrent block removal). Did we update the preferred locality for the block with this in mind (here or in a follow-up PR)? Thx.
What changes were proposed in this pull request?
Before this PR, fetching a disk-persisted RDD block always went over the network, even when the source executor and the fetching executor were running on the same host.
The idea of accessing another executor's local disk files by reading the disk directly comes from the external shuffle service, where the local dirs are stored for each executor (block manager).
To make this possible, the following changes were made:
- The `RegisterBlockManager` message is extended with the `localDirs`, which the block manager master stores for each block manager as a new property of `BlockManagerInfo`.
- `GetLocationsAndStatus` is extended with the requester host.
- `BlockLocationsAndStatus` (the reply to the `GetLocationsAndStatus` message) is extended with an option of local directories, which is filled with the local directories of a same-host executor (if there is any, otherwise None is used). This is where the block content can be read from.

Shuffle blocks are out of scope for this PR: a separate PR will be opened for that (for another Jira issue).
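The lookup described by these changes can be sketched roughly as below. This is a simplified illustration, not Spark's code: the class `SameHostLookupSketch` and its methods are hypothetical, executor ids, hosts and directories are plain strings, and storage levels are ignored. It only shows the idea that the master remembers each block manager's local dirs at registration time and hands them out when the requester is on the same host as an executor holding the block.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the master-side bookkeeping.
public class SameHostLookupSketch {

    /** What the master keeps per registered block manager (host plus its local dirs). */
    private static final class Info {
        final String host;
        final List<String> localDirs;

        Info(String host, List<String> localDirs) {
            this.host = host;
            this.localDirs = Collections.unmodifiableList(new ArrayList<>(localDirs));
        }
    }

    private final Map<String, Info> infoByExecutor = new HashMap<>();
    private final Map<String, List<String>> executorsByBlock = new HashMap<>();

    /** Analogue of handling a registration that carries the local dirs. */
    public void register(String executorId, String host, List<String> localDirs) {
        infoByExecutor.put(executorId, new Info(host, localDirs));
    }

    /** Records that the given executor holds the block. */
    public void addBlock(String blockId, String executorId) {
        executorsByBlock.computeIfAbsent(blockId, k -> new ArrayList<>()).add(executorId);
    }

    /** Local dirs of an executor on the requester's host that holds the block, if there is one. */
    public Optional<List<String>> localDirsFor(String blockId, String requesterHost) {
        for (String executorId : executorsByBlock.getOrDefault(blockId, Collections.emptyList())) {
            Info info = infoByExecutor.get(executorId);
            if (info != null && info.host.equals(requesterHost)) {
                return Optional.of(info.localDirs);
            }
        }
        return Optional.empty();
    }
}
```

A requester that gets a non-empty result can try to read the block file from those directories; an empty result means the block has to be fetched over the network as before.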
How was this patch tested?
With a new unit test in `BlockManagerSuite`. See the test prefixed by "SPARK-27622: avoid the network when block requested from same host".