@@ -64,6 +64,7 @@ private[spark] object CompressionCodec {
}

val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
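
For context, ALL_COMPRESSION_CODECS enumerates the values of the short-name map defined earlier in this object. A minimal sketch of the assumed relationship; the exact map entries are an assumption, inferred from the class names previously hard-coded in ReplayListenerSuite below:

// Sketch only: assumed shape of the map backing ALL_COMPRESSION_CODECS.
private val shortCompressionCodecNames = Map(
  "lzf" -> classOf[LZFCompressionCodec].getName,
  "snappy" -> classOf[SnappyCompressionCodec].getName)

// ALL_COMPRESSION_CODECS would then be
// Seq("org.apache.spark.io.LZFCompressionCodec",
//     "org.apache.spark.io.SnappyCompressionCodec")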


@@ -65,8 +65,6 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

/**
* BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
* The given write metrics will be updated incrementally, but will not necessarily be current until
* commitAndClose is called.
*/
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
@@ -75,6 +73,8 @@ private[spark] class DiskBlockObjectWriter(
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
// These write metrics are concurrently shared with other active BlockObjectWriters that
// are themselves performing writes. All updates must be relative.
writeMetrics: ShuffleWriteMetrics)
extends BlockObjectWriter(blockId)
with Logging
@@ -94,14 +94,30 @@ private[spark] class DiskBlockObjectWriter(
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialized = false

/**
* Cursors used to represent positions in the file.
*
* xxxxxxxx|--------|---       |
*          ^        ^          ^
*          |        |        finalPosition
*          |      reportedPosition
*        initialPosition
*
* initialPosition: Offset in the file where we start writing. Immutable.
* reportedPosition: Position at the time of the last update to the write metrics.
* finalPosition: Offset where we stopped writing. Set on commitAndClose(), then never changed.
* -----: Current writes to the underlying file.
* xxxxx: Existing contents of the file.
*/
private val initialPosition = file.length()
private var finalPosition: Long = -1
private var initialized = false
private var reportedPosition = initialPosition

/** Calling channel.position() to update the write metrics can be somewhat expensive, so we
* only call it once every N writes. */
private var writesSinceMetricsUpdate = 0
private var lastPosition = initialPosition
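
// Worked example with illustrative values: if the file already holds 100
// bytes, then initialPosition == reportedPosition == 100. After 40 more bytes
// are written, updateBytesWritten() sees channel.position() == 140, adds
// 140 - 100 = 40 to shuffleBytesWritten, and advances reportedPosition to 140.
// Because every update is a delta against reportedPosition, several writers
// can share one ShuffleWriteMetrics instance without double-counting.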

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
@@ -140,17 +156,18 @@ private[spark] class DiskBlockObjectWriter(
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
updateBytesWritten()
close()
}
finalPosition = file.length()
// With certain compression codecs, more bytes are written after close() is called
writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
}
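
// Illustrative example of the correction above: with a codec such as Snappy,
// close() may flush a final compressed block. If reportedPosition == 140 but
// file.length() == 152 after close(), the remaining 152 - 140 = 12 bytes are
// credited here so the metrics match what actually reached the disk.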

// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)
writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)

if (initialized) {
objOut.flush()
@@ -189,10 +206,14 @@ private[spark] class DiskBlockObjectWriter(
new FileSegment(file, initialPosition, finalPosition - initialPosition)
}

/**
* Report the number of bytes written in this writer's shuffle write metrics.
* Note that this is only valid before the underlying streams are closed.
*/
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.shuffleBytesWritten += (pos - lastPosition)
lastPosition = pos
writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
reportedPosition = pos
}
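
// Together with the close-time correction in commitAndClose(), these deltas
// sum to finalPosition - initialPosition, i.e. exactly the bytes this writer
// appended to the file.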

private def callWithTiming(f: => Unit) = {
@@ -413,7 +413,12 @@ class ExternalAppendOnlyMap[K, V, C](
extends Iterator[(K, C)]
{
private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Length will be batchSizes.length + 1
assert(file.length() == batchOffsets(batchOffsets.length - 1))
assert(file.length() == batchOffsets.last,
"File length is not equal to the last batch offset:\n" +
s" file length = ${file.length}\n" +
s" last batch offset = ${batchOffsets.last}\n" +
s" all batch offsets = ${batchOffsets.mkString(",")}"
)
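
// Illustrative example of the scanLeft above: for batchSizes = Seq(10L, 20L, 30L),
// batchOffsets == Seq(0L, 10L, 30L, 60L), one element longer than batchSizes,
// so batchOffsets.last is the expected total file length.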

private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
@@ -33,10 +33,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
private val fileSystem = Utils.getHadoopFileSystem("/")
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
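// Deriving the list above from CompressionCodec.ALL_COMPRESSION_CODECS means
// the suite automatically covers any codec later added to the short-name map,
// rather than relying on a hand-maintained list of class names.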
private var testDir: File = _

before {