@@ -142,7 +142,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
@@ -202,7 +202,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}

@@ -297,8 +297,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incShuffleBytesWritten(outputFile.length());
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
@@ -410,7 +410,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) {
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}
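
Not part of the diff — a minimal usage sketch of the wrapper above, assuming TimeTrackingOutputStream lives in org.apache.spark.storage and ShuffleWriteMetrics in org.apache.spark.executor as in the Spark source tree. Every write, flush, and close on the wrapped stream is timed and folded into writeTime via incWriteTime.

import java.io.ByteArrayOutputStream
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.storage.TimeTrackingOutputStream

val metrics = new ShuffleWriteMetrics
val timed = new TimeTrackingOutputStream(metrics, new ByteArrayOutputStream())

timed.write(Array[Byte](1, 2, 3))  // time spent in the inner write is added to writeTime
timed.flush()
timed.close()

println(metrics.writeTime)  // nanoseconds of blocked write time accumulated so far
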
@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {

/**
* Number of bytes written for the shuffle by this task
*/
@volatile private var _shuffleBytesWritten: Long = _
def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
@volatile private var _bytesWritten: Long = _
def bytesWritten: Long = _bytesWritten
private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
@volatile private var _writeTime: Long = _
def writeTime: Long = _writeTime
private[spark] def incWriteTime(value: Long) = _writeTime += value
private[spark] def decWriteTime(value: Long) = _writeTime -= value

/**
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
def shuffleRecordsWritten: Long = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
@volatile private var _recordsWritten: Long = _
def recordsWritten: Long = _recordsWritten
private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value

// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
@deprecated("use bytesWritten instead", "2.0.0")
def shuffleBytesWritten: Long = bytesWritten
@deprecated("use writeTime instead", "2.0.0")
def shuffleWriteTime: Long = writeTime
@deprecated("use recordsWritten instead", "2.0.0")
def shuffleRecordsWritten: Long = recordsWritten

}
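
A small caller-side illustration (not from the PR) of the rename: the deprecated aliases simply delegate to the new accessors, so existing code keeps compiling with a deprecation warning.

val m = new ShuffleWriteMetrics

// New names:
val bytes   = m.bytesWritten      // total shuffle bytes written by the task
val nanos   = m.writeTime         // time blocked on writes, in nanoseconds
val records = m.recordsWritten    // records written to the shuffle

// Legacy names still resolve, but are @deprecated since 2.0.0:
val legacyBytes = m.shuffleBytesWritten   // same value as m.bytesWritten
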
@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {

// Shuffle write
showBytesDistribution("shuffle bytes written:",
(_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
(_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)

// Fetch & I/O
showMillisDistribution("fetch wait time:",
@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
writeMetrics.incWriteTime(System.nanoTime - openStartTime)

override def releaseWriters(success: Boolean) {
shuffleState.completedMapTasks.add(mapId)
@@ -92,7 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C](
if (sorter != null) {
val startTime = System.nanoTime()
sorter.stop()
writeMetrics.incShuffleWriteTime(System.nanoTime - startTime)
writeMetrics.incWriteTime(System.nanoTime - startTime)
sorter = null
}
}
@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
raw.shuffleWriteMetrics
}
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.shuffleBytesWritten),
writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
writeTime = submetricQuantiles(_.shuffleWriteTime)
writeBytes = submetricQuantiles(_.bytesWritten),
writeRecords = submetricQuantiles(_.recordsWritten),
writeTime = submetricQuantiles(_.writeTime)
)
}.metricOption

@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {

def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
new ShuffleWriteMetrics(
bytesWritten = internal.shuffleBytesWritten,
writeTime = internal.shuffleWriteTime,
recordsWritten = internal.shuffleRecordsWritten
bytesWritten = internal.bytesWritten,
writeTime = internal.writeTime,
recordsWritten = internal.recordsWritten
)
}
}
@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
writeMetrics.incWriteTime(System.nanoTime() - start)
}
} {
objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
close()
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
writeMetrics.incBytesWritten(finalPosition - reportedPosition)
} else {
finalPosition = file.length()
}
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
// truncating the file to its initial position.
try {
if (initialized) {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
writeMetrics.decBytesWritten(reportedPosition - initialPosition)
writeMetrics.decRecordsWritten(numRecordsWritten)
objOut.flush()
bs.flush()
close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
*/
def recordWritten(): Unit = {
numRecordsWritten += 1
writeMetrics.incShuffleRecordsWritten(1)
writeMetrics.incRecordsWritten(1)

if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
writeMetrics.incBytesWritten(pos - reportedPosition)
reportedPosition = pos
}

@@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
}
metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
executorToShuffleWrite(eid) =
executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
}
}
}
@@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)

val shuffleWriteDelta =
(taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
(taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta

val shuffleWriteRecordsDelta =
(taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
(taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta

14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantiles(shuffleReadRemoteSizes)

val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}

val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}

val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
val shuffleWriteTime =
(metricsOpt.flatMap(_.shuffleWriteMetrics
.map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong
.map(_.writeTime)).getOrElse(0L) / 1e6).toLong
val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)

val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -930,13 +930,13 @@ private[ui] class TaskDataSource(
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")

val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
val shuffleWriteReadable = maybeShuffleWrite
.map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
.map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
val shuffleWriteRecords = maybeShuffleWrite
.map(_.shuffleRecordsWritten.toString).getOrElse("")
.map(_.recordsWritten.toString).getOrElse("")

val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
val writeTimeSortable = maybeWriteTime.getOrElse(0L)
val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
if (ms == 0) "" else UIUtils.formatDuration(ms)
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -727,10 +727,10 @@ private[spark] object JsonProtocol {
// TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
val writeMetrics = metrics.registerShuffleWriteMetrics()
writeMetrics.incShuffleBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
writeMetrics.incShuffleRecordsWritten((writeJson \ "Shuffle Records Written")
writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0L))
writeMetrics.incShuffleWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
}

// Output metrics
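
As an illustration only: a hand-written event-log fragment (values invented) using the key names that the Shuffle Write Metrics block above extracts, assuming the json4s-jackson API that JsonProtocol already relies on.

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

val writeJson: JValue = parse(
  """{"Shuffle Bytes Written": 4096, "Shuffle Write Time": 1500000, "Shuffle Records Written": 12}""")

// Mirrors the extraction shown above:
val bytes   = (writeJson \ "Shuffle Bytes Written").extract[Long]                     // 4096
val records = (writeJson \ "Shuffle Records Written").extractOpt[Long].getOrElse(0L)  // 12
val nanos   = (writeJson \ "Shuffle Write Time").extract[Long]                        // 1500000
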
@@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C](
val w = writer
writer = null
w.commitAndClose()
_diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
batchSizes.append(curWriteMetrics.shuffleBytesWritten)
_diskBytesSpilled += curWriteMetrics.bytesWritten
batchSizes.append(curWriteMetrics.bytesWritten)
objectsWritten = 0
}

@@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C](
val w = writer
writer = null
w.commitAndClose()
_diskBytesSpilled += spillMetrics.shuffleBytesWritten
batchSizes.append(spillMetrics.shuffleBytesWritten)
_diskBytesSpilled += spillMetrics.bytesWritten
batchSizes.append(spillMetrics.bytesWritten)
spillMetrics = null
objectsWritten = 0
}