Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class TaskMemoryManager {
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;

/**
* The amount of memory that is acquired but not used.
*/
private long acquiredButNotUsed = 0L;

/**
* Construct a new TaskMemoryManager.
*/
Expand Down Expand Up @@ -256,7 +261,20 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should log something here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INFO or WARN ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do WARN

// MemoryManager thought, we should keep the acquired memory.
acquiredButNotUsed += acquired;
synchronized (this) {
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just now saw this commit but:
Is this tail recursion a problem? if you keep running out of memory it keeps calling itself.
acquiredButNotUsed += acquired needs to be synchronized too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are continue hold some memory, the amount of free memory should become smaller and smaller, it will fail to acquire soon.

yes, it's better to move acquiredButNotUsed += acquired into the synchronized sections. If acquiredButNotUsed is not calculated correctly (because risk conditions), you will only saw an warning message at the end of a task.

}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -378,6 +396,9 @@ public long cleanUpAllAllocatedMemory() {
}
Arrays.fill(pageTable, null);

// release the memory that is not used by any consumer.
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);

return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public long getMemoryUsage() {
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void expandPointerArray(LongArray newArray) {
*/
public void insertRecord(long recordPointer, long keyPrefix) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, recordPointer);
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void testBasicSorting() throws Exception {
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should only be * 2 because the shuffle sorter only uses one array entry per record instead of the pair of entires which is used by the more general prefix sorter. I can fix this up myself on merge.

}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes("utf-8");
Platform.putInt(baseObject, position, strBytes.length);
Expand Down Expand Up @@ -114,6 +117,9 @@ public void testSortingManyNumbers() throws Exception {
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public int compare(long prefix1, long prefix2) {
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);
Expand Down