[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors #35085
Conversation
I wasn't sure whether I should make this backward compatible or not, so I added a default implementation. I can remove it if it doesn't need to be backward compatible (it's marked private, so it probably doesn't need to be?)
I agree with your assessment @Kimahriman and would prefer to avoid this method, though @Ngone51 had expressed concern in the past that some of the private classes might be getting used for some custom implementations.
Thoughts @Ngone51 ?
I'd be +1 for keeping it backward compatible if you want to keep this method.
Can one of the admins verify this patch?
+CC @otterc Can you take a look at this PR? It currently does not handle push-based shuffle - would be great if you can suggest changes to accommodate that as well.
Had a Q, @HyukjinKwon, @dongjoon-hyun - why is AmplabJenkins still commenting? I thought we had turned Jenkins off?
Yeah, thought about that. Haven't used push-based shuffle yet and don't fully know how it works, so mostly didn't want to break it. I can look into that here with some hints (look at mergeStatus in addition to mapStatus?). Or that can be a separate follow-up.
@mridulm I have no context for that either~ :)
I have no idea either :-). I think the machines are still running, assuming that it still links JIRA <> PRs. I remember that program is running separately on one of the Jenkins machines. So, I suspect that the Jenkins builds are somehow shut down but the machines are still alive.. we can ignore it for now I think.
I think the same mechanism can be used for push-based shuffle. We can look at the
167f90b to b761a84 (compare)
I tested this out in my actual environment and realized that it didn't actually work, because the context cleaner removes the map output before removing the shuffle from the block manager master. I switched the order of those, updated the test to use the cleaner to replicate it, and it sort of started working. I say sort of because I realized a major potential issue with this: we use a private secured Hadoop cluster, and the shuffle files were not writable by the user running the shuffle service, so they couldn't be deleted. I still need to do a little digging to see if the permissions for those files (via Spark or Hadoop) are configurable at all through the node managers.
I added a utility function to create the shuffle files as group writable. I haven't tried this out in my environment yet, but I think it'd be necessary for this to work (in a secured environment). If there are security concerns with this I can bring it up on the dev mailing list. I also don't know if this will break or be ignored on non-POSIX-compliant systems, so I definitely want to get some thoughts on this.
5d69e14 to 9551baa (compare)
Took a quick pass through the PR, thanks for working on this @Kimahriman !
val perms = PosixFilePermissions.fromString("rw-rw----")
val path = file.toPath
Files.createFile(path)
Files.setPosixFilePermissions(path, perms)
+CC @otterc There were some concerns with PosixFilePermissions which resulted in the addition of DiskBlockManager.createDirWithPermission770.
Will this be affected by the same?
Yeah, I saw that and wasn't sure what the background was on why that didn't work in the past.
Based on the comment
* TODO: Find out why can't we create a dir using java api with permission 770
* Files.createDirectories(mergeDir.toPath, PosixFilePermissions.asFileAttribute(
* PosixFilePermissions.fromString("rwxrwx---")))
I'm thinking it might be a umask issue? The permissions you give it during dir creation will be affected by the umask, and the way around that is to create the dir and then change its permissions.
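A minimal Python sketch of this umask interaction (POSIX only; temp paths are illustrative, not from the PR): a mode passed at directory-creation time is masked by the process umask, while an explicit chmod after creation is not.

```python
import os
import stat
import tempfile

# A mode passed to mkdir is masked by the process umask; an explicit
# chmod after creation is not. This mirrors the create-then-set-permissions
# workaround discussed in this thread.
base = tempfile.mkdtemp()
old_umask = os.umask(0o077)  # restrictive umask that strips group/other bits
try:
    masked = os.path.join(base, "masked")
    os.mkdir(masked, 0o770)   # umask applies: effective mode is 0o700
    fixed = os.path.join(base, "fixed")
    os.mkdir(fixed, 0o770)
    os.chmod(fixed, 0o770)    # chmod bypasses the umask: mode becomes 0o770
    print(oct(stat.S_IMODE(os.stat(masked).st_mode)))  # 0o700
    print(oct(stat.S_IMODE(os.stat(fixed).st_mode)))   # 0o770
finally:
    os.umask(old_umask)
```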
IIRC this was not a umask issue (I had asked a similar question during review :-) ).
Will let @otterc comment more though - she has better context.
Think I figured it out while testing this out in our env. Java has no way to set or keep the setgid bit when setting file permissions, so you lose the setgid bit with any of these methods of setting file permissions. I'm trying to figure out how YARN creates these folders, since it manages to keep the bit, but it looks like it's doing it all through a local FileContext, which seems to not keep the setgid bit when making directories, so I'm not sure how that works.
Will keep investigating, but will probably use the existing method to create the 770 folder.
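As an illustration of the behavior described above (a Python sketch, not from the PR): Java's PosixFilePermission can only express the nine rwx bits, so Files.setPosixFilePermissions always passes a nine-bit mode — equivalent to the second chmod below, which drops a previously set setgid bit.

```python
import os
import stat
import tempfile

# Setting setgid works when we own the directory and are in its group;
# a later chmod with only the nine rwx bits (all the Java API can express)
# clears the setgid bit again.
d = tempfile.mkdtemp()
os.chmod(d, 0o2770)  # rwxrwx--- plus setgid
print(bool(os.stat(d).st_mode & stat.S_ISGID))  # True
os.chmod(d, 0o770)   # nine-bit mode, like setPosixFilePermissions
print(bool(os.stat(d).st_mode & stat.S_ISGID))  # False: setgid bit is gone
```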
Weird, it worked when I tried it for the test case and when I tested it in my environment.
hmmm I think even @zhouyejoe tried that in our environment. @zhouyejoe do you remember whether that worked?
I will give it a try again. Maybe I made some mistake. I can't find any reason for it to not work. If it works, then we can clean it up.
I was looking into this in our environment and was able to get the below piece of code working:
if (dirToCreate.mkdirs() && dirToCreate.exists()) {
  Files.setPosixFilePermissions(dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---"))
  created = dirToCreate
}
@Kimahriman do you mind testing this piece of code in your environment? It seems like the code referenced in the TODO does not pass the unit tests and also does not work in our environment.
If this looks good we can clean it up.
cc: @zhouyejoe
Yeah, that's the umask issue. The TODO does the permission setting in the dir creation. If you setPosixFilePermissions after creating the directory, you don't get limited by the umask (only the weird behavior with the setgid bit mentioned in this thread). I've tried this approach in my env as well, and it works in place of the createDirWithPermission770 method.
9111803 to 4bb2f75 (compare)
I tried adding a
I guess another way that could work too is create the sub dirs with 770 using
Created https://issues.apache.org/jira/browse/SPARK-38005 for push-based shuffle
I went with the make-subdirs-770 and make-files-world-readable approach; it seemed a lot less hacky than relying on ACLs. Will update the description with the approach.
6e9955d to 2ff03a5 (compare)
Thanks for working on this @Kimahriman !
This is very useful, particularly in environments with dynamic resource allocation turned on.
Add GROUP_READ and GROUP_EXECUTE (for listing) as well.
The idea was it should already have everything else it needs (because it needs them to function), so change as little as possible and just add group write. I could just update to fully set it as 770 instead.
Instead of Files.createDirectory, using DiskBlockManager.createDirWithPermission770 (or moving it to utils) should suffice - and we won't need to change the perms.
It is unfortunate that the JDK still has not fixed this bug.
From what I understand, createDirWithPermission770 was made because it was thought it couldn't be done through the Java API, but this works correctly in my environment through the Java API. See later in that same thread for that discussion: #35085 (comment)
Because it might be possible to remove that helper after all (still waiting to hear if there's some other reason create-then-set-permission won't work in other environments), I just stuck with the nio API.
From what I've found, losing the setgid bit isn't actually Java related. mkdir -p -m770 also removes the setgid bit because of the way the chmod Linux system call works (in the operating systems I tested, at least). Despite the man page saying that it preserves the setgid bit on directories, the same behavior that the man page lists for files seems to apply to directories: if the user isn't in the group of the directory, it will remove the setgid bit.
Now I can't recreate the mkdir-losing-the-setgid-bit part...
Yeah definitely not ideal, but the permissions of the parent folders should prevent this from actually opening up any security holes I believe
YARN has a deletion service that it uses to remove the files normally (as the user). I started to look at this, but I'm not sure the shuffle manager can get a handle to it.
If we are going to take this approach, I want to only change permissions when this feature is enabled. Can we limit it to just when creating the shuffle files?
Ideally I would also like it documented.
Yeah, I hoped there was a way to hook into the YARN run-as-user in the shuffle service, but I couldn't see any way to do that.
I put all the permission changes inside checks for the feature being enabled. I also removed the permission change from the temp file creation; I think the actual shuffle files only go through the createTempFileWith
Just kidding, that's not actually true. I need to trace through and figure it out, but some final shuffle blocks must be created with createTempShuffleBlock.
It would be nice to update the description to say when we remove them. Is there a config to disable this functionality? I didn't see one. I think removeBlocks was added in 3.0, so it's probably fairly safe for compatibility, but it might be nice to have it behind a feature flag just in case someone has a weird shuffle service or there are just issues with the feature (unless we are really confident in it). Open to thoughts on this.
Thinking a bit more about this, I would like to see it behind a feature flag. How long does asking for the removal take? I assume it's pretty small, but it would be good to measure. It does look like waiting on removal is done in the background by default (CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE), so it won't block the application side. On a very busy external shuffle service this is just going to put more pressure on the node manager, as well as potential disk load.
Feature flag defaulting on or off?
Depends on how confident we are in it? Have you done any testing on a real cluster at scale?
Yeah, I've been using it on our production cluster (hundreds of nodes). When there were permission issues it failed pretty quickly. Haven't actually measured how long it takes, but it's been removing ~10+ TB shuffles fine. Can do a little closer monitoring.
Seems fine to have on by default then.
The second bullet is introduced by this PR, but since it's only a bug if you enable this new functionality, is that also OK to be part of the separate issue?
Sorry, I read that too quickly. Yes, we should fix the RDD fetch under this, as we should not break one feature for another. The only part that I guess we could leave off is updating the folder permissions for RDD fetch only.
Yeah, basically. It's kinda like:
Added the update for RDD blocks and checking that setting, and added a few more checks to the tests as well.
Found a separate issue with the RDD fetching while testing this ^
Just a couple of doc requests.
new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
if (externalShuffleServiceRemoveShuffleEnabled) {
  mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
    shuffleStatus.mapStatuses.foreach { mapStatus =>
Use shuffleStatus.withMapStatuses
good call, done
…or released executors

### What changes were proposed in this pull request?

Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:

- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue opening up these files.

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.

### Why are the changes needed?

External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit 9a7596e)
Signed-off-by: Thomas Graves <[email protected]>
Merged to master and branch-3.3. @Kimahriman, if you want this on by default in the next release, please file a separate issue to discuss it.
Sounds good!
…g the shuffle service for released executors (#1041)

* [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:

- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue opening up these files.

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.

External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.

No.

New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>

* [SPARK-37618][CORE][FOLLOWUP] Support cleaning up shuffle blocks from external shuffle service

Fix test failure in build. Depending on the umask of the process running tests (which is typically inherited from the user's default umask), the group writable bit for the files/directories could be set or unset. The test was assuming that by default the umask will be restrictive (and so files/directories won't be group writable). Since this is not a valid assumption, we use jnr to change the umask of the process to be more restrictive - so that the test can validate the behavior change - and reset it back once the test is done.

No user-facing change. Adds jnr as a test scoped dependency, which does not bring in any other new dependency (asm is already a dep in spark).

```
[INFO] +- com.github.jnr:jnr-posix:jar:3.0.9:test
[INFO] |  +- com.github.jnr:jnr-ffi:jar:2.0.1:test
[INFO] |  |  +- com.github.jnr:jffi:jar:1.2.7:test
[INFO] |  |  +- com.github.jnr:jffi:jar:native:1.2.7:test
[INFO] |  |  +- org.ow2.asm:asm:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-commons:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-analysis:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-tree:jar:5.0.3:test
[INFO] |  |  +- org.ow2.asm:asm-util:jar:5.0.3:test
[INFO] |  |  \- com.github.jnr:jnr-x86asm:jar:1.0.2:test
[INFO] |  \- com.github.jnr:jnr-constants:jar:0.8.6:test
```

Modification to existing test. Tested on Linux, skips test when native posix env is not found.

Closes #36473 from mridulm/fix-SPARK-37618-test.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Sean Owen <[email protected]>

* Fix ut failure

Co-authored-by: Adam Binford <[email protected]>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
…efault

### What changes were proposed in this pull request?

This PR aims to enable `spark.shuffle.service.removeShuffle` for Apache Spark 4.0.0.

### Why are the changes needed?

Since Apache Spark 3.3.0, Apache Spark has been supporting `spark.shuffle.service.removeShuffle` via SPARK-37618.

- #35085

We can use it when external shuffle service is available.

### Does this PR introduce _any_ user-facing change?

By default, no, because `spark.shuffle.service.enabled` is still disabled. Only for the existing shuffle service users, this PR works.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45572 from dongjoon-hyun/SPARK-47448.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
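As a usage note, enabling this cleanup on a cluster that already runs the external shuffle service might look like the following spark-submit sketch. The `spark.shuffle.service.removeShuffle` and `spark.shuffle.service.enabled` flags come from the commit above; the dynamic allocation flag and the job script name are just a typical, hypothetical pairing, not required by the feature.

```shell
# Hypothetical invocation; requires Spark >= 3.3.0 on the driver/executors
# and a shuffle service new enough to handle the block removal messages.
spark-submit \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.service.removeShuffle=true \
  --conf spark.dynamicAllocation.enabled=true \
  my_job.py
```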
Do we need to upgrade the shuffle service to enable this feature?
Which version are you using now? It is only available at or after 3.3.0.
Spark 3.5.0, but the Spark shuffle service is Spark 3.0.0.
I think it might work with a 3.0.0 shuffle service. That's when #24499 was added, which includes the deletion updates on the shuffle service that this relies on.
What changes were proposed in this pull request?
Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.
To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:

- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be rw-r-----, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue opening up these files.

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.
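As a rough sketch of the permission changes described above (in Python rather than the Scala/Java used by Spark, with hypothetical helper and file names): both adjustments are post-creation chmod calls that each add a single permission bit, so the umask cannot interfere.

```python
import os
import stat
import tempfile

# Hypothetical helpers mirroring the two adjustments described above:
# add group write to a block manager sub directory, and add world read
# to a shuffle file, each applied after creation.
def make_subdir_group_writable(path):
    mode = stat.S_IMODE(os.stat(path).st_mode)
    os.chmod(path, mode | stat.S_IWGRP)

def make_file_world_readable(path):
    mode = stat.S_IMODE(os.stat(path).st_mode)
    os.chmod(path, mode | stat.S_IROTH)

base = tempfile.mkdtemp()
subdir = os.path.join(base, "blockmgr-sub")
os.mkdir(subdir)
os.chmod(subdir, 0o750)
make_subdir_group_writable(subdir)      # 0o750 -> 0o770

shuffle_file = os.path.join(subdir, "shuffle_0_0_0.data")
open(shuffle_file, "w").close()
os.chmod(shuffle_file, 0o640)
make_file_world_readable(shuffle_file)  # 0o640 -> 0o644
print(oct(stat.S_IMODE(os.stat(subdir).st_mode)))        # 0o770
print(oct(stat.S_IMODE(os.stat(shuffle_file).st_mode)))  # 0o644
```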
Why are the changes needed?
External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.