Commit b95a771

cxzl25 authored and Mridul committed
[SPARK-40987][CORE] BlockManager#removeBlockInternal should ensure the lock is unlocked gracefully
### What changes were proposed in this pull request?

`BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully. `removeBlockInternal` now tries to call `removeBlock` in the `finally` block.

### Why are the changes needed?

When the driver submits a job, `DAGScheduler` calls `sc.broadcast(taskBinaryBytes)`. `TorrentBroadcast#writeBlocks` may fail due to disk problems during `blockManager#putBytes`. `BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up the block. `BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up blocks on disk. `DiskStore#remove` tries to create the directory because the directory does not exist, and an exception is thrown at that point. As a result, the block info and its lock in `BlockInfoManager#blockInfoWrappers` are not removed. The catch block in `TorrentBroadcast#writeBlocks` then calls `blockManager.removeBroadcast` to clean up the broadcast. Because the block lock in `BlockInfoManager#blockInfoWrappers` has not been released, the `dag-scheduler-event-loop` thread of `DAGScheduler` waits forever.

```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
```

```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
    at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Throw an exception before `Files.createDirectory` in `DiskBlockManager#getFile` to simulate disk problems.

```scala
if (filename.contains("piece")) {
  throw new java.io.IOException("disk issue")
}
Files.createDirectory(path)
```

```
./bin/spark-shell
```

```scala
spark.sql("select 1").collect()
```

```
22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue.
22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue
java.io.IOException: disk issue
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109)
    at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160)
    at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
    at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484)
    at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378)
    at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
```

Closes #38467 from cxzl25/SPARK-40987.

Authored-by: sychen <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
(cherry picked from commit bbab0af)
Signed-off-by: Mridul <mridul<at>gmail.com>
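
The hang described in the commit message can be reproduced in miniature outside Spark. The sketch below is an illustration only: `LeakedLockSketch`, `cleanup`, and `laterCallerCanLock` are hypothetical names, and a `java.util.concurrent.Semaphore` with one permit stands in for the per-block write lock (it is not how `BlockInfoManager` is implemented). It shows that when the cleanup step throws before the release, a later attempt to take the same lock cannot succeed.

```scala
import java.io.IOException
import java.util.concurrent.{Semaphore, TimeUnit}

// Hypothetical stand-in for the pre-fix behaviour; not Spark code.
object LeakedLockSketch {
  // Simulates the pre-fix cleanup: the release is never reached
  // when the simulated disk step throws.
  private def cleanup(lock: Semaphore, failDisk: Boolean): Unit = {
    if (failDisk) throw new IOException("disk issue")
    lock.release()
  }

  // Returns true if a later caller could take the write lock within timeoutMs.
  def laterCallerCanLock(failDisk: Boolean, timeoutMs: Long = 100L): Boolean = {
    val lock = new Semaphore(1) // one permit, non-reentrant (assumption for the sketch)
    lock.acquire()              // the failed put still holds the write lock
    try cleanup(lock, failDisk)
    catch { case _: IOException => () } // the caller logs the error and moves on
    // The follow-up removal needs the same lock; without a timeout it would wait forever.
    lock.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

With `failDisk = true` the second acquisition times out, which corresponds to the `WAITING (parking)` state in the thread dump above; with `failDisk = false` it succeeds.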
1 parent 1945045 commit b95a771

1 file changed: +26 -17 lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 26 additions & 17 deletions
@@ -1952,23 +1952,32 @@ private[spark] class BlockManager(
    * lock on the block.
    */
   private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
-    val blockStatus = if (tellMaster) {
-      val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
-      Some(getCurrentBlockStatus(blockId, blockInfo))
-    } else None
-
-    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-    val removedFromMemory = memoryStore.remove(blockId)
-    val removedFromDisk = diskStore.remove(blockId)
-    if (!removedFromMemory && !removedFromDisk) {
-      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
-    }
-
-    blockInfoManager.removeBlock(blockId)
-    if (tellMaster) {
-      // Only update storage level from the captured block status before deleting, so that
-      // memory size and disk size are being kept for calculating delta.
-      reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+    var hasRemoveBlock = false
+    try {
+      val blockStatus = if (tellMaster) {
+        val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+        Some(getCurrentBlockStatus(blockId, blockInfo))
+      } else None
+
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      val removedFromMemory = memoryStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
+      if (!removedFromMemory && !removedFromDisk) {
+        logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+      }
+
+      blockInfoManager.removeBlock(blockId)
+      hasRemoveBlock = true
+      if (tellMaster) {
+        // Only update storage level from the captured block status before deleting, so that
+        // memory size and disk size are being kept for calculating delta.
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+      }
+    } finally {
+      if (!hasRemoveBlock) {
+        logWarning(s"Block $blockId was not removed normally.")
+        blockInfoManager.removeBlock(blockId)
+      }
     }
   }
 
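
The shape of the change above (a flag set after the normal release, checked in `finally`) can be isolated as a small helper. This is a minimal sketch under stated assumptions, not the patch itself: `GuardedRemovalSketch`, `removeGuarded`, `removeData`, and `release` are hypothetical names, with `release` playing the role of `blockInfoManager.removeBlock` and `removeData` the role of the memory and disk store removals.

```scala
// Hypothetical helper mirroring the flag-plus-finally pattern in the diff; not Spark code.
object GuardedRemovalSketch {
  // Runs removeData, then release. If anything throws before the flag is set,
  // release is still attempted in the finally block.
  def removeGuarded(removeData: () => Unit, release: () => Unit): Unit = {
    var released = false
    try {
      removeData()    // may throw, e.g. on a disk problem
      release()       // normal path: release right after the data is removed
      released = true
    } finally {
      if (!released) release() // fallback path: make sure the lock is not leaked
    }
  }
}
```

A possible usage, again with a one-permit `java.util.concurrent.Semaphore` as a stand-in lock: after `lock.acquire()`, calling `removeGuarded(() => throw new java.io.IOException("disk issue"), () => lock.release())` propagates the exception but leaves the permit available. Using a flag instead of an unconditional release in `finally` keeps the normal-path ordering visible in the diff, where `removeBlock` runs before `reportBlockStatus`, and avoids a second release when the first one succeeded.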
