-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-13210] [SQL] catch OOM when allocate memory and expand array #11095
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,6 +111,11 @@ public class TaskMemoryManager { | |
@GuardedBy("this") | ||
private final HashSet<MemoryConsumer> consumers; | ||
|
||
/** | ||
* The amount of memory that is acquired but not used. | ||
*/ | ||
private long acquiredButNotUsed = 0L; | ||
|
||
/** | ||
* Construct a new TaskMemoryManager. | ||
*/ | ||
|
@@ -256,7 +261,20 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) { | |
} | ||
allocatedPages.set(pageNumber); | ||
} | ||
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired); | ||
MemoryBlock page = null; | ||
try { | ||
page = memoryManager.tungstenMemoryAllocator().allocate(acquired); | ||
} catch (OutOfMemoryError e) { | ||
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired); | ||
// there is no enough memory actually, it means the actual free memory is smaller than | ||
// MemoryManager thought, we should keep the acquired memory. | ||
acquiredButNotUsed += acquired; | ||
synchronized (this) { | ||
allocatedPages.clear(pageNumber); | ||
} | ||
// this could trigger spilling to free some pages. | ||
return allocatePage(size, consumer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just now saw this commit but: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are continue hold some memory, the amount of free memory should become smaller and smaller, it will fail to acquire soon. yes, it's better to move |
||
} | ||
page.pageNumber = pageNumber; | ||
pageTable[pageNumber] = page; | ||
if (logger.isTraceEnabled()) { | ||
|
@@ -378,6 +396,9 @@ public long cleanUpAllAllocatedMemory() { | |
} | ||
Arrays.fill(pageTable, null); | ||
|
||
// release the memory that is not used by any consumer. | ||
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode); | ||
|
||
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,9 @@ public void testBasicSorting() throws Exception { | |
// Write the records into the data page and store pointers into the sorter | ||
long position = dataPage.getBaseOffset(); | ||
for (String str : dataToSort) { | ||
if (!sorter.hasSpaceForAnotherRecord()) { | ||
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should only be |
||
} | ||
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position); | ||
final byte[] strBytes = str.getBytes("utf-8"); | ||
Platform.putInt(baseObject, position, strBytes.length); | ||
|
@@ -114,6 +117,9 @@ public void testSortingManyNumbers() throws Exception { | |
int[] numbersToSort = new int[128000]; | ||
Random random = new Random(16); | ||
for (int i = 0; i < numbersToSort.length; i++) { | ||
if (!sorter.hasSpaceForAnotherRecord()) { | ||
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2)); | ||
} | ||
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1); | ||
sorter.insertRecord(0, numbersToSort[i]); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should log something here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
INFO or WARN ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd do WARN