[SPARK-35543][CORE] Fix memory leak in BlockManagerMasterEndpoint removeRdd #32790
Conversation
After this PR, `blockStatusByShuffleService.get(bmId)` can be `None`, and even when there is a value for the key, `m.get(blockId)` can still be `null` because `m` is a Java HashMap.
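For illustration, a minimal self-contained sketch of that double check (assumption: plain `String`s stand in for the real `BlockManagerId`, `BlockId` and `BlockStatus` types; this is not the actual Spark code):

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object LookupSketch {
  // Hypothetical stand-in for blockStatusByShuffleService with simplified types.
  val blockStatusByShuffleService = mutable.HashMap.empty[String, JHashMap[String, String]]

  def getBlockStatus(bmId: String, blockId: String): Option[String] =
    // The outer get may be None, and Option(...) also turns a possible null
    // returned by the Java HashMap into None.
    blockStatusByShuffleService.get(bmId).flatMap(m => Option(m.get(blockId)))
}
```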
Kubernetes integration test unable to build dist. exiting with code: 1

In the github action I can see two unrelated errors:

Test build #139370 has finished for PR 32790 at commit

Kubernetes integration test unable to build dist. exiting with code: 1

I think I just used an outdated master branch in my local clone.

Kubernetes integration test starting

Kubernetes integration test starting

Kubernetes integration test status success

Test build #139371 has finished for PR 32790 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Test build #139375 has finished for PR 32790 at commit
Actually the I have added some temporary logging and check the

Do we need to also handle

@mridulm the It uses

You probably meant he

It doesn't exist currently. I'm proposing a safer way actually. And I think it's more readable. Looking at

Kubernetes integration test status failure

Test build #139390 has finished for PR 32790 at commit
@Ngone51 I am not so sure about that.
1.) Why are you sure about that? Are you just looking at these lines? (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 906 to 914 at 6de20f1)
Do we?
Related to both points:
I am sorry, but I now have limited time for this (starting tomorrow I will be without my laptop for the next 10 days). Within the scope of "Fix memory leak in BlockManagerMasterEndpoint removeRdd" I intend to fix the existing / proven memory leaks and avoid introducing any new errors or performance degradation by not being careful enough. Moreover, I do not want to leave any potential error for others, unattended by me. From my phone I cannot fix it (although vim would probably run just fine on it ;) but with no keyboard and no sbt...), so it would disturb me a lot and it could ruin my vacation, which I do not want to risk.
K8s failures are unrelated: #32790 (comment)

It's only called by

Sure, no worries. Your current fix already resolves the issue so it's ok to merge. We don't have to block on my suggestion. We can discuss more when you're available.
Ok, I see there's another type of block -

Thank you, @attilapiros and all. Could you backport this, @attilapiros?

@holdenk @dongjoon-hyun @Ngone51

@dongjoon-hyun I am sorry, I have to run now, and I thought during the merge that this minor leak wouldn't be worth it.
I was referring to this, yes - essentially when all the block id's are removed - we end up with an entry in Essentially, this is similar as

@mridulm sorry I cannot focus on this right now.

@attilapiros No hurries, enjoy your break :-)

No problem. Thank you, @attilapiros! Enjoy your holiday.
I think I made a mistake here and we should revert this change. But let me share my thoughts and discuss the problem. So we have (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 64 to 66 at 6de20f1)

The (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 570 to 580 at 6de20f1)

When the feature is on and this is the first block manager on a node, a new
Although (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 823 to 827 at 6de20f1)

So what happens when we add a disk persisted RDD, remove it later, then add a new one, and then lose the Block Manager? In that case (spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 672 to 675 at 6de20f1)

There is no lifecycle management for nodes, only for Block Managers (executors), so we cannot know when we get a new BM using the earlier node. We could remove the map and, when a new block is persisted, recreate the map and then re-register it with every block info related to BMs running on that node, but is this worth the work? In addition, we would have to check every time whether the map exists and the feature is on when we would like to add an entry. This has a runtime cost and I do not think it is worth it compared to the memory footprint of this "leak", which is proportional to the number of nodes.
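To make the sharing problem described above concrete, here is a small self-contained sketch (assumptions: `String`s stand in for the real `BlockManagerId`/`BlockId`/`BlockStatus` types, and a local `val` plays the role of the map held by a `BlockManagerInfo`; this is a simplification, not the Spark code):

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object SharedMapSketch {
  def main(args: Array[String]): Unit = {
    // Outer map, keyed by a per-host shuffle service id (a plain String here).
    val blockStatusByShuffleService =
      mutable.HashMap.empty[String, JHashMap[String, String]]

    // First block manager on the host: a fresh inner map is created and the very
    // same instance is handed over to the BlockManagerInfo (modelled by a val).
    val shared = blockStatusByShuffleService
      .getOrElseUpdate("host-1:7337", new JHashMap[String, String]())
    val bmInfoView: JHashMap[String, String] = shared

    // Persist one disk block, remove it, and (as this PR did) drop the outer
    // entry because the inner map became empty.
    shared.put("rdd_0_0", "ON_DISK")
    shared.remove("rdd_0_0")
    blockStatusByShuffleService.remove("host-1:7337")

    // A later persist still goes through the map the BlockManagerInfo holds...
    bmInfoView.put("rdd_1_0", "ON_DISK")
    // ...but a newly created map under the same key is a different object,
    // so the two views have diverged.
    val fresh = blockStatusByShuffleService
      .getOrElseUpdate("host-1:7337", new JHashMap[String, String]())
    println(fresh.containsKey("rdd_1_0")) // false
  }
}
```

This is why simply recreating the inner map later is not enough on its own: the `BlockManagerInfo`s created earlier would still point at the old, now-orphaned instance.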
It's fine to revert this because this is not released yet. Can we have a test case, @attilapiros? BTW, cc @gengliangwang since he is the release manager for Apache Spark 3.2.0.
@dongjoon-hyun it is hard to test this as

My main concern was around long-running applications which cycle through a lot of executors (over time, not concurrently).

@mridulm could we estimate the number of nodes for those long-running applications?

(Ignoring ephemeral nodes, dockerized deployments, etc., and using more real-world scenarios.) For a 5k cluster, the map can have up to 5k entries, with the value maps being of reasonable size (even if empty).
The feature ("Serve local disk persisted blocks by the external service") only works when the external shuffle service is available.
I do not think the number of blocks matters at all. See spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 64 to 66 at 6de20f1.
It maps a special BlockManagerId; see spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala, lines 788 to 790 at 37ef7bb.
So the number is ~ 5000 * 2 * (~8 bytes) = 80 KB = 0.08 MB.
PS: I have adapted your calculation, although I believe an empty HashMap is not 8 bytes but still some low constant number, and I do not get why the 2 multiplier is used, as we should only be talking about the external shuffle service map (the other map is per executor, but its lifecycle is handled correctly and when they are removed the
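As a side note, the footprint of an empty map can be measured directly; a rough sketch (assumption: `org.openjdk.jol:jol-core` is available on the classpath; exact numbers vary with JVM version and settings):

```scala
import java.util.{HashMap => JHashMap}
import org.openjdk.jol.info.GraphLayout

object EmptyMapFootprint {
  def main(args: Array[String]): Unit = {
    // A freshly created HashMap has no backing table yet, but the object itself
    // (header plus fields) is still a few tens of bytes, not 8.
    val empty = new JHashMap[String, String]()
    println(s"empty HashMap retained size: ${GraphLayout.parseInstance(empty).totalSize()} bytes")
  }
}
```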
I am referring to the size of the Maps themselves, @attilapiros. (Edit):
Your example assumes all the blocks were persisted to disk. In that case, when persist is called again during the run of the application, those areas will be reused. Even so, I have an idea: introducing a new class which wraps the
@mridulm Interesting example! I also investigated it a bit. It looks like all the elements of the underlying array are nulled out, but the array reference itself stays unchanged, so its size doesn't change. The memory usage shouldn't be the same as before, but it's also not empty, since HashMap.clear() is:

```java
public void clear() {
    Node<K,V>[] tab;
    modCount++;
    if ((tab = table) != null && size > 0) {
        size = 0;
        for (int i = 0; i < tab.length; ++i)
            tab[i] = null;
    }
}
```
@Ngone51 Yes, the table contents are nulled out, but the table itself remains and occupies memory.
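A quick REPL-style way to observe this (assumptions: the private field is called `table` as in current OpenJDK sources, and reflective access to `java.util` internals is permitted, e.g. via `--add-opens java.base/java.util=ALL-UNNAMED` on newer JDKs):

```scala
import java.util.{HashMap => JHashMap}

val m = new JHashMap[Int, String]()
(1 to 10000).foreach(i => m.put(i, "x"))

// Peek at the private backing array via reflection.
val tableField = classOf[JHashMap[_, _]].getDeclaredField("table")
tableField.setAccessible(true)
def capacity(map: JHashMap[_, _]): Int = {
  val t = tableField.get(map).asInstanceOf[Array[AnyRef]]
  if (t == null) 0 else t.length
}

println(s"capacity before clear: ${capacity(m)}") // sized to hold ~10k entries
m.clear()
println(s"capacity after clear:  ${capacity(m)}") // unchanged: the array itself is kept
```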
…dpoint removeRdd

### What changes were proposed in this pull request?
Wrapping `JHashMap[BlockId, BlockStatus]` (used in `blockStatusByShuffleService`) into a new class `BlockStatusPerBlockId` which removes the reference to the map when all the persisted blocks are removed.

### Why are the changes needed?
With #32790 a bug was introduced: when all the persisted blocks are removed, we remove the HashMap which is already shared by the block manager infos, but when a new block is persisted this map needs to be used again for storing the data (and this HashMap must be the same one shared by the block manager infos created for registered block managers running on the same host as the external shuffle service).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Extending `BlockManagerInfoSuite` with a test which removes all the persisted blocks and then adds another one.

Closes #33020 from attilapiros/SPARK-35543-2.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
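For illustration only, a rough outline of what such a wrapper can look like; this is not the actual `BlockStatusPerBlockId` from #33020, just a minimal sketch of the idea (simplified `String` types, no concurrency concerns):

```scala
import java.util.{HashMap => JHashMap}

// Sketch only: String stands in for the real BlockId / BlockStatus types.
class BlockStatusPerBlockIdSketch {
  private var blocks: JHashMap[String, String] = null

  def get(blockId: String): Option[String] =
    if (blocks == null) None else Option(blocks.get(blockId))

  def put(blockId: String, status: String): Unit = {
    if (blocks == null) {
      blocks = new JHashMap[String, String]()
    }
    blocks.put(blockId, status)
  }

  def remove(blockId: String): Unit = {
    if (blocks != null) {
      blocks.remove(blockId)
      if (blocks.isEmpty) {
        // Drop the reference so the empty map (and its backing array) can be GC'd.
        blocks = null
      }
    }
  }
}
```

The important design point is that the wrapper object itself stays shared between `blockStatusByShuffleService` and the block manager infos, so the inner map can be dropped and later recreated without the two views diverging.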
What changes were proposed in this pull request?
In `BlockManagerMasterEndpoint`, for the disk persisted RDDs (when `spark.shuffle.service.fetch.rdd.enabled` is enabled) we are keeping track of the block status entries by external shuffle service instances (so on YARN we are basically keeping them by nodes). This is the `blockStatusByShuffleService` member val. When all the RDD blocks are removed for one external shuffle service instance, the key and the empty map can be removed from `blockStatusByShuffleService`.
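A minimal sketch of this cleanup (hypothetical simplified types, not the actual `removeRdd` code): when the last tracked RDD block of a shuffle service instance is removed, the key and the now-empty map are dropped too.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object RemoveRddSketch {
  // Hypothetical stand-in for blockStatusByShuffleService, with String instead of
  // the real BlockManagerId / BlockId / BlockStatus types.
  val blockStatusByShuffleService =
    mutable.HashMap.empty[String, JHashMap[String, String]]

  def removeRddBlock(shuffleServiceId: String, blockId: String): Unit = {
    blockStatusByShuffleService.get(shuffleServiceId).foreach { blocks =>
      blocks.remove(blockId)
      if (blocks.isEmpty) {
        // Without this, the key and the empty Java HashMap would be kept for the
        // lifetime of the application: the small leak this PR removes.
        blockStatusByShuffleService.remove(shuffleServiceId)
      }
    }
  }
}
```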
Why are the changes needed?
It is a small leak and I was asked to take care of it in #32114 (comment).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually, by adding a temporary log line to check the `blockStatusByShuffleService` value before and after `removeRdd`, and running the "SPARK-25888: using external shuffle service fetching disk persisted blocks" test in `ExternalShuffleServiceSuite`.