diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7d068fd69529..80dd1cb0be1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -276,8 +276,13 @@ class BlockManagerMasterEndpoint( val blockIdsToDel = blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService, new mutable.HashSet[RDDBlockId]()) blockIdsToDel += blockId - blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatus => - blockStatus.remove(blockId) + blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatusForId => + blockStatusForId.remove(blockId) + // when all blocks are removed from the block statuses then for this BM Id the whole + // blockStatusByShuffleService entry can be removed to avoid leaking memory + if (blockStatusForId.isEmpty) { + blockStatusByShuffleService.remove(bmIdForShuffleService) + } } } } @@ -309,6 +314,7 @@ class BlockManagerMasterEndpoint( Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures) } + private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { // Nothing to do in the BlockManagerMasterEndpoint data structures val removeMsg = RemoveShuffle(shuffleId) @@ -665,7 +671,7 @@ class BlockManagerMasterEndpoint( val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty) val status = locations.headOption.flatMap { bmId => if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { - Option(blockStatusByShuffleService(bmId).get(blockId)) + blockStatusByShuffleService.get(bmId).flatMap(m => Option(m.get(blockId))) } else { aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId)) }