diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 4d8ba9b3e4e0a..adeb507941c0e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -838,9 +838,11 @@ private[spark] class BlockStatusPerBlockId { } def remove(blockId: BlockId): Unit = { - blocks.remove(blockId) - if (blocks.isEmpty) { - blocks = null + if (blocks != null) { + blocks.remove(blockId) + if (blocks.isEmpty) { + blocks = null + } } } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index dd3d90f3124d5..1ca78d572c7ad 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -255,4 +255,26 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi } } } + + test("SPARK-38640: memory only blocks can unpersist using shuffle service cache fetching") { + for (enabled <- Seq(true, false)) { + val confWithRddFetch = + conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, enabled) + sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetch) + sc.env.blockManager.externalShuffleServiceEnabled should equal(true) + sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) + try { + val rdd = sc.parallelize(0 until 100, 2) + .map { i => (i, 1) } + .persist(StorageLevel.MEMORY_ONLY) + + rdd.count() + rdd.unpersist(true) + assert(sc.persistentRdds.isEmpty) + } finally { + rpcHandler.applicationRemoved(sc.conf.getAppId, true) + sc.stop() + } + } + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala index f0c19c5ccce12..85f012aece3b4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala @@ -63,6 +63,8 @@ class BlockManagerInfoSuite extends SparkFunSuite { if (svcEnabled) { assert(getEssBlockStatus(bmInfo, rddId).isEmpty) } + bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 0) + assert(bmInfo.remainingMem === 30000) } testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled, bmInfo) =>