
Commit 622d116

Merge branch 'ESPARK-100' into 'spark_2.1'

[ESPARK-100] Add a shuffle spill limit: if a single task's shuffle spills more than 10G, fail the task immediately with an error.

resolve apache#100

See merge request !70
2 parents f54c331 + 3c01848 commit 622d116
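
Since the new threshold is read from the configuration key spark.shuffle.spill.limit with a "10g" default (see the diffs below), the cap should be tunable per application like any other Spark size setting. A minimal, hypothetical sketch of overriding it via the standard SparkConf API:

import org.apache.spark.SparkConf;

public class SpillLimitConfig {
  public static void main(String[] args) {
    // Hypothetical setup: raise the per-task shuffle spill cap added by
    // this patch from its 10g default to 50g. The key is taken from the
    // diff below; size strings use Spark's usual unit suffixes (k/m/g/t).
    SparkConf conf = new SparkConf()
        .setAppName("spill-limit-demo")
        .set("spark.shuffle.spill.limit", "50g");
  }
}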

File tree

2 files changed: +15 -0 lines changed


core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 8 additions & 0 deletions

@@ -72,6 +72,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final BlockManager blockManager;
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
+  private long totalSpillBytes = 0L;
+  private long shuffleSpillThreshold = 0L;
 
   /**
    * Force this sorter to spill when there are this many elements in memory. The default value is
@@ -117,6 +119,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.shuffleSpillThreshold = conf.getSizeAsBytes("spark.shuffle.spill.limit", "10g");
     this.numElementsForSpillThreshold =
       conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
     this.writeMetrics = writeMetrics;
@@ -245,6 +248,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       return 0L;
     }
 
+    if (totalSpillBytes > shuffleSpillThreshold) {
+      throw new IOException("Shuffle spill exceed " + shuffleSpillThreshold);
+    }
+
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
       Thread.currentThread().getId(),
       Utils.bytesToString(getMemoryUsage()),
@@ -258,6 +265,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // records. Otherwise, if the task is over allocated memory, then without freeing the memory
     // pages, we might not be able to get memory for the pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    totalSpillBytes += spillSize;
     return spillSize;
   }
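
Both files apply the same guard: accumulate the bytes each spill writes, and refuse the next spill once the running total passes the configured threshold. A self-contained sketch of that pattern (the class below is illustrative, not Spark code):

import java.io.IOException;

// Minimal sketch of the spill-cap pattern added in this commit: count
// bytes as they are spilled and fail fast once the cumulative total
// passes a configured threshold.
final class SpillCapSketch {
  private long totalSpillBytes = 0L;       // mirrors the new field
  private final long spillThresholdBytes;  // from spark.shuffle.spill.limit

  SpillCapSketch(long spillThresholdBytes) {
    this.spillThresholdBytes = spillThresholdBytes;
  }

  long spill(long spillSize) throws IOException {
    // Guard first, as in ShuffleExternalSorter.spill and
    // UnsafeExternalSorter.spill in the diffs.
    if (totalSpillBytes > spillThresholdBytes) {
      throw new IOException("Shuffle spill exceed " + spillThresholdBytes);
    }
    // ... write the in-memory records to disk here ...
    totalSpillBytes += spillSize;  // account only after a successful spill
    return spillSize;
  }
}

Note that the guard compares the bytes already spilled, so a task fails on the first spill after the total crosses the limit; the spill that crosses it is still allowed to overshoot.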

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 7 additions & 0 deletions

@@ -86,6 +86,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private long totalSpillBytes = 0L;
   private long totalSortTimeNanos = 0L;
   private volatile SpillableIterator readingIterator = null;
+  private long shuffleSpillThreshold = 0L;
 
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
       TaskMemoryManager taskMemoryManager,
@@ -150,6 +151,9 @@ private UnsafeExternalSorter(
     // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577).
     this.writeMetrics = new ShuffleWriteMetrics();
 
+    this.shuffleSpillThreshold = blockManager.conf()
+      .getSizeAsBytes("spark.shuffle.spill.limit", "10g");
+
     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
         this, taskMemoryManager, recordComparator, prefixComparator, initialSize, canUseRadixSort);
@@ -188,6 +192,9 @@ public void closeCurrentPage() {
    */
   @Override
   public long spill(long size, MemoryConsumer trigger) throws IOException {
+    if (totalSpillBytes > shuffleSpillThreshold) {
+      throw new IOException("Shuffle spill exceed " + shuffleSpillThreshold);
+    }
     if (trigger != this) {
       if (readingIterator != null) {
         return readingIterator.spill();
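
Both call sites parse the default through getSizeAsBytes, which reads size strings in binary units, so "10g" resolves to 10 GiB (10737418240 bytes), the value that would appear in the IOException message. A quick check, assuming Spark's JavaUtils helper backs this parsing:

import org.apache.spark.network.util.JavaUtils;

public class SpillLimitBytes {
  public static void main(String[] args) {
    // Assumption: SparkConf.getSizeAsBytes delegates to this helper,
    // which parses size strings using binary (1024-based) units.
    long limit = JavaUtils.byteStringAsBytes("10g");
    System.out.println(limit); // prints 10737418240, i.e. 10 * 1024^3
  }
}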
