Skip to content

Commit df07699

Browse files
committed
Attempt to clarify confusing metrics update code
1 parent 5e189c6 commit df07699

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
214214
recordReadPosition += toTransfer;
215215
dataRemaining -= toTransfer;
216216
}
217-
// TODO: add a test that detects whether we leave this call out:
218217
writer.recordWritten();
219218
}
220219

@@ -229,11 +228,23 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
229228
}
230229
}
231230

232-
if (!isLastFile) {
233-
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
231+
if (!isLastFile) { // i.e. this is a spill file
232+
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
233+
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
234+
// relies on its `recordWritten()` method being called in order to trigger periodic updates to
235+
// `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
236+
// counter at a higher-level, then the in-progress metrics for records written and bytes
237+
// written would get out of sync.
238+
//
239+
// When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
240+
// in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
241+
// metrics to the true write metrics here. The reason for performing this copying is so that
242+
// we can avoid reporting spilled bytes as shuffle write bytes.
243+
//
244+
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
234245
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
235246
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
236-
// writeMetrics.incShuffleWriteTime(writeMetricsToUse.shuffleWriteTime());
247+
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
237248
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
238249
}
239250
}

0 commit comments

Comments
 (0)