
Commit 9514dc8

Added unit tests to test reading of corrupted data and other minor edits
1 parent 3881706 commit 9514dc8

File tree: 5 files changed, +158 −109 lines


streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala renamed to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala

Lines changed: 2 additions & 1 deletion
@@ -16,4 +16,5 @@
  */
 package org.apache.spark.streaming.util
 
-private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

Lines changed: 5 additions & 4 deletions
@@ -75,8 +75,8 @@ private[streaming] class WriteAheadLogManager(
    * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
    * to HDFS, and will be available for readers to read.
    */
-  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
-    var fileSegment: FileSegment = null
+  def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+    var fileSegment: WriteAheadLogFileSegment = null
     var failures = 0
     var lastException: Exception = null
     var succeeded = false
@@ -112,8 +112,8 @@ private[streaming] class WriteAheadLogManager(
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
-        logDebug(s"Creating log reader with $file")
-        new WriteAheadLogReader(file, hadoopConf)
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
     } flatMap { x => x }
   }
 
@@ -208,6 +208,7 @@ private[util] object WriteAheadLogManager {
     s"log-$startTime-$stopTime"
   }
 
+  /** Convert a sequence of files to a sequence of sorted LogInfo objects */
   def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
     files.flatMap { file =>
       logFileRegex.findFirstIn(file.getName()) match {
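The new doc comment refers to parsing log file names into sorted LogInfo objects. A hedged sketch of that pattern, assuming the log-$startTime-$stopTime naming shown in this same file; the regex, the LogInfo shape, and the sample names are illustrative, not copied from the commit:

// Illustrative sketch: turn "log-<start>-<stop>" file names into LogInfo records
// sorted by start time, silently skipping names that do not match the pattern.
object LogFileParsingExample {
  case class LogInfo(startTime: Long, endTime: Long, path: String)

  private val logFileRegex = """log-(\d+)-(\d+)""".r

  def logFilesToLogInfo(files: Seq[String]): Seq[LogInfo] = {
    files.flatMap { file =>
      logFileRegex.findFirstMatchIn(file).map { m =>
        LogInfo(m.group(1).toLong, m.group(2).toLong, file)
      }
    }.sortBy(_.startTime)
  }

  def main(args: Array[String]): Unit = {
    val names = Seq("log-200-300", "log-100-200", "checkpoint")
    logFilesToLogInfo(names).foreach(println)
  }
}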

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala

Lines changed: 2 additions & 2 deletions
@@ -32,12 +32,12 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
   private val instream = HdfsUtils.getInputStream(path, conf)
   private var closed = false
 
-  def read(segment: FileSegment): ByteBuffer = synchronized {
+  def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
     assertOpen()
     instream.seek(segment.offset)
     val nextLength = instream.readInt()
     HdfsUtils.checkState(nextLength == segment.length,
-      "Expected message length to be " + segment.length + ", " + "but was " + nextLength)
+      s"Expected message length to be ${segment.length}, but was $nextLength")
     val buffer = new Array[Byte](nextLength)
     instream.readFully(buffer)
     ByteBuffer.wrap(buffer)
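The read path above is a seek-and-verify pattern: jump to the segment's offset, read the 4-byte length prefix, check it against the segment, then read exactly that many bytes. A hedged local-filesystem sketch of the same idea, using java.io.RandomAccessFile in place of the HDFS input stream:

import java.io.RandomAccessFile
import java.nio.ByteBuffer

// Illustrative sketch of the seek / readInt / readFully pattern on a local file.
object RandomReadExample {
  case class WriteAheadLogFileSegment(path: String, offset: Long, length: Int)

  def read(segment: WriteAheadLogFileSegment): ByteBuffer = {
    val file = new RandomAccessFile(segment.path, "r")
    try {
      file.seek(segment.offset)           // jump to the record's length prefix
      val nextLength = file.readInt()     // 4-byte length written by the writer
      require(nextLength == segment.length,
        s"Expected message length to be ${segment.length}, but was $nextLength")
      val buffer = new Array[Byte](nextLength)
      file.readFully(buffer)              // read exactly nextLength bytes
      ByteBuffer.wrap(buffer)
    } finally {
      file.close()
    }
  }
}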

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

Lines changed: 13 additions & 16 deletions
@@ -34,49 +34,46 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
 
   private lazy val hadoopFlushMethod = {
+    // Use reflection to get the right flush operation
     val cls = classOf[FSDataOutputStream]
     Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
   }
 
-  private var nextOffset = getPosition()
+  private var nextOffset = stream.getPos()
   private var closed = false
 
-
   /** Write the bytebuffer to the log file */
-  def write(data: ByteBuffer): FileSegment = synchronized {
+  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
     assertOpen()
     data.rewind() // Rewind to ensure all data in the buffer is retrieved
     val lengthToWrite = data.remaining()
-    val segment = new FileSegment(path, nextOffset, lengthToWrite)
+    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
     stream.writeInt(lengthToWrite)
     if (data.hasArray) {
       stream.write(data.array())
     } else {
-      // If the buffer is not backed by an array we need to write the data byte by byte
+      // If the buffer is not backed by an array, we transfer using temp array
+      // Note that despite the extra array copy, this should be faster than byte-by-byte copy
       while (data.hasRemaining) {
-        stream.write(data.get())
+        val array = new Array[Byte](data.remaining)
+        data.get(array)
+        stream.write(array)
       }
     }
     flush()
-    nextOffset = getPosition()
+    nextOffset = stream.getPos()
     segment
   }
 
-  override private[streaming] def close(): Unit = synchronized {
+  override def close(): Unit = synchronized {
     closed = true
     stream.close()
   }
 
-
-  private def getPosition(): Long = {
-    stream.getPos()
-  }
-
   private def flush() {
+    hadoopFlushMethod.foreach { _.invoke(stream) }
+    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
     stream.getWrappedStream.flush()
-    hadoopFlushMethod.foreach {
-      _.invoke(stream)
-    }
   }
 
   private def assertOpen() {
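Two techniques in this writer diff are worth spelling out. First, the branch for ByteBuffers without a backing array: instead of writing one byte at a time, the remaining bytes are copied into a temporary array and written in one call. A hedged local-filesystem sketch of the whole length-prefixed write, using java.io.DataOutputStream in place of the HDFS output stream; the file name and helper names are made up for illustration:

import java.io.{BufferedOutputStream, DataOutputStream, FileOutputStream}
import java.nio.ByteBuffer

// Illustrative sketch of the length-prefixed write path on a local file.
object LengthPrefixedWriteExample {
  // Returns (offset, length) of the record just written, i.e. the information
  // a WriteAheadLogFileSegment would carry.
  def write(out: DataOutputStream, data: ByteBuffer): (Long, Int) = {
    data.rewind()                              // make sure the whole buffer is visible
    val offset = out.size().toLong             // bytes written so far = record's start offset
    val lengthToWrite = data.remaining()
    out.writeInt(lengthToWrite)                // 4-byte length prefix
    if (data.hasArray) {
      out.write(data.array(), data.arrayOffset() + data.position(), lengthToWrite)
    } else {
      // Direct buffers have no backing array: copy through a temporary array
      // rather than writing byte by byte.
      val array = new Array[Byte](data.remaining())
      data.get(array)
      out.write(array)
    }
    out.flush()
    (offset, lengthToWrite)
  }

  def main(args: Array[String]): Unit = {
    val out = new DataOutputStream(
      new BufferedOutputStream(new FileOutputStream("/tmp/wal-example")))
    val (offset, length) = write(out, ByteBuffer.wrap("hello".getBytes("UTF-8")))
    out.close()
    println(s"wrote $length bytes at offset $offset")
  }
}

Second, the reflective flush: resolve hflush() if the stream class declares it (newer Hadoop), otherwise fall back to sync() (older Hadoop), and keep the result as an Option so a missing method is simply skipped. A toy sketch of that resolution, with two stand-in classes instead of FSDataOutputStream:

import scala.util.Try

// Illustrative sketch of resolving a flush method by reflection, with toy classes
// standing in for different versions of Hadoop's FSDataOutputStream.
object ReflectiveFlushExample {
  class NewStyleStream { def hflush(): Unit = println("hflush called") }
  class OldStyleStream { def sync(): Unit = println("sync called") }

  def resolveFlush(cls: Class[_]) =
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption

  def main(args: Array[String]): Unit = {
    val newer = new NewStyleStream
    resolveFlush(newer.getClass).foreach(_.invoke(newer))   // prints "hflush called"

    val older = new OldStyleStream
    resolveFlush(older.getClass).foreach(_.invoke(older))   // prints "sync called"
  }
}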
