Skip to content

Commit 8531286

Browse files
committed
Add tests that automatically trigger spills.
This bumps up line coverage to 93% in UnsafeShuffleExternalSorter; now, the only branches that are missed are exception-handling code.
1 parent 7c953f9 commit 8531286

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
5959

6060
private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
6161

62+
@VisibleForTesting
63+
static final int INITIAL_SORT_BUFFER_SIZE = 4096;
64+
6265
private final BlockManager blockManager;
6366
private final IndexShuffleBlockResolver shuffleBlockResolver;
6467
private final TaskMemoryManager memoryManager;
@@ -152,7 +155,7 @@ private void open() throws IOException {
152155
shuffleMemoryManager,
153156
blockManager,
154157
taskContext,
155-
4096, // Initial size (TODO: tune this!)
158+
INITIAL_SORT_BUFFER_SIZE,
156159
partitioner.numPartitions(),
157160
sparkConf);
158161
serBuffer = new MyByteArrayOutputStream(1024 * 1024);

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,47 @@ public void mergeSpillsWithFileStreamAndNoCompression() throws Exception {
363363
testMergingSpills(false, null);
364364
}
365365

366+
@Test
367+
public void writeEnoughDataToTriggerSpill() throws Exception {
368+
when(shuffleMemoryManager.tryToAcquire(anyLong()))
369+
.then(returnsFirstArg()) // Allocate initial sort buffer
370+
.then(returnsFirstArg()) // Allocate initial data page
371+
.thenReturn(0L) // Deny request to allocate new data page
372+
.then(returnsFirstArg()); // Grant new sort buffer and data page.
373+
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
374+
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<Product2<Object, Object>>();
375+
final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 128];
376+
for (int i = 0; i < 128 + 1; i++) {
377+
dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
378+
}
379+
writer.write(dataToWrite.iterator());
380+
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
381+
Assert.assertEquals(2, spillFilesCreated.size());
382+
writer.stop(true);
383+
readRecordsFromFile();
384+
assertSpillFilesWereCleanedUp();
385+
}
386+
387+
@Test
388+
public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
389+
when(shuffleMemoryManager.tryToAcquire(anyLong()))
390+
.then(returnsFirstArg()) // Allocate initial sort buffer
391+
.then(returnsFirstArg()) // Allocate initial data page
392+
.thenReturn(0L) // Deny request to grow sort buffer
393+
.then(returnsFirstArg()); // Grant new sort buffer and data page.
394+
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
395+
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<Product2<Object, Object>>();
396+
for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
397+
dataToWrite.add(new Tuple2<Object, Object>(i, i));
398+
}
399+
writer.write(dataToWrite.iterator());
400+
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
401+
Assert.assertEquals(2, spillFilesCreated.size());
402+
writer.stop(true);
403+
readRecordsFromFile();
404+
assertSpillFilesWereCleanedUp();
405+
}
406+
366407
@Test
367408
public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception {
368409
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);

0 commit comments

Comments
 (0)