Skip to content

Commit 57312c9

Browse files
committed
Clarify fileBufferSize units
1 parent 2d4e4f4 commit 57312c9

File tree

4 files changed

+10
-8
lines changed

4 files changed

+10
-8
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ final class UnsafeShuffleExternalSorter {
7474
private final ShuffleWriteMetrics writeMetrics;
7575

7676
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
77-
private final int fileBufferSize;
77+
private final int fileBufferSizeBytes;
7878

7979
/**
8080
* Memory pages that hold the records being sorted. The pages in this list are freed when
@@ -108,8 +108,9 @@ public UnsafeShuffleExternalSorter(
108108
this.initialSize = initialSize;
109109
this.numPartitions = numPartitions;
110110
this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
111-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
112-
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
111+
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
112+
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
113+
113114
this.writeMetrics = writeMetrics;
114115
initializeForWriting();
115116
}
@@ -182,7 +183,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
182183
// around this, we pass a dummy no-op serializer.
183184
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
184185

185-
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);
186+
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
186187

187188
int currentPartition = -1;
188189
while (sortedRecords.hasNext()) {
@@ -196,7 +197,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
196197
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
197198
}
198199
currentPartition = partition;
199-
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);
200+
writer =
201+
blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
200202
}
201203

202204
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
7676
private val consolidateShuffleFiles =
7777
conf.getBoolean("spark.shuffle.consolidateFiles", false)
7878

79-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
79+
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
8080
private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
8181

8282
/**

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class ExternalAppendOnlyMap[K, V, C](
9090
// Number of bytes spilled in total
9191
private var _diskBytesSpilled = 0L
9292

93-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
93+
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
9494
private val fileBufferSize =
9595
sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
9696

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private[spark] class ExternalSorter[K, V, C](
110110
private val conf = SparkEnv.get.conf
111111
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
112112

113-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
113+
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
114114
private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
115115
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
116116

0 commit comments

Comments
 (0)