
Commit 05e441e

Josh Rosen authored and Andrew Or committed
[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution
This patch fixes a bug in the eviction of storage memory by execution.

## The bug:

In general, execution should be able to evict storage memory when the total storage memory usage is greater than `maxMemory * spark.memory.storageFraction`. Due to a bug, however, Spark might wind up evicting no storage memory in certain cases where the storage memory usage was between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For example, here is a regression test which illustrates the bug:

```scala
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
// of storage memory which are immune to eviction by execution memory pressure.

// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
// should be able to reclaim up to 250 bytes of storage memory.
// Therefore, execution should now be able to require up to 500 bytes of memory:
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```

The problem relates to the control flow / interaction between `StorageMemoryPool.shrinkPoolToReclaimSpace()` and `MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of execution memory, the `UnifiedMemoryManager` discovers that it will need to reclaim 250 bytes of memory from storage, so it calls `StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls `MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks whether the requested space is less than `maxStorageMemory - storageMemoryUsed`, which will be true if there is any free execution memory, because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.

The control flow here is somewhat confusing (it grew to be messy and confusing over time as a result of the merging and refactoring of several components). In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing control flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls `MemoryManager.freeStorageMemory`.

## The solution:

The solution implemented in this patch is to remove the confusing circular control flow between `MemoryManager` and `MemoryStore`, making the storage memory acquisition process much more linear and straightforward. The key changes:

- Remove a layer of inheritance which made the memory manager code harder to understand (5384117).
- Move some bounds checks earlier in the call chain (13ba7ad).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be called independently from the part which checks whether there is enough free space to avoid eviction (7c68ca0).
- Realize that this lets us remove a layer of overloads from `ensureFreeSpace` (eec4f6c).
- Realize that `ensureFreeSpace()` can simply be replaced with an `evictBlocksToFreeSpace()` method which is called [after we've already figured out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88) how much memory needs to be reclaimed via eviction (2dc842a).

Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: the old mocks would [unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84) report that a block had been evicted even if there was enough space in the storage pool such that eviction would be avoided.

I also fixed a problem where `StorageMemoryPool._memoryUsed` might become negative due to freed memory being double-counted when execution evicts storage. The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement `_memoryUsed`](7c68ca0#diff-935c68a9803be144ed7bafdd2f756a0fL133) even though `StorageMemoryPool.freeMemory` had already decremented it as each evicted block was freed. See SPARK-12189 for details.

Author: Josh Rosen <[email protected]>
Author: Andrew Or <[email protected]>

Closes #10170 from JoshRosen/SPARK-12165.

(cherry picked from commit aec5ea0)
Signed-off-by: Andrew Or <[email protected]>
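To make the faulty short-circuit concrete, here is a minimal standalone sketch, not Spark's actual classes: the object name, fields, and helper methods are hypothetical, and the numbers mirror the regression test above. It traces why the old `ensureFreeSpace()` path could evict nothing while the new `evictBlocksToFreeSpace()` path cannot skip eviction:

```scala
// Toy model (hypothetical) of the old vs. new eviction control flow, using the
// regression test's numbers: maxMemory = 1000, storage holds 750 bytes of
// evictable blocks, and execution needs 250 bytes reclaimed from storage.
object EvictionControlFlow {
  val maxMemory = 1000L
  var storageUsed = 750L
  var executionUsed = 0L

  // Old path: shrinkPoolToReclaimSpace(250) called ensureFreeSpace(250), which first
  // asked "is there already enough free storage space?". Under UnifiedMemoryManager
  // the storage maximum is maxMemory minus used execution memory, so whenever any
  // execution memory was free this check passed and nothing was evicted.
  def oldEnsureFreeSpaceEvicts(space: Long): Long = {
    val maxStorageMemory = maxMemory - executionUsed // 1000
    val freeMemory = maxStorageMemory - storageUsed  // 250
    if (freeMemory >= space) {
      0L                                             // 250 >= 250: evicts nothing!
    } else {
      storageUsed -= space
      space
    }
  }

  // New path: the caller has already computed exactly how many bytes must be
  // reclaimed via eviction, so the memory store evicts unconditionally.
  def evictBlocksToFreeSpace(space: Long): Long = {
    assert(space > 0)
    storageUsed -= space
    space
  }

  def main(args: Array[String]): Unit = {
    println(oldEnsureFreeSpaceEvicts(250L)) // 0: execution is only granted 250 of 500
    println(evictBlocksToFreeSpace(250L))   // 250: execution can grow to the full 500
  }
}
```

Running it prints `0` for the old path and `250` for the new one, matching the 250-byte shortfall the regression test catches.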
1 parent acd4624 commit 05e441e

8 files changed, +230 −204 lines changed

core/src/main/scala/org/apache/spark/memory/MemoryManager.scala

Lines changed: 2 additions & 9 deletions
```diff
@@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager(
   def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
-    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
-  }
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -109,12 +107,7 @@
   def acquireExecutionMemory(
       numBytes: Long,
       taskAttemptId: Long,
-      memoryMode: MemoryMode): Long = synchronized {
-    memoryMode match {
-      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-    }
-  }
+      memoryMode: MemoryMode): Long
 
   /**
    * Release numBytes of execution memory belonging to the given task.
```

core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala

Lines changed: 34 additions & 3 deletions
```diff
@@ -49,19 +49,50 @@ private[spark] class StaticMemoryManager(
   }
 
   // Max number of bytes worth of blocks to evict when unrolling
-  private val maxMemoryToEvictForUnroll: Long = {
+  private val maxUnrollMemory: Long = {
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
   }
 
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+    if (numBytes > maxStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+        s"memory limit ($maxStorageMemory bytes)")
+      false
+    } else {
+      storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+    }
+  }
+
   override def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
     val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
-    val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
-    val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
+    val freeMemory = storageMemoryPool.memoryFree
+    // When unrolling, we will use all of the existing free memory, and, if necessary,
+    // some extra space freed from evicting cached blocks. We must place a cap on the
+    // amount of memory to be evicted by unrolling, however, otherwise unrolling one
+    // big block can blow away the entire cache.
+    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
+    // Keep it within the range 0 <= X <= maxNumBytesToFree
+    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
     storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
   }
+
+  private[memory]
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Long = synchronized {
+    memoryMode match {
+      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+    }
+  }
 }
```
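The interaction between free memory and the new eviction cap is pure arithmetic, so it can be traced in isolation. The following is a hypothetical standalone sketch, not the real `StaticMemoryManager`; the function and all the numbers are made up to walk through the formula:

```scala
// A minimal standalone sketch of the new unroll-eviction cap. The names and
// values below are hypothetical, chosen only to trace the arithmetic.
object UnrollCapExample {
  def numBytesToFree(
      numBytes: Long,          // size of the block being unrolled
      freeMemory: Long,        // memory already free in the storage pool
      maxUnrollMemory: Long,   // maxStorageMemory * spark.storage.unrollFraction
      currentUnrollMemory: Long): Long = {
    // Free memory is consumed first; eviction only covers the shortfall, and the
    // total evicted on behalf of unrolling is capped so that unrolling one big
    // block cannot blow away the entire cache.
    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
    math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
  }

  def main(args: Array[String]): Unit = {
    // Request 100 bytes with 30 free: shortfall is 70, cap is 120, so evict 70.
    println(numBytesToFree(numBytes = 100, freeMemory = 30,
      maxUnrollMemory = 200, currentUnrollMemory = 50)) // 70
    // Request 500 bytes with the same cap: shortfall is 470 but only 120 may be evicted.
    println(numBytesToFree(numBytes = 500, freeMemory = 30,
      maxUnrollMemory = 200, currentUnrollMemory = 50)) // 120
  }
}
```

The second call shows the point of the cap: however large the unroll request, at most `maxUnrollMemory - currentUnrollMemory - freeMemory` bytes of cached blocks may be evicted on its behalf.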

core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala

Lines changed: 21 additions & 16 deletions
```diff
@@ -65,15 +65,16 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
-    acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+    val numBytesToFree = math.max(0, numBytes - memoryFree)
+    acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
   }
 
   /**
    * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
    *
    * @param blockId the ID of the block we are acquiring storage memory for
    * @param numBytesToAcquire the size of this block
-   * @param numBytesToFree the size of space to be freed through evicting blocks
+   * @param numBytesToFree the amount of space to be freed through evicting blocks
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(
@@ -84,16 +85,18 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
-    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    // Register evicted blocks, if any, with the active task metrics
-    Option(TaskContext.get()).foreach { tc =>
-      val metrics = tc.taskMetrics()
-      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+    if (numBytesToFree > 0) {
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
+      // Register evicted blocks, if any, with the active task metrics
+      Option(TaskContext.get()).foreach { tc =>
+        val metrics = tc.taskMetrics()
+        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+      }
     }
     // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
-    // back into this StorageMemoryPool in order to free. Therefore, these variables should have
-    // been updated.
+    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
+    // should have been updated.
     val enoughMemory = numBytesToAcquire <= memoryFree
     if (enoughMemory) {
       _memoryUsed += numBytesToAcquire
@@ -121,18 +124,20 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
    */
   def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
     // First, shrink the pool by reclaiming free memory:
-    val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
     decrementPoolSize(spaceFreedByReleasingUnusedMemory)
-    if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
-      spaceFreedByReleasingUnusedMemory
-    } else {
+    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
+    if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
       val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
+      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
       val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
-      _memoryUsed -= spaceFreedByEviction
+      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
+      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+    } else {
+      spaceFreedByReleasingUnusedMemory
     }
   }
 }
```
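The SPARK-12189 double-count is easiest to see in a toy model. The sketch below is not Spark's `StorageMemoryPool`; the class, its fields, and the numbers are all hypothetical, and eviction is simulated by a direct `releaseMemory()` callback, which is the role `BlockManager.dropFromMemory()` plays in the real code:

```scala
// Standalone toy (hypothetical names and numbers) showing why decrementing
// memoryUsed inside shrinkPoolToFreeSpace double-counted evictions: evicting
// a block already calls back into releaseMemory().
object ShrinkPoolModel {
  class ToyPool(var poolSize: Long, var memoryUsed: Long) {
    def memoryFree: Long = poolSize - memoryUsed

    // Simulates the eviction callback path (BlockManager.dropFromMemory -> releaseMemory).
    def releaseMemory(bytes: Long): Unit = memoryUsed -= bytes

    def shrinkPoolToFreeSpace(spaceToFree: Long, buggy: Boolean): Long = {
      val freedFromUnused = math.min(spaceToFree, memoryFree)
      poolSize -= freedFromUnused
      val remaining = spaceToFree - freedFromUnused
      if (remaining > 0) {
        releaseMemory(remaining)           // eviction callback already frees the memory...
        if (buggy) memoryUsed -= remaining // ...so this second decrement is the bug
        poolSize -= remaining
        freedFromUnused + remaining
      } else {
        freedFromUnused
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val fixed = new ToyPool(poolSize = 1000, memoryUsed = 1000)
    fixed.shrinkPoolToFreeSpace(300, buggy = false)
    println((fixed.poolSize, fixed.memoryUsed))   // (700,700): consistent

    val broken = new ToyPool(poolSize = 1000, memoryUsed = 1000)
    broken.shrinkPoolToFreeSpace(300, buggy = true)
    println((broken.poolSize, broken.memoryUsed)) // (700,400): drifts toward negative
  }
}
```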

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need to interact with
         // the storage pool when allocating off-heap memory. This will change in the future, though.
-        super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
+        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
     }
   }
 
@@ -110,6 +110,12 @@
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
     assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
     assert(numBytes >= 0)
+    if (numBytes > maxStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+        s"memory limit ($maxStorageMemory bytes)")
+      return false
+    }
     if (numBytes > storageMemoryPool.memoryFree) {
       // There is not enough free memory in the storage pool, so try to borrow free memory from
       // the execution pool.
```
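For the storage side of the unified manager, the new fail-fast bound and the pool-borrowing step can be sketched together. This is a hypothetical simplification, not the real `UnifiedMemoryManager` (in particular it omits the eviction the real code performs when borrowing free execution memory is not enough); names and numbers are made up:

```scala
// Toy model of storage acquisition in a unified manager: storage may borrow free
// execution memory by growing its pool, but a block larger than everything storage
// could ever obtain (maxStorageMemory) is rejected up front.
object StorageAcquireModel {
  val maxMemory = 1000L
  var executionPoolSize = 500L
  var executionUsed = 0L
  var storagePoolSize = 500L
  var storageUsed = 0L

  def maxStorageMemory: Long = maxMemory - executionUsed

  def acquireStorageMemory(numBytes: Long): Boolean = {
    if (numBytes > maxStorageMemory) return false // fail fast: can never fit
    val freeInStorage = storagePoolSize - storageUsed
    if (numBytes > freeInStorage) {
      // Borrow free memory from the execution pool by growing the storage pool.
      val borrowed = math.min(executionPoolSize - executionUsed, numBytes - freeInStorage)
      executionPoolSize -= borrowed
      storagePoolSize += borrowed
    }
    if (numBytes <= storagePoolSize - storageUsed) { storageUsed += numBytes; true }
    else false
  }

  def main(args: Array[String]): Unit = {
    println(acquireStorageMemory(750))            // true: borrows 250 from execution
    println((storagePoolSize, executionPoolSize)) // (750, 250)
    println(acquireStorageMemory(1100))           // false: exceeds maxStorageMemory
  }
}
```

Note that the fail-fast check compares against `maxStorageMemory` (everything storage could ever obtain), not against the current storage pool size.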

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 16 additions & 60 deletions
```diff
@@ -406,85 +406,41 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }
 
   /**
-   * Try to free up a given amount of space by evicting existing blocks.
-   *
-   * @param space the amount of memory to free, in bytes
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private[spark] def ensureFreeSpace(
-      space: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    ensureFreeSpace(None, space, droppedBlocks)
-  }
-
-  /**
-   * Try to free up a given amount of space to store a block by evicting existing ones.
-   *
-   * @param space the amount of memory to free, in bytes
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private[spark] def ensureFreeSpace(
-      blockId: BlockId,
-      space: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    ensureFreeSpace(Some(blockId), space, droppedBlocks)
-  }
-
-  /**
-   * Try to free up a given amount of space to store a particular block, but can fail if
-   * either the block is bigger than our memory or it would require replacing another block
-   * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
-   * don't fit into memory that we want to avoid).
-   *
-   * @param blockId the ID of the block we are freeing space for, if any
-   * @param space the size of this block
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private def ensureFreeSpace(
+   * Try to evict blocks to free up a given amount of space to store a particular block.
+   * Can fail if either the block is bigger than our memory or it would require replacing
+   * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
+   * RDDs that don't fit into memory that we want to avoid).
+   *
+   * @param blockId the ID of the block we are freeing space for, if any
+   * @param space the size of this block
+   * @param droppedBlocks a holder for blocks evicted in the process
+   * @return whether the requested free space is freed.
+   */
+  private[spark] def evictBlocksToFreeSpace(
       blockId: Option[BlockId],
       space: Long,
       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+    assert(space > 0)
     memoryManager.synchronized {
-      val freeMemory = maxMemory - memoryUsed
+      var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
-      var selectedMemory = 0L
-
-      logInfo(s"Ensuring $space bytes of free space " +
-        blockId.map { id => s"for block $id" }.getOrElse("") +
-        s"(free: $freeMemory, max: $maxMemory)")
-
-      // Fail fast if the block simply won't fit
-      if (space > maxMemory) {
-        logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
-          s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
-        return false
-      }
-
-      // No need to evict anything if there is already enough free space
-      if (freeMemory >= space) {
-        return true
-      }
-
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as that
       // can lead to exceptions.
       entries.synchronized {
         val iterator = entries.entrySet().iterator()
-        while (freeMemory + selectedMemory < space && iterator.hasNext) {
+        while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
           if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
             selectedBlocks += blockId
-            selectedMemory += pair.getValue.size
+            freedMemory += pair.getValue.size
           }
         }
       }
 
-      if (freeMemory + selectedMemory >= space) {
+      if (freedMemory >= space) {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }
```
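The selection loop at the heart of `evictBlocksToFreeSpace` is worth isolating. Below is a hypothetical, simplified sketch of just that loop: it drops the locking and the actual dropping of blocks, and models the entry map as a `LinkedHashMap` (the names `Entry`, `selectBlocksToEvict`, and the block IDs are invented for illustration):

```scala
import scala.collection.mutable

// Simplified sketch of the selection loop: walk blocks in insertion (roughly LRU)
// order, skip blocks belonging to the same RDD as the incoming block, and evict
// only if enough memory can actually be freed (all-or-nothing).
object EvictionSelection {
  case class Entry(size: Long, rddId: Option[Int])

  def selectBlocksToEvict(
      entries: mutable.LinkedHashMap[String, Entry],
      space: Long,
      rddToAdd: Option[Int]): Option[Seq[String]] = {
    assert(space > 0)
    var freedMemory = 0L
    val selected = mutable.ArrayBuffer[String]()
    val it = entries.iterator
    while (freedMemory < space && it.hasNext) {
      val (blockId, entry) = it.next()
      // Never evict blocks of the RDD being cached, to avoid cyclic replacement.
      if (rddToAdd.isEmpty || rddToAdd != entry.rddId) {
        selected += blockId
        freedMemory += entry.size
      }
    }
    if (freedMemory >= space) Some(selected.toSeq) else None // all-or-nothing
  }

  def main(args: Array[String]): Unit = {
    val entries = mutable.LinkedHashMap(
      "rdd_1_0" -> Entry(100, Some(1)),
      "rdd_2_0" -> Entry(200, Some(2)),
      "rdd_1_1" -> Entry(300, Some(1)))
    // Caching a block of rdd_2: rdd_2_0 is skipped, so 100 + 300 bytes are selected.
    println(selectBlocksToEvict(entries, 350, rddToAdd = Some(2)))
    // Some(List(rdd_1_0, rdd_1_1))
  }
}
```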
