
Conversation

@attilapiros
Contributor

@attilapiros attilapiros commented Jun 5, 2021

What changes were proposed in this pull request?

In BlockManagerMasterEndpoint, for disk-persisted RDDs (when spark.shuffle.service.fetch.rdd.enabled is enabled) we keep track of the block status entries per external shuffle service instance (so on YARN we essentially keep them per node). This is the blockStatusByShuffleService member val. When all the RDD blocks are removed for one external shuffle service instance, the key and its now-empty map can be removed from blockStatusByShuffleService.
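
A minimal sketch of the intended cleanup, using simplified stand-in types (the real logic lives in BlockManagerMasterEndpoint.removeRdd; the names here are illustrative only):

import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object RemoveEmptyEntrySketch {
  // Simplified stand-ins for the real Spark types.
  case class BlockManagerId(host: String, port: Int)
  case class BlockId(name: String)
  case class BlockStatus(diskSize: Long)

  // Mirrors the shape of blockStatusByShuffleService in BlockManagerMasterEndpoint.
  val blockStatusByShuffleService =
    new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]

  // Remove the given RDD blocks tracked for one external shuffle service
  // instance; once its inner map is empty, drop the key itself.
  def removeRddBlocks(essId: BlockManagerId, blocks: Seq[BlockId]): Unit = {
    blockStatusByShuffleService.get(essId).foreach { statuses =>
      blocks.foreach(b => statuses.remove(b))
      if (statuses.isEmpty) {
        blockStatusByShuffleService.remove(essId)
      }
    }
  }
}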

Why are the changes needed?

It is a small leak and I was asked to take care of it in #32114 (comment).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually, by adding a temporary log line to check the blockStatusByShuffleService value before and after removeRdd, and running the "SPARK-25888: using external shuffle service fetching disk persisted blocks" test in ExternalShuffleServiceSuite.

@github-actions github-actions bot added the CORE label Jun 5, 2021
Contributor Author

@attilapiros attilapiros Jun 5, 2021

After this PR blockStatusByShuffleService.get(bmId) can be None, and even when there is a value for the key, m.get(blockId) can still be null since m is a Java HashMap.
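
A small sketch of the resulting lookup pattern (reusing the simplified types from the sketch in the description above); wrapping the inner lookup in Option(...) handles the possible null from the Java HashMap, the same pattern used in the getLocationsAndStatus excerpt quoted later in this thread:

object SafeLookupSketch {
  import RemoveEmptyEntrySketch._

  // Both layers are handled: the outer Scala map may have no entry for the
  // external shuffle service id, and the inner Java map returns null on a miss.
  def diskBlockStatus(essId: BlockManagerId, blockId: BlockId): Option[BlockStatus] =
    blockStatusByShuffleService.get(essId).flatMap(m => Option(m.get(blockId)))
}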

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43892/

@attilapiros
Contributor Author

attilapiros commented Jun 5, 2021

In the github action I can see two unrelated errors:

[info] - Non-consecutive stage failures don't trigger abort *** FAILED *** (635 milliseconds)
[info]   stageAttemptOpt.isDefined was false (DAGSchedulerSuite.scala:978)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.scheduler.DAGSchedulerSuite.completeShuffleMapStageSuccessfully(DAGSchedulerSuite.scala:978)
[info]   at org.apache.spark.scheduler.DAGSchedulerSuite.$anonfun$new$59(DAGSchedulerSuite.scala:1211)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:62)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
Moving to python/docs directory and building sphinx.
Running Sphinx v3.0.4

Extension error:
Could not import extension sphinx_plotly_directive (exception: No module named 'sphinx_plotly_directive')
make: *** [Makefile:20: html] Error 2
                    ------------------------------------------------
      Jekyll 4.2.0   Please append `--trace` to the `build` command 
                     for any additional information or backtrace. 
                    ------------------------------------------------
/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:130:in `<top (required)>': Python doc generation failed (RuntimeError)
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.0/lib/jekyll/external.rb:60:in `require'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.0/lib/jekyll/external.rb:60:in `block in require_with_graceful_fail'

@github-actions github-actions bot added the PYTHON label Jun 5, 2021
@SparkQA

SparkQA commented Jun 5, 2021

Test build #139370 has finished for PR 32790 at commit 6c7d096.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43893/

@attilapiros
Contributor Author

I think I just used an outdated master branch in my local clone.
So I rebased my changes on top of a fresh master and reverted those Sphinx-related commits of mine.

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43894/

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43895/

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43894/

@SparkQA

SparkQA commented Jun 5, 2021

Test build #139371 has finished for PR 32790 at commit d321c72.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43896/

@SparkQA

SparkQA commented Jun 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43896/

@apache apache deleted a comment from SparkQA Jun 5, 2021
@apache apache deleted a comment from SparkQA Jun 5, 2021
@apache apache deleted a comment from SparkQA Jun 5, 2021
@attilapiros
Contributor Author

cc @Ngone51 @mridulm

@SparkQA

SparkQA commented Jun 5, 2021

Test build #139375 has finished for PR 32790 at commit 7be0ce4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

attilapiros commented Jun 5, 2021

Outdated comment:

I just realized the existing tests do not cover this, so please let me run at least a manual test before the merge.

@attilapiros
Contributor Author

attilapiros commented Jun 5, 2021

Actually the "SPARK-25888: using external shuffle service fetching disk persisted blocks test in ExternalShuffleServiceSuite covers this case.

I have added some temporary logging and check the blockStatusByShuffleService:

21/06/06 00:54:05.184 dispatcher-BlockManagerMaster ERROR BlockManagerMasterEndpoint: Attila Map(BlockManagerId(0, 192.168.1.210, 57357, None) -> {rdd_1_0=BlockStatus(StorageLevel(disk, 1 replicas),0,922)}, BlockManagerId(driver, apiros-mbp16.lan, 57357, None) -> {})
21/06/06 00:54:05.184 dispatcher-BlockManagerMaster ERROR BlockManagerMasterEndpoint: Attila Map(BlockManagerId(driver, apiros-mbp16.lan, 57357, None) -> {})

@mridulm
Contributor

mridulm commented Jun 6, 2021

Do we also need to handle updateBlockInfo, @attilapiros?

@attilapiros
Contributor Author

@mridulm the updateBlockInfo does not use blockStatusByShuffleService, does it?

It uses blockLocation, which is another data structure where, for each block ID, we store the block manager IDs where the block is available.

@attilapiros
Contributor Author

You probably meant the updateBlockInfo of BlockManagerInfo. Let me check that!

@Ngone51
Member

Ngone51 commented Jun 7, 2021

@Ngone51 currently I cannot see a leak here, but I am open to changing this if it is proven to exist.

It doesn't exist currently. I'm proposing a safer way actually.

And I think it's more readable. Looking at if (!blockId.isBroadcast && blockStatus.diskSize > 0) makes me think it handles all block types except the broadcast block. However, looking at removeBlock, it only cleans RDD blocks. So it makes me wonder whether we missed other block types. Fortunately, we only have broadcast blocks and RDD blocks here, so nothing leaks. But the current situation might break if we add a new block type in the future and forget to update this accordingly.
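
A hedged sketch (simplified stand-in types, not the actual patch) of the kind of positive check this comment seems to argue for, so the RDD-only intent is explicit rather than implied by excluding broadcast blocks:

import java.util.{HashMap => JHashMap}

object PositiveCheckSketch {
  // Simplified stand-ins; the real check lives in BlockManagerInfo.updateBlockInfo.
  sealed trait BlockId { def isRDD: Boolean; def isBroadcast: Boolean }
  case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    val isRDD = true; val isBroadcast = false
  }
  case class BroadcastBlockId(id: Long) extends BlockId {
    val isRDD = false; val isBroadcast = true
  }
  case class BlockStatus(memSize: Long, diskSize: Long)

  val externalShuffleServiceBlockStatus: Option[JHashMap[BlockId, BlockStatus]] =
    Some(new JHashMap[BlockId, BlockStatus])

  // Track a block for the external shuffle service only when it is an RDD block
  // with data on disk, instead of merely excluding broadcast blocks.
  def recordBlock(blockId: BlockId, status: BlockStatus): Unit = {
    if (blockId.isRDD && status.diskSize > 0) {
      externalShuffleServiceBlockStatus.foreach(_.put(blockId, status))
    }
  }
}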

@SparkQA

SparkQA commented Jun 7, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43912/

@SparkQA

SparkQA commented Jun 7, 2021

Test build #139390 has finished for PR 32790 at commit 6de20f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

attilapiros commented Jun 7, 2021

However, looking at removeBlock, it only cleans RDD blocks. So it makes me wonder whether we missed other block types. Fortunately, we only have broadcast blocks and RDD blocks here, so nothing leaks.

@Ngone51 I am not so sure about that.

1.)

However, looking at removeBlock, it only cleans RDD blocks.

Why are you sure about that? Just by looking at these lines?

def removeBlock(blockId: BlockId): Unit = {
  if (_blocks.containsKey(blockId)) {
    _remainingMem += _blocks.get(blockId).memSize
    _blocks.remove(blockId)
    externalShuffleServiceBlockStatus.foreach { blockStatus =>
      blockStatus.remove(blockId)
    }
  }
}

Fortunately, we only have broadcast blocks and RDD blocks here, so nothing leaks.

Do we?

case class TaskResultBlockId(taskId: Long) extends BlockId {

Related to both points:

ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))

sparkEnv.blockManager.master.removeBlock(blockId)

I am sorry, but I have limited time for this right now (starting tomorrow I will be without my laptop for the next 10 days). Within the scope of "Fix memory leak in BlockManagerMasterEndpoint removeRdd" I intend to fix the existing / proven memory leaks while avoiding introducing any new errors or performance regressions by not being careful enough. Moreover, I do not want to leave any potential error for others to deal with unattended by me. I cannot fix it from my phone (although vim would probably run just fine on it ;) but there is no keyboard and no sbt..), so it would disturb me a lot and could ruin my vacation, which I do not want to risk.

@attilapiros
Contributor Author

K8s failures are unrelated: #32790 (comment)

@Ngone51
Member

Ngone51 commented Jun 7, 2021

Why are you sure about that? Just by looking at these lines?

It's only called by BlockManagerMasterEndpoint.removeRdd() -> removeBlock() in the production code path, right?

Do we?

First, I'm talking about the production code, not testing code. (I misread TaskResultBlockId as TestResultBlockId; I'm looking at this now.) I said we only have broadcast blocks and RDD blocks here because it comes from BlockManagerMasterEndpoint.updateBlockInfo -> updateBlockInfo. There, shuffle blocks are excluded, so only broadcast blocks and RDD blocks are handled.

I cannot fix it from my phone (although vim would probably run just fine on it ;) but there is no keyboard and no sbt..), so it would disturb me a lot and could ruin my vacation, which I do not want to risk.

Sure, no worries. Your current fix already resolves the issue so it's ok to merge. We don't have to block on my suggestion. We can discuss more when you're available.

@Ngone51
Member

Ngone51 commented Jun 7, 2021

OK, I see there's another type of block - TaskResultBlockId. So it looks good!!

@dongjoon-hyun
Member

Thank you, @attilapiros and all. Could you backport this, @attilapiros ?

@attilapiros
Contributor Author

@holdenk @dongjoon-hyun @Ngone51
Thanks for the reviews!

@attilapiros
Contributor Author

@dongjoon-hyun I am sorry, I have to run now, and during the merge I thought this minor leak wouldn't be worth backporting.
If you think it is needed, may I ask you the favour of backporting it when you have time?

@mridulm
Contributor

mridulm commented Jun 7, 2021

@attilapiros:

That one is also negative. It uses an externalShuffleServiceBlockStatus: Option[JHashMap[BlockId, BlockStatus]] where entries are removed by key:

I was referring to this, yes - essentially, when all the block IDs are removed, we end up with an entry in the blockManagerInfo map (which refers to this hash map as its value) that does not have any entries (an empty map).

Essentially, this is similar to removeRdd.
In BlockManagerMasterEndpoint.updateBlockInfo, after blockManagerInfo(blockManagerId).updateBlockInfo we can end up with an empty map as the value.
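
A small sketch (with simplified stand-in types) of the situation described here: after the last tracked block is removed through the shared inner map, the outer map is left holding an empty value:

import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object EmptyMapAfterUpdateSketch {
  // Simplified stand-ins for the real Spark types.
  case class BlockManagerId(host: String)
  case class BlockId(name: String)
  case class BlockStatus(diskSize: Long)

  val blockStatusByShuffleService =
    new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]

  def main(args: Array[String]): Unit = {
    val essId = BlockManagerId("node-1")
    // Registration shares the inner map with the BlockManagerInfo for this node.
    val shared = blockStatusByShuffleService
      .getOrElseUpdate(essId, new JHashMap[BlockId, BlockStatus])
    shared.put(BlockId("rdd_1_0"), BlockStatus(diskSize = 922))

    // An update reporting the block as gone removes it from the shared map...
    shared.remove(BlockId("rdd_1_0"))

    // ...but the now-empty map is still referenced from the outer map,
    // which is the leftover entry being pointed out here.
    println(blockStatusByShuffleService)
  }
}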

@attilapiros
Contributor Author

@mridulm sorry, I cannot focus on this right now.
I have created a reminder in my calendar to come back to this, and to the backport, when I am back from my holiday (cc @dongjoon-hyun).

@mridulm
Contributor

mridulm commented Jun 7, 2021

@attilapiros No hurries, enjoy your break :-)

@dongjoon-hyun
Member

No problem. Thank you, @attilapiros ! Enjoy your holiday.

@attilapiros
Contributor Author

I think I made a mistake here and we should revert this change.

But let me share my thoughts and discuss the problem.

So we have blockStatusByShuffleService for keeping track of the block statuses for external shuffle services:

// Mapping from external shuffle service block manager id to the block statuses.
private val blockStatusByShuffleService =
  new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]

The JHashMap[BlockId, BlockStatus] part is shared with the block infos when a new block manager is registered:

val externalShuffleServiceBlockStatus =
  if (externalShuffleServiceRddFetchEnabled) {
    val externalShuffleServiceBlocks = blockStatusByShuffleService
      .getOrElseUpdate(externalShuffleServiceIdOnHost(id), new JHashMap[BlockId, BlockStatus])
    Some(externalShuffleServiceBlocks)
  } else {
    None
  }
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
  maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)

When the feature is on and this is the first block manager on a node, a new JHashMap[BlockId, BlockStatus] is created.

Although removeRdd is not routed to the block manager infos, the block status update is done in this method:

def updateBlockInfo(
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Unit = {

So what happens when we add a disk-persisted RDD, remove it later, then add a new one, and then lose the Block Manager? In that case getLocationsAndStatus() gives back no block info even though it could be served via the shuffle service. So this fails:

val status = locations.headOption.flatMap { bmId =>
  if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
    blockStatusByShuffleService.get(bmId).flatMap(m => Option(m.get(blockId)))
  } else {

There is no lifecycle management for nodes, only for Block Managers (executors), so we cannot know when a new BM starts using an earlier node.

We could remove the map, recreate it when a new block is persisted, and re-register it with every block info related to BMs running on that node, but is this worth the work? On top of that, we would have to check every time whether the map exists and whether the feature is on when we want to add an entry. This has a runtime cost, and I do not think it is worth it compared to the memory footprint of this "leak", which is proportional to the number of nodes.

@mridulm @Ngone51 WDYT?

@dongjoon-hyun
Member

It's fine to revert this because this is not released yet. Can we have a test case, @attilapiros ?

BTW, cc @gengliangwang since he is the release manager for Apache Spark 3.2.0.

@attilapiros
Contributor Author

attilapiros commented Jun 21, 2021

@dongjoon-hyun it is hard to test this as BlockManagerMasterEndpoint is not tested directly. So first I would revert this, then we can think about how to cover these cases with tests.
But before the revert I am interested in what the others think about this.

@mridulm
Contributor

mridulm commented Jun 21, 2021

My main concern was around long-running applications which cycle through a lot of executors (over time, not concurrently).
Will this not cause issues there eventually?

@attilapiros
Contributor Author

@mridulm could we estimate the number of nodes for those long-running applications?
This is proportional to the number of nodes/hosts, not to the number of executors.

@mridulm
Contributor

mridulm commented Jun 21, 2021

(Ignoring ephemeral nodes, dockerized deployments, etc., and using more real-world scenarios)

For a 5k-node cluster, the map can have up to 5k entries, with the value maps growing to a reasonable size (even if later emptied).
For example, assuming 1k blocks per node on average, we end up with the map using approximately ~5000 * 2048 * (~8 bytes) == ~80MB: this is the driver cost with zero blocks being used.

@attilapiros
Contributor Author

attilapiros commented Jun 22, 2021

(Ignoring ephemeral nodes, dockerized deployments, etc., and using more real-world scenarios)

The feature (serving local disk-persisted blocks via the external shuffle service) only works when the external shuffle service is available.

For example, assuming 1k blocks per node on average, we end up with the map using approximately ~5000 * 2048 * (~8 bytes) == ~80MB: this is the driver cost with zero blocks being used.

I do not think the number of blocks matters at all. blockStatusByShuffleService is the following data structure:

// Mapping from external shuffle service block manager id to the block statuses.
private val blockStatusByShuffleService =
  new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]

It maps special BlockManagerId instances, which are created for the external shuffle services (so for the nodes), to a map from BlockId instances to BlockStatus, which is a simple case class:

case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
  def isCached: Boolean = memSize + diskSize > 0
}

So the number is ~5000 * 2 * (~8 bytes) = 80KB = 0.08MB.

PS: I have adapted your calculation, although I believe an empty hashmap is not 8 bytes but still some low constant amount, and I do not get why the 2 multiplier is used, as we should only be talking about the external shuffle service map (the other map is per executor, but its lifecycle is handled correctly: when executors are removed, the BlockManagerInfo instances are cleaned up too).
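
For comparison, the two back-of-the-envelope estimates side by side (the per-slot byte count is the thread's own simplifying assumption, not a measured value):

object LeakSizeEstimates {
  // The ~8 bytes per table slot is the thread's simplifying assumption, not a measurement.
  val nodes = 5000L

  // mridulm's scenario: ~1k blocks per node leaves each cleared inner HashMap
  // with a 2048-slot reference array that is never shrunk.
  val retainedTablesBytes = nodes * 2048 * 8 // ~80 MB

  // attilapiros's scenario: only one (near-empty) inner map per node is retained.
  val emptyMapsBytes = nodes * 2 * 8 // ~80 KB

  def main(args: Array[String]): Unit =
    println(s"retained tables ~ $retainedTablesBytes bytes, empty maps ~ $emptyMapsBytes bytes")
}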

@mridulm
Contributor

mridulm commented Jun 22, 2021

I am referring to the size of the maps themselves, @attilapiros.
Essentially, the sum of the sizes of each value JHashMap: even when emptied, a Java HashMap does not release memory, so we end up with the same underlying array.
(Edit):
For example:

> val map = new java.util.HashMap[Int, Int]()
> for (i <- 0 until 1024 * 1024) map.put(i, i)
> val field = map.getClass.getDeclaredField("table")
> field.setAccessible(true)
> map.size
res1: Int = 1048576
> field.get(map).asInstanceOf[Array[AnyRef]].length
res2: Int = 2097152
> map.clear()
> map.size
res3: Int = 0
> field.get(map).asInstanceOf[Array[AnyRef]].length
res4: Int = 2097152

@attilapiros
Contributor Author

Your example assumes all the blocks were persisted to disk. In that case, when persist is called again during the run of the application, that area will be reused.

Still, I have an idea: introduce a new class which wraps the JHashMap; when the HashMap is empty we can remove the reference to it, and when it is needed again we can recreate the HashMap.
Let me look into that.
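
A minimal sketch of such a wrapper, under the assumption that the inner map can be dropped when empty and lazily recreated on the next put while the wrapper object itself stays shared with the BlockManagerInfo instances (the follow-up commit below introduces a similar class named BlockStatusPerBlockId; the method names here are illustrative):

import java.util.{HashMap => JHashMap}

// Illustrative sketch only (simplified generics, hypothetical method names).
class BlockStatusHolder[K, V] {
  private var blockStatus: JHashMap[K, V] = _

  def get(key: K): Option[V] =
    Option(blockStatus).flatMap(m => Option(m.get(key)))

  def put(key: K, value: V): Unit = {
    // Recreate the inner map lazily when a block is persisted again.
    if (blockStatus == null) blockStatus = new JHashMap[K, V]
    blockStatus.put(key, value)
  }

  def remove(key: K): Unit = {
    if (blockStatus != null) {
      blockStatus.remove(key)
      // Drop the reference once empty so the retained table can be GC'd;
      // the wrapper itself remains shared with the BlockManagerInfo instances.
      if (blockStatus.isEmpty) blockStatus = null
    }
  }
}

The point of this shape is that only the wrapper is shared, so dropping and recreating the inner HashMap never invalidates the references held elsewhere.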

@Ngone51
Member

Ngone51 commented Jun 23, 2021

@mridulm Interesting example! I also investigated it a bit. It looks like all the elements of the underlying array have been nulled out, but the array reference itself remains unchanged, so its length doesn't change. The memory usage shouldn't be the same as before, but it's also not zero, since the array of null slots still takes some memory.

public void clear() {
    Node<K,V>[] tab;
    modCount++;
    if ((tab = table) != null && size > 0) {
        size = 0;
        for (int i = 0; i < tab.length; ++i)
            tab[i] = null;
    }
}

@mridulm
Contributor

mridulm commented Jun 24, 2021

@Ngone51 Yes, the table contents are null'ed, but the table itself remains and occupies memory.
I used to use gnu trove4j (for example) to mitigate these sorts of issues (and to use primitive collections when relevant, of course!).

asfgit pushed a commit that referenced this pull request Jun 24, 2021
…dpoint removeRdd

### What changes were proposed in this pull request?

Wrapping `JHashMap[BlockId, BlockStatus]` (used in `blockStatusByShuffleService`) into a new class `BlockStatusPerBlockId` which removes the reference to the map when all the persisted blocks are removed.

### Why are the changes needed?

With #32790 a bug was introduced: when all the persisted blocks are removed, we remove the HashMap which is already shared by the block manager infos, but when a new block is persisted this map needs to be used again for storing the data (and this HashMap must be the same one shared by the block manager infos created for registered block managers running on the same host as the external shuffle service).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Extending `BlockManagerInfoSuite` with a test which removes all the persisted blocks and then adds another one.

Closes #33020 from attilapiros/SPARK-35543-2.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
