Skip to content

Commit 8958584

Browse files
committed
Fix bug in calculating free space in current page.
This broke off-heap mode.
1 parent f17fa8f commit 8958584

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class UnsafeShuffleWriter<K, V> implements ShuffleWriter<K, V> {
7171
private MapStatus mapStatus = null;
7272

7373
private MemoryBlock currentPage = null;
74-
private long currentPagePosition = PAGE_SIZE;
74+
private long currentPagePosition = -1;
7575

7676
/**
7777
* Are we in the process of stopping? Because map tasks can call stop() with success = true
@@ -110,11 +110,17 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) {
110110
}
111111

112112
private void ensureSpaceInDataPage(long requiredSpace) throws Exception {
113+
final long spaceInCurrentPage;
114+
if (currentPage != null) {
115+
spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage.getBaseOffset());
116+
} else {
117+
spaceInCurrentPage = 0;
118+
}
113119
if (requiredSpace > PAGE_SIZE) {
114120
// TODO: throw a more specific exception?
115121
throw new Exception("Required space " + requiredSpace + " is greater than page size (" +
116122
PAGE_SIZE + ")");
117-
} else if (requiredSpace > (PAGE_SIZE - currentPagePosition)) {
123+
} else if (requiredSpace > spaceInCurrentPage) {
118124
currentPage = memoryManager.allocatePage(PAGE_SIZE);
119125
currentPagePosition = currentPage.getBaseOffset();
120126
allocatedPages.add(currentPage);

0 commit comments

Comments
 (0)