Skip to content

Commit 37bc203

Browse files
Davies Liu authored and JoshRosen committed
[SPARK-13210][SQL] catch OOM when allocate memory and expand array
There is a bug when we try to grow the buffer, OOM is ignore wrongly (the assert also skipped by JVM), then we try grow the array again, this one will trigger spilling free the current page, the current record we inserted will be invalid. The root cause is that JVM has less free memory than MemoryManager thought, it will OOM when allocate a page without trigger spilling. We should catch the OOM, and acquire memory again to trigger spilling. And also, we could not grow the array in `insertRecord` of `InMemorySorter` (it was there just for easy testing). Author: Davies Liu <[email protected]> Closes #11095 from davies/fix_expand.
1 parent 8e4d15f commit 37bc203

File tree

7 files changed

+35
-21
lines changed

7 files changed

+35
-21
lines changed

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ public class TaskMemoryManager {
111111
@GuardedBy("this")
112112
private final HashSet<MemoryConsumer> consumers;
113113

114+
/**
115+
* The amount of memory that is acquired but not used.
116+
*/
117+
private long acquiredButNotUsed = 0L;
118+
114119
/**
115120
* Construct a new TaskMemoryManager.
116121
*/
@@ -256,7 +261,20 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
256261
}
257262
allocatedPages.set(pageNumber);
258263
}
259-
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
264+
MemoryBlock page = null;
265+
try {
266+
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
267+
} catch (OutOfMemoryError e) {
268+
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
269+
// there is no enough memory actually, it means the actual free memory is smaller than
270+
// MemoryManager thought, we should keep the acquired memory.
271+
acquiredButNotUsed += acquired;
272+
synchronized (this) {
273+
allocatedPages.clear(pageNumber);
274+
}
275+
// this could trigger spilling to free some pages.
276+
return allocatePage(size, consumer);
277+
}
260278
page.pageNumber = pageNumber;
261279
pageTable[pageNumber] = page;
262280
if (logger.isTraceEnabled()) {
@@ -378,6 +396,9 @@ public long cleanUpAllAllocatedMemory() {
378396
}
379397
Arrays.fill(pageTable, null);
380398

399+
// release the memory that is not used by any consumer.
400+
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
401+
381402
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
382403
}
383404

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -320,15 +320,7 @@ private void growPointerArrayIfNecessary() throws IOException {
320320
assert(inMemSorter != null);
321321
if (!inMemSorter.hasSpaceForAnotherRecord()) {
322322
long used = inMemSorter.getMemoryUsage();
323-
LongArray array;
324-
try {
325-
// could trigger spilling
326-
array = allocateArray(used / 8 * 2);
327-
} catch (OutOfMemoryError e) {
328-
// should have trigger spilling
329-
assert(inMemSorter.hasSpaceForAnotherRecord());
330-
return;
331-
}
323+
LongArray array = allocateArray(used / 8 * 2);
332324
// check if spilling is triggered or not
333325
if (inMemSorter.hasSpaceForAnotherRecord()) {
334326
freeArray(array);

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public long getMemoryUsage() {
104104
*/
105105
public void insertRecord(long recordPointer, int partitionId) {
106106
if (!hasSpaceForAnotherRecord()) {
107-
expandPointerArray(consumer.allocateArray(array.size() * 2));
107+
throw new IllegalStateException("There is no space for new record");
108108
}
109109
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
110110
pos++;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -293,15 +293,7 @@ private void growPointerArrayIfNecessary() throws IOException {
293293
assert(inMemSorter != null);
294294
if (!inMemSorter.hasSpaceForAnotherRecord()) {
295295
long used = inMemSorter.getMemoryUsage();
296-
LongArray array;
297-
try {
298-
// could trigger spilling
299-
array = allocateArray(used / 8 * 2);
300-
} catch (OutOfMemoryError e) {
301-
// should have trigger spilling
302-
assert(inMemSorter.hasSpaceForAnotherRecord());
303-
return;
304-
}
296+
LongArray array = allocateArray(used / 8 * 2);
305297
// check if spilling is triggered or not
306298
if (inMemSorter.hasSpaceForAnotherRecord()) {
307299
freeArray(array);

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void expandPointerArray(LongArray newArray) {
164164
*/
165165
public void insertRecord(long recordPointer, long keyPrefix) {
166166
if (!hasSpaceForAnotherRecord()) {
167-
expandPointerArray(consumer.allocateArray(array.size() * 2));
167+
throw new IllegalStateException("There is no space for new record");
168168
}
169169
array.set(pos, recordPointer);
170170
pos++;

core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public void testBasicSorting() throws Exception {
7575
// Write the records into the data page and store pointers into the sorter
7676
long position = dataPage.getBaseOffset();
7777
for (String str : dataToSort) {
78+
if (!sorter.hasSpaceForAnotherRecord()) {
79+
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
80+
}
7881
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
7982
final byte[] strBytes = str.getBytes("utf-8");
8083
Platform.putInt(baseObject, position, strBytes.length);
@@ -114,6 +117,9 @@ public void testSortingManyNumbers() throws Exception {
114117
int[] numbersToSort = new int[128000];
115118
Random random = new Random(16);
116119
for (int i = 0; i < numbersToSort.length; i++) {
120+
if (!sorter.hasSpaceForAnotherRecord()) {
121+
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
122+
}
117123
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
118124
sorter.insertRecord(0, numbersToSort[i]);
119125
}

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ public int compare(long prefix1, long prefix2) {
111111
// Given a page of records, insert those records into the sorter one-by-one:
112112
position = dataPage.getBaseOffset();
113113
for (int i = 0; i < dataToSort.length; i++) {
114+
if (!sorter.hasSpaceForAnotherRecord()) {
115+
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
116+
}
114117
// position now points to the start of a record (which holds its length).
115118
final int recordLength = Platform.getInt(baseObject, position);
116119
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);

0 commit comments

Comments (0)