Skip to content
43 changes: 26 additions & 17 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1991,23 +1991,32 @@ private[spark] class BlockManager(
* lock on the block.
*/
private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
val blockStatus = if (tellMaster) {
val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
Some(getCurrentBlockStatus(blockId, blockInfo))
} else None

// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}

blockInfoManager.removeBlock(blockId)
if (tellMaster) {
// Only update storage level from the captured block status before deleting, so that
// memory size and disk size are being kept for calculating delta.
reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
var hasRemoveBlock = false
try {
val blockStatus = if (tellMaster) {
val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
Some(getCurrentBlockStatus(blockId, blockInfo))
} else None

// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}

blockInfoManager.removeBlock(blockId)
hasRemoveBlock = true
if (tellMaster) {
// Only update storage level from the captured block status before deleting, so that
// memory size and disk size are being kept for calculating delta.
reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
}
} finally {
if (!hasRemoveBlock) {
logWarning(s"Block $blockId was not removed normally.")
blockInfoManager.removeBlock(blockId)
}
}
}

Expand Down