From 172358de10a61f296e52fa347c2e40aa87490ecf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Oct 2014 19:52:55 -0700 Subject: [PATCH 01/16] Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working --- .../spark/streaming/storage/FileSegment.scala | 19 ++ .../spark/streaming/storage/HdfsUtils.scala | 72 +++++ .../storage/WriteAheadLogManager.scala | 176 +++++++++++ .../storage/WriteAheadLogRandomReader.scala | 50 ++++ .../storage/WriteAheadLogReader.scala | 76 +++++ .../storage/WriteAheadLogWriter.scala | 100 +++++++ .../storage/WriteAheadLogSuite.scala | 277 ++++++++++++++++++ 7 files changed, 770 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala new file mode 100644 index 0000000000000..eb9c07e9cf61f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala new file mode 100644 index 0000000000000..efb12b82ae949 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} + +private[streaming] object HdfsUtils { + + def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = { + // HDFS is not thread-safe when getFileSystem is called, so synchronize on that + + val dfsPath = new Path(path) + val dfs = + this.synchronized { + dfsPath.getFileSystem(conf) + } + // If the file exists and we have append support, append instead of creating a new file + val stream: FSDataOutputStream = { + if (dfs.isFile(dfsPath)) { + if (conf.getBoolean("hdfs.append.support", false)) { + dfs.append(dfsPath) + } else { + throw new IllegalStateException("File exists and there is no append support!") + } + } else { + dfs.create(dfsPath) + } + } + stream + } + + def getInputStream(path: String, conf: Configuration): FSDataInputStream = { + val dfsPath = new Path(path) + val dfs = this.synchronized { + dfsPath.getFileSystem(conf) + } + val instream = dfs.open(dfsPath) + instream + } + + def checkState(state: Boolean, errorMsg: => String) { + if(!state) { + throw new IllegalStateException(errorMsg) + } + } + + def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { + val dfsPath = new Path(path) + val dfs = + this.synchronized { + dfsPath.getFileSystem(conf) + } + val fileStatus = dfs.getFileStatus(dfsPath) + val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) + blockLocs.map(_.flatMap(_.getHosts)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala new file mode 100644 index 0000000000000..c70ecb0da4e54 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala @@ -0,0 +1,176 @@ +package org.apache.spark.streaming.storage + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.Logging +import org.apache.spark.streaming.storage.WriteAheadLogManager._ +import org.apache.spark.streaming.util.{Clock, SystemClock} +import org.apache.spark.util.Utils + +private[streaming] class WriteAheadLogManager( + logDirectory: String, + hadoopConf: Configuration, + rollingIntervalSecs: Int = 60, + maxFailures: Int = 3, + callerName: String = "", + clock: Clock = new SystemClock + ) extends Logging { + + private val pastLogs = new ArrayBuffer[LogInfo] + private val callerNameTag = + if (callerName != null && callerName.nonEmpty) s" for $callerName" else "" + private val threadpoolName = s"WriteAheadLogManager $callerNameTag" + implicit private val executionContext = ExecutionContext.fromExecutorService( + Utils.newDaemonFixedThreadPool(1, threadpoolName)) + override protected val logName = s"WriteAheadLogManager $callerNameTag" + + 
private var currentLogPath: String = null + private var currentLogWriter: WriteAheadLogWriter = null + private var currentLogWriterStartTime: Long = -1L + private var currentLogWriterStopTime: Long = -1L + + initializeOrRecover() + + def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized { + var fileSegment: FileSegment = null + var failures = 0 + var lastException: Exception = null + var succeeded = false + while (!succeeded && failures < maxFailures) { + try { + fileSegment = getLogWriter(clock.currentTime).write(byteBuffer) + succeeded = true + } catch { + case ex: Exception => + lastException = ex + logWarning("Failed to ...") + resetWriter() + failures += 1 + } + } + if (fileSegment == null) { + throw lastException + } + fileSegment + } + + def readFromLog(): Iterator[ByteBuffer] = synchronized { + val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath) + logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) + logFilesToRead.iterator.map { file => + logDebug(s"Creating log reader with $file") + new WriteAheadLogReader(file, hadoopConf) + } flatMap { x => x } + } + + /** + * Delete the log files that are older than the threshold time. + * + * Its important to note that the threshold time is based on the time stamps used in the log + * files, and is therefore based on the local system time. So if there is coordination necessary + * between the node calculating the threshTime (say, driver node), and the local system time + * (say, worker node), the caller has to take account of possible time skew. + */ + def cleanupOldLogs(threshTime: Long): Unit = { + val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } + logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") + + def deleteFiles() { + oldLogFiles.foreach { logInfo => + try { + val path = new Path(logInfo.path) + val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) } + fs.delete(path, true) + synchronized { pastLogs -= logInfo } + logDebug(s"Cleared log file $logInfo") + } catch { + case ex: Exception => + logWarning(s"Error clearing log file $logInfo", ex) + } + } + logInfo(s"Cleared log files in $logDirectory older than $threshTime") + } + if (!executionContext.isShutdown) { + Future { deleteFiles() } + } + } + + def stop(): Unit = synchronized { + if (currentLogWriter != null) { + currentLogWriter.close() + } + executionContext.shutdown() + logInfo("Stopped log manager") + } + + private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized { + if (currentLogWriter == null || currentTime > currentLogWriterStopTime) { + resetWriter() + if (currentLogPath != null) { + pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath) + } + currentLogWriterStartTime = currentTime + currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000) + val newLogPath = new Path(logDirectory, + timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime)) + currentLogPath = newLogPath.toString + currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf) + } + currentLogWriter + } + + private def initializeOrRecover(): Unit = synchronized { + val logDirectoryPath = new Path(logDirectory) + val fileSystem = logDirectoryPath.getFileSystem(hadoopConf) + + if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + val logFileInfo = 
logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) + pastLogs.clear() + pastLogs ++= logFileInfo + logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory") + logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") + } else { + fileSystem.mkdirs(logDirectoryPath, + FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)) + logInfo(s"Created ${logDirectory} for log files") + } + } + + private def resetWriter(): Unit = synchronized { + if (currentLogWriter != null) { + currentLogWriter.close() + currentLogWriter = null + } + } +} + +private[storage] object WriteAheadLogManager { + + case class LogInfo(startTime: Long, endTime: Long, path: String) + + val logFileRegex = """log-(\d+)-(\d+)""".r + + def timeToLogFile(startTime: Long, stopTime: Long): String = { + s"log-$startTime-$stopTime" + } + + def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = { + files.flatMap { file => + logFileRegex.findFirstIn(file.getName()) match { + case Some(logFileRegex(startTimeStr, stopTimeStr)) => + val startTime = startTimeStr.toLong + val stopTime = stopTimeStr.toLong + Some(LogInfo(startTime, stopTime, file.toString)) + case None => + None + } + }.sortBy { _.startTime } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala new file mode 100644 index 0000000000000..3df024834f7a4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.Closeable +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration + +private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) + extends Closeable { + + private val instream = HdfsUtils.getInputStream(path, conf) + private var closed = false + + def read(segment: FileSegment): ByteBuffer = synchronized { + assertOpen() + instream.seek(segment.offset) + val nextLength = instream.readInt() + HdfsUtils.checkState(nextLength == segment.length, + "Expected message length to be " + segment.length + ", " + "but was " + nextLength) + val buffer = new Array[Byte](nextLength) + instream.readFully(buffer) + ByteBuffer.wrap(buffer) + } + + override def close(): Unit = synchronized { + closed = true + instream.close() + } + + private def assertOpen() { + HdfsUtils.checkState(!closed, "Stream is closed. 
Create a new Reader to read from the file.") + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala new file mode 100644 index 0000000000000..5e0dc1d49a89a --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.{EOFException, Closeable} +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.Logging + +private[streaming] class WriteAheadLogReader(path: String, conf: Configuration) + extends Iterator[ByteBuffer] with Closeable with Logging { + + private val instream = HdfsUtils.getInputStream(path, conf) + private var closed = false + private var nextItem: Option[ByteBuffer] = None + + override def hasNext: Boolean = synchronized { + if (closed) { + return false + } + + if (nextItem.isDefined) { // handle the case where hasNext is called without calling next + true + } else { + try { + val length = instream.readInt() + val buffer = new Array[Byte](length) + instream.readFully(buffer) + nextItem = Some(ByteBuffer.wrap(buffer)) + logTrace("Read next item " + nextItem.get) + true + } catch { + case e: EOFException => + logDebug("Error reading next item, EOF reached", e) + close() + false + case e: Exception => + logDebug("Error reading next item, EOF reached", e) + close() + throw e + } + } + } + + override def next(): ByteBuffer = synchronized { + val data = nextItem.getOrElse { + close() + throw new IllegalStateException( + "next called without calling hasNext or after hasNext returned false") + } + nextItem = None // Ensure the next hasNext call loads new data. + data + } + + override def close(): Unit = synchronized { + if (!closed) { + instream.close() + } + closed = true + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala new file mode 100644 index 0000000000000..68a1172d7d282 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io._ +import java.net.URI +import java.nio.ByteBuffer + +import scala.util.Try + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} +import org.apache.spark.streaming.storage.FileSegment + +private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable { + private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = { + val uri = new URI(path) + val defaultFs = FileSystem.getDefaultUri(conf).getScheme + val isDefaultLocal = defaultFs == null || defaultFs == "file" + + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { + assert(!new File(uri.getPath).exists) + Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath)))) + } else { + Right(HdfsUtils.getOutputStream(path, conf)) + } + } + + private lazy val hadoopFlushMethod = { + val cls = classOf[FSDataOutputStream] + Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption + } + + private var nextOffset = getPosition() + private var closed = false + + + // Data is always written as: + // - Length - Long + // - Data - of length = Length + def write(data: ByteBuffer): FileSegment = synchronized { + assertOpen() + data.rewind() // Rewind to ensure all data in the buffer is retrieved + val lengthToWrite = data.remaining() + val segment = new FileSegment(path, nextOffset, lengthToWrite) + stream.writeInt(lengthToWrite) + if (data.hasArray) { + stream.write(data.array()) + } else { + // If the buffer is not backed by an array we need to write the data byte by byte + while (data.hasRemaining) { + stream.write(data.get()) + } + } + flush() + nextOffset = getPosition() + segment + } + + override private[streaming] def close(): Unit = synchronized { + closed = true + stream.close() + } + + private def stream(): DataOutputStream = { + underlyingStream.fold(x => x, x => x) + } + + private def getPosition(): Long = { + underlyingStream match { + case Left(localStream) => localStream.size + case Right(dfsStream) => dfsStream.getPos() + } + } + + private def flush() { + underlyingStream match { + case Left(localStream) => localStream.flush + case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) } + } + } + + private def assertOpen() { + HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.") + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala new file mode 100644 index 0000000000000..88b2b5095ceb6 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile} +import java.nio.ByteBuffer + +import scala.util.Random + +import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.commons.io.FileUtils +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.util.Utils +import WriteAheadLogSuite._ + +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { + + val hadoopConf = new Configuration() + var tempDirectory: File = null + + before { + tempDirectory = Files.createTempDir() + } + + after { + if (tempDirectory != null && tempDirectory.exists()) { + FileUtils.deleteDirectory(tempDirectory) + tempDirectory = null + } + } + + test("WriteAheadLogWriter - writing data") { + val file = new File(tempDirectory, Random.nextString(10)) + val dataToWrite = generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + val segments = dataToWrite.map(data => writer.write(data)) + writer.close() + val writtenData = readDataManually(file, segments) + assert(writtenData.toArray === dataToWrite.toArray) + } + + test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { + val file = new File(tempDirectory, Random.nextString(10)) + val dataToWrite = generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + dataToWrite.foreach { data => + val segment = writer.write(data) + assert(readDataManually(file, Seq(segment)).head === data) + } + writer.close() + } + + test("WriteAheadLogReader - sequentially reading data") { + // Write data manually for testing the sequential reader + val file = File.createTempFile("TestSequentialReads", "", tempDirectory) + val writtenData = generateRandomData() + writeDataManually(writtenData, file) + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + val readData = reader.toSeq.map(byteBufferToString) + assert(readData.toList === writtenData.toList) + assert(reader.hasNext === false) + intercept[Exception] { + reader.next() + } + reader.close() + } + + test("WriteAheadLogReader - sequentially reading data written with writer") { + // Write data manually for testing the sequential reader + val file = new File(tempDirectory, "TestWriter") + val dataToWrite = generateRandomData() + writeDataUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + reader.foreach { byteBuffer => + assert(byteBufferToString(byteBuffer) === iter.next()) + } + reader.close() + } + + test("WriteAheadLogRandomReader - reading data using random reader") { + // Write data manually for testing the random reader + val file = File.createTempFile("TestRandomReads", 
"", tempDirectory) + val writtenData = generateRandomData() + val segments = writeDataManually(writtenData, file) + + // Get a random order of these segments and read them back + val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + writtenDataAndSegments.foreach { case (data, segment) => + assert(data === byteBufferToString(reader.read(segment))) + } + reader.close() + } + + test("WriteAheadLogRandomReader - reading data using random reader written with writer") { + // Write data using writer for testing the random reader + val file = new File(tempDirectory, "TestRandomReads") + val data = generateRandomData() + val segments = writeDataUsingWriter(file, data) + + // Read a random sequence of segments and verify read data + val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + dataAndSegments.foreach { case(data, segment) => + assert(data === byteBufferToString(reader.read(segment))) + } + reader.close() + } + + test("WriteAheadLogManager - write rotating logs") { + // Write data using manager + val dataToWrite = generateRandomData(10) + writeDataUsingManager(tempDirectory, dataToWrite) + + // Read data manually to verify the written data + val logFiles = getLogFilesInDirectory(tempDirectory) + assert(logFiles.size > 1) + val writtenData = logFiles.flatMap { file => readDataManually(file) } + assert(writtenData.toList === dataToWrite.toList) + } + + test("WriteAheadLogManager - read rotating logs") { + // Write data manually for testing reading through manager + val writtenData = (1 to 10).map { i => + val data = generateRandomData(10) + val file = new File(tempDirectory, s"log-$i-${i + 1}") + writeDataManually(data, file) + data + }.flatten + + // Read data using manager and verify + val readData = readDataUsingManager(tempDirectory) + assert(readData.toList === writtenData.toList) + } + + test("WriteAheadLogManager - recover past logs when creating new manager") { + // Write data with manager, recover with new manager and verify + val dataToWrite = generateRandomData(100) + writeDataUsingManager(tempDirectory, dataToWrite) + val logFiles = getLogFilesInDirectory(tempDirectory) + assert(logFiles.size > 1) + val readData = readDataUsingManager(tempDirectory) + assert(dataToWrite.toList === readData.toList) + } + + // TODO (Hari, TD): Test different failure conditions of writers and readers. + // - Failure in the middle of write + // - Failure while reading incomplete/corrupt file +} + +object WriteAheadLogSuite { + + private val hadoopConf = new Configuration() + + /** + * Write data to the file and returns the an array of the bytes written. + * This is used to test the WAL reader independently of the WAL writer. 
+ */ + def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = { + val segments = new ArrayBuffer[FileSegment]() + val writer = new RandomAccessFile(file, "rw") + data.foreach { item => + val offset = writer.getFilePointer() + val bytes = Utils.serialize(item) + writer.writeInt(bytes.size) + writer.write(bytes) + segments += FileSegment(file.toString, offset, bytes.size) + } + writer.close() + segments + } + + def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = { + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + val segments = data.map { + item => writer.write(item) + } + writer.close() + segments + } + + def writeDataUsingManager(logDirectory: File, data: Seq[String]) { + val fakeClock = new ManualClock + val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) + data.foreach { item => + fakeClock.addToTime(500) + manager.writeToLog(item) + } + manager.stop() + } + + /** + * Read data from the given segments of log file and returns the read list of byte buffers. + * This is used to test the WAL writer independently of the WAL reader. + */ + def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = { + val reader = new RandomAccessFile(file, "r") + segments.map { x => + reader.seek(x.offset) + val data = new Array[Byte](x.length) + reader.readInt() + reader.readFully(data) + Utils.deserialize[String](data) + } + } + + def readDataManually(file: File): Seq[String] = { + val reader = new DataInputStream(new FileInputStream(file)) + val buffer = new ArrayBuffer[String] + try { + while (reader.available > 0) { + val length = reader.readInt() + val bytes = new Array[Byte](length) + reader.read(bytes) + buffer += Utils.deserialize[String](bytes) + } + } finally { + reader.close() + } + buffer + } + + def readDataUsingManager(logDirectory: File): Seq[String] = { + val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + callerName = "WriteAheadLogSuite") + val data = manager.readFromLog().map(byteBufferToString).toSeq + manager.stop() + data + } + + def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = { + (1 to numItems).map { _.toString } + } + + def getLogFilesInDirectory(directory: File): Seq[File] = { + if (directory.exists) { + directory.listFiles().filter(_.getName().startsWith("log-")) + .sortBy(_.getName.split("-")(1).toLong) + } else { + Seq.empty + } + } + + def printData(data: Seq[String]) { + println("# items in data = " + data.size) + println(data.mkString("\n")) + } + + implicit def stringToByteBuffer(str: String): ByteBuffer = { + ByteBuffer.wrap(Utils.serialize(str)) + } + + implicit def byteBufferToString(byteBuffer: ByteBuffer): String = { + Utils.deserialize[String](byteBuffer.array) + } +} From 5182ffb3053a143f221f1e56ed21e2461b4d9e4f Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 21 Oct 2014 12:59:38 -0700 Subject: [PATCH 02/16] Added documentation --- .../storage/WriteAheadLogManager.scala | 44 ++++++++++++++++--- .../storage/WriteAheadLogRandomReader.scala | 5 +++ .../storage/WriteAheadLogReader.scala | 6 +++ .../storage/WriteAheadLogWriter.scala | 14 +++--- .../storage/WriteAheadLogSuite.scala | 35 ++++++++++++--- 5 files changed, 86 insertions(+), 18 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala index c70ecb0da4e54..6ba0e9abee8f9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala @@ -13,6 +13,24 @@ import org.apache.spark.streaming.storage.WriteAheadLogManager._ import org.apache.spark.streaming.util.{Clock, SystemClock} import org.apache.spark.util.Utils +/** + * This class manages write ahead log files. + * - Writes records (bytebuffers) to periodically rotating log files. + * - Recovers the log files and the reads the recovered records upon failures. + * - Cleans up old log files. + * + * Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write + * and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read. + * + *@param logDirectory Directory when rotating log files will be created. + * @param hadoopConf Hadoop configuration for reading/writing log files. + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over. + * Default is one minute. + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log. + * Default is three. + * @param callerName Optional name of the class who is using this manager. + * @param clock Optional clock that is used to check for rotation interval. + */ private[streaming] class WriteAheadLogManager( logDirectory: String, hadoopConf: Configuration, @@ -37,6 +55,7 @@ private[streaming] class WriteAheadLogManager( initializeOrRecover() + /** Write a byte buffer to the log file */ def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized { var fileSegment: FileSegment = null var failures = 0 @@ -49,17 +68,27 @@ private[streaming] class WriteAheadLogManager( } catch { case ex: Exception => lastException = ex - logWarning("Failed to ...") + logWarning("Failed to write to write ahead log") resetWriter() failures += 1 } } if (fileSegment == null) { + logError(s"Failed to write to write ahead log after $failures failures") throw lastException } fileSegment } + /** + * Read all the existing logs from the log directory. + * + * Note that this is typically called when the caller is initializing and wants + * to recover past state from the write ahead logs (that is, before making any writes). + * If this is called after writes have been made using this manager, then it may not return + * the latest the records. This does not deal with currently active log files, and + * hence the implementation is kept simple. + */ def readFromLog(): Iterator[ByteBuffer] = synchronized { val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath) logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) @@ -73,7 +102,7 @@ private[streaming] class WriteAheadLogManager( * Delete the log files that are older than the threshold time. * * Its important to note that the threshold time is based on the time stamps used in the log - * files, and is therefore based on the local system time. So if there is coordination necessary + * files, which is usually based on the local system time. So if there is coordination necessary * between the node calculating the threshTime (say, driver node), and the local system time * (say, worker node), the caller has to take account of possible time skew. 
*/ @@ -92,7 +121,7 @@ private[streaming] class WriteAheadLogManager( logDebug(s"Cleared log file $logInfo") } catch { case ex: Exception => - logWarning(s"Error clearing log file $logInfo", ex) + logWarning(s"Error clearing write ahead log file $logInfo", ex) } } logInfo(s"Cleared log files in $logDirectory older than $threshTime") @@ -102,14 +131,16 @@ private[streaming] class WriteAheadLogManager( } } + /** Stop the manager, close any open log writer */ def stop(): Unit = synchronized { if (currentLogWriter != null) { currentLogWriter.close() } executionContext.shutdown() - logInfo("Stopped log manager") + logInfo("Stopped write ahead log manager") } + /** Get the current log writer while taking care of rotation */ private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized { if (currentLogWriter == null || currentTime > currentLogWriterStopTime) { resetWriter() @@ -126,6 +157,7 @@ private[streaming] class WriteAheadLogManager( currentLogWriter } + /** Initialize the log directory or recover existing logs inside the directory */ private def initializeOrRecover(): Unit = synchronized { val logDirectoryPath = new Path(logDirectory) val fileSystem = logDirectoryPath.getFileSystem(hadoopConf) @@ -134,12 +166,12 @@ private[streaming] class WriteAheadLogManager( val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) pastLogs.clear() pastLogs ++= logFileInfo - logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory") + logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory") logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") } else { fileSystem.mkdirs(logDirectoryPath, FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)) - logInfo(s"Created ${logDirectory} for log files") + logInfo(s"Created ${logDirectory} for write ahead log files") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala index 3df024834f7a4..912c4308aa8e5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala @@ -21,6 +21,11 @@ import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration +/** + * A random access reader for reading write ahead log files written using + * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info, + * this reads the record (bytebuffer) from the log file. + */ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) extends Closeable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala index 5e0dc1d49a89a..28b5d352cee01 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala @@ -22,6 +22,12 @@ import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging +/** + * A reader for reading write ahead log files written using + * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. 
This reads + * the records (bytebuffers) in the log file sequentially and return them as an + * iterator of bytebuffers. + */ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration) extends Iterator[ByteBuffer] with Closeable with Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala index 68a1172d7d282..d4e417cc21faa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala @@ -26,17 +26,21 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} import org.apache.spark.streaming.storage.FileSegment -private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable { +/** + * A writer for writing byte-buffers to a write ahead log file. + */ +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration) + extends Closeable { private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = { val uri = new URI(path) - val defaultFs = FileSystem.getDefaultUri(conf).getScheme + val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme val isDefaultLocal = defaultFs == null || defaultFs == "file" if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { assert(!new File(uri.getPath).exists) Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath)))) } else { - Right(HdfsUtils.getOutputStream(path, conf)) + Right(HdfsUtils.getOutputStream(path, hadoopConf)) } } @@ -49,9 +53,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) private var closed = false - // Data is always written as: - // - Length - Long - // - Data - of length = Length + /** Write the bytebuffer to the log file */ def write(data: ByteBuffer): FileSegment = synchronized { assertOpen() data.rewind() // Rewind to ensure all data in the buffer is retrieved diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala index 88b2b5095ceb6..726393b3dbc86 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala @@ -16,22 +16,28 @@ */ package org.apache.spark.streaming.storage -import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile} +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile} import java.nio.ByteBuffer -import scala.util.Random - import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.util.Random -import com.google.common.io.Files -import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually._ + +import com.google.common.io.Files import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration + import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import WriteAheadLogSuite._ +/** + * This testsuite tests all classes related to write ahead logs. 
+ */ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val hadoopConf = new Configuration() @@ -163,8 +169,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { assert(dataToWrite.toList === readData.toList) } + test("WriteAheadLogManager - cleanup old logs") { + // Write data with manager, recover with new manager and verify + val dataToWrite = generateRandomData(100) + val fakeClock = new ManualClock + val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf, + rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) + dataToWrite.foreach { item => + fakeClock.addToTime(500) // half second for each + manager.writeToLog(item) + } + val logFiles = getLogFilesInDirectory(tempDirectory) + assert(logFiles.size > 1) + manager.cleanupOldLogs(fakeClock.currentTime() / 2) + eventually(timeout(1 second), interval(10 milliseconds)) { + assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size) + } + } + // TODO (Hari, TD): Test different failure conditions of writers and readers. - // - Failure in the middle of write // - Failure while reading incomplete/corrupt file } From b06be2bd95750ced0ce1d9670752575129c12022 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Oct 2014 13:07:17 -0700 Subject: [PATCH 03/16] Adding missing license. --- .../streaming/storage/WriteAheadLogManager.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala index 6ba0e9abee8f9..4a4578707917b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.spark.streaming.storage import java.nio.ByteBuffer From 4ab602a0074a2144d33367229358c19d079798d8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Oct 2014 13:22:56 -0700 Subject: [PATCH 04/16] Refactored write ahead stuff from streaming.storage to streaming.util --- .../streaming/{storage => util}/FileSegment.scala | 2 +- .../streaming/{storage => util}/HdfsUtils.scala | 2 +- .../{storage => util}/WriteAheadLogManager.scala | 11 +++++------ .../WriteAheadLogRandomReader.scala | 4 ++-- .../{storage => util}/WriteAheadLogReader.scala | 6 +++--- .../{storage => util}/WriteAheadLogWriter.scala | 3 +-- .../{storage => util}/WriteAheadLogSuite.scala | 12 +++++------- 7 files changed, 18 insertions(+), 22 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/FileSegment.scala (95%) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/HdfsUtils.scala (98%) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/WriteAheadLogManager.scala (95%) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/WriteAheadLogRandomReader.scala (93%) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/WriteAheadLogReader.scala (94%) rename streaming/src/main/scala/org/apache/spark/streaming/{storage => util}/WriteAheadLogWriter.scala (97%) rename streaming/src/test/scala/org/apache/spark/streaming/{storage => util}/WriteAheadLogSuite.scala (99%) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala index eb9c07e9cf61f..fb33507768e6d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala @@ -14,6 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala similarity index 98% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index efb12b82ae949..a1826959bb7da 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 4a4578707917b..0bfed99132866 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.nio.ByteBuffer @@ -25,9 +25,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.FsPermission import org.apache.spark.Logging -import org.apache.spark.streaming.storage.WriteAheadLogManager._ -import org.apache.spark.streaming.util.{Clock, SystemClock} import org.apache.spark.util.Utils +import WriteAheadLogManager._ /** * This class manages write ahead log files. @@ -35,8 +34,8 @@ import org.apache.spark.util.Utils * - Recovers the log files and the reads the recovered records upon failures. * - Cleans up old log files. * - * Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write - * and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read. + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read. * *@param logDirectory Directory when rotating log files will be created. * @param hadoopConf Hadoop configuration for reading/writing log files. @@ -199,7 +198,7 @@ private[streaming] class WriteAheadLogManager( } } -private[storage] object WriteAheadLogManager { +private[util] object WriteAheadLogManager { case class LogInfo(startTime: Long, endTime: Long, path: String) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala similarity index 93% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala index 912c4308aa8e5..16ad8279528aa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io.Closeable import java.nio.ByteBuffer @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration /** * A random access reader for reading write ahead log files written using - * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info, + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. 
Given the file segment info, * this reads the record (bytebuffer) from the log file. */ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala similarity index 94% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala index 28b5d352cee01..adc2160fdf130 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util -import java.io.{EOFException, Closeable} +import java.io.{Closeable, EOFException} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration @@ -24,7 +24,7 @@ import org.apache.spark.Logging /** * A reader for reading write ahead log files written using - * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads * the records (bytebuffers) in the log file sequentially and return them as an * iterator of bytebuffers. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala similarity index 97% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala index d4e417cc21faa..ddbb989165f2e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io._ import java.net.URI @@ -24,7 +24,6 @@ import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} -import org.apache.spark.streaming.storage.FileSegment /** * A writer for writing byte-buffers to a write ahead log file. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala similarity index 99% rename from streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala rename to streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 726393b3dbc86..577fc81d0688f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile} import java.nio.ByteBuffer @@ -22,18 +22,16 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.language.postfixOps import scala.util.Random -import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.Eventually._ - +import WriteAheadLogSuite._ import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration - -import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils -import WriteAheadLogSuite._ +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually._ /** * This testsuite tests all classes related to write ahead logs. From 5c70d1f2b0050c2d5ca71d5f9d0eb499417b827e Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 21 Oct 2014 21:51:19 -0700 Subject: [PATCH 05/16] Remove underlying stream from the WALWriter. --- .../streaming/util/WriteAheadLogWriter.scala | 25 +++---------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala index ddbb989165f2e..47f1e2544caea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala @@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} */ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration) extends Closeable { - private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = { - val uri = new URI(path) - val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme - val isDefaultLocal = defaultFs == null || defaultFs == "file" - if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { - assert(!new File(uri.getPath).exists) - Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath)))) - } else { - Right(HdfsUtils.getOutputStream(path, hadoopConf)) - } - } + private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) private lazy val hadoopFlushMethod = { val cls = classOf[FSDataOutputStream] @@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura stream.close() } - private def stream(): DataOutputStream = { - underlyingStream.fold(x => x, x => x) - } private def getPosition(): Long = { - underlyingStream match { - case Left(localStream) => localStream.size - case Right(dfsStream) => dfsStream.getPos() - } + stream.getPos() } private def flush() { - underlyingStream match { - case Left(localStream) => localStream.flush - case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) } + hadoopFlushMethod.foreach { + _.invoke(stream) } } From edcbee113c4947b4ad0e1c344475580ec9d3377c Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Thu, 9 Oct 2014 13:18:55 -0700 Subject: [PATCH 06/16] Tests reading and writing data using writers now use Minicluster. 
Conflicts: streaming/pom.xml streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala --- pom.xml | 6 + streaming/pom.xml | 5 + .../spark/streaming/util/HdfsUtils.scala | 16 +- .../streaming/util/WriteAheadLogManager.scala | 4 - .../streaming/util/WriteAheadLogSuite.scala | 146 +++++++++++------- 5 files changed, 105 insertions(+), 72 deletions(-) diff --git a/pom.xml b/pom.xml index 288bbf1114bea..1adff2243e816 100644 --- a/pom.xml +++ b/pom.xml @@ -406,6 +406,12 @@ akka-slf4j_${scala.binary.version} ${akka.version} + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + ${akka.group} akka-testkit_${scala.binary.version} diff --git a/streaming/pom.xml b/streaming/pom.xml index 12f900c91eb98..5bb5f3e159e3f 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -68,6 +68,11 @@ junit-interface test + + org.apache.hadoop + hadoop-minicluster + test + target/scala-${scala.binary.version}/classes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index a1826959bb7da..5c6bcb0cba025 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -25,10 +25,9 @@ private[streaming] object HdfsUtils { // HDFS is not thread-safe when getFileSystem is called, so synchronize on that val dfsPath = new Path(path) - val dfs = - this.synchronized { - dfsPath.getFileSystem(conf) - } + val dfs = this.synchronized { + dfsPath.getFileSystem(conf) + } // If the file exists and we have append support, append instead of creating a new file val stream: FSDataOutputStream = { if (dfs.isFile(dfsPath)) { @@ -54,17 +53,16 @@ private[streaming] object HdfsUtils { } def checkState(state: Boolean, errorMsg: => String) { - if(!state) { + if (!state) { throw new IllegalStateException(errorMsg) } } def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { val dfsPath = new Path(path) - val dfs = - this.synchronized { - dfsPath.getFileSystem(conf) - } + val dfs = this.synchronized { + dfsPath.getFileSystem(conf) + } val fileStatus = dfs.getFileStatus(dfsPath) val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) blockLocs.map(_.flatMap(_.getHosts)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 0bfed99132866..b6f274e4cb948 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -183,10 +183,6 @@ private[streaming] class WriteAheadLogManager( pastLogs ++= logFileInfo logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory") logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") - } else { - fileSystem.mkdirs(logDirectoryPath, - FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)) - logInfo(s"Created ${logDirectory} for write ahead log files") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 577fc81d0688f..9a4694d15b976 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -16,9 +16,11 @@ */ package org.apache.spark.streaming.util -import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile} +import java.io._ import java.nio.ByteBuffer +import org.apache.hadoop.fs.Path + import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions @@ -29,56 +31,66 @@ import WriteAheadLogSuite._ import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hdfs.MiniDFSCluster +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} import org.apache.spark.util.Utils -import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually._ -/** - * This testsuite tests all classes related to write ahead logs. - */ -class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { val hadoopConf = new Configuration() var tempDirectory: File = null + lazy val dfsDir = Files.createTempDir() + lazy val TEST_BUILD_DATA_KEY: String = "test.build.data" + lazy val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY) + lazy val cluster = new MiniDFSCluster(new Configuration, 2, true, null) + lazy val nnPort = cluster.getNameNode.getNameNodeAddress.getPort + lazy val hdfsUrl = "hdfs://localhost:" + nnPort+ "/" + getRandomString() + "/" + + override def beforeAll() { + System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString) + cluster.waitActive() + } before { tempDirectory = Files.createTempDir() } - after { - if (tempDirectory != null && tempDirectory.exists()) { - FileUtils.deleteDirectory(tempDirectory) - tempDirectory = null - } + override def afterAll() { + cluster.shutdown() + FileUtils.deleteDirectory(dfsDir) } test("WriteAheadLogWriter - writing data") { - val file = new File(tempDirectory, Random.nextString(10)) + val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + val writer = new WriteAheadLogWriter(file, hadoopConf) val segments = dataToWrite.map(data => writer.write(data)) writer.close() val writtenData = readDataManually(file, segments) assert(writtenData.toArray === dataToWrite.toArray) } - test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { - val file = new File(tempDirectory, Random.nextString(10)) + test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " + + "Minicluster") { + val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + val writer = new WriteAheadLogWriter(file, hadoopConf) dataToWrite.foreach { data => - val segment = writer.write(data) - assert(readDataManually(file, Seq(segment)).head === data) + val segment = writer.write(ByteBuffer.wrap(data.getBytes())) + val reader = new WriteAheadLogRandomReader(file, hadoopConf) + val dataRead = reader.read(segment) + assert(data === new String(dataRead.array())) } writer.close() } test("WriteAheadLogReader - sequentially reading data") { // Write data manually for testing the sequential reader - val file = File.createTempFile("TestSequentialReads", "", tempDirectory) + val file = hdfsUrl + getRandomString() val 
writtenData = generateRandomData() writeDataManually(writtenData, file) - val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + val reader = new WriteAheadLogReader(file, hadoopConf) val readData = reader.toSeq.map(byteBufferToString) assert(readData.toList === writtenData.toList) assert(reader.hasNext === false) @@ -88,13 +100,13 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { reader.close() } - test("WriteAheadLogReader - sequentially reading data written with writer") { + test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") { // Write data manually for testing the sequential reader - val file = new File(tempDirectory, "TestWriter") + val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() writeDataUsingWriter(file, dataToWrite) val iter = dataToWrite.iterator - val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + val reader = new WriteAheadLogReader(file, hadoopConf) reader.foreach { byteBuffer => assert(byteBufferToString(byteBuffer) === iter.next()) } @@ -103,28 +115,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogRandomReader - reading data using random reader") { // Write data manually for testing the random reader - val file = File.createTempFile("TestRandomReads", "", tempDirectory) + val file = hdfsUrl + getRandomString() val writtenData = generateRandomData() val segments = writeDataManually(writtenData, file) // Get a random order of these segments and read them back val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + val reader = new WriteAheadLogRandomReader(file, hadoopConf) writtenDataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } reader.close() } - test("WriteAheadLogRandomReader - reading data using random reader written with writer") { + test("WriteAheadLogRandomReader - reading data using random reader written with writer using " + + "Minicluster") { // Write data using writer for testing the random reader - val file = new File(tempDirectory, "TestRandomReads") + val file = hdfsUrl + getRandomString() val data = generateRandomData() val segments = writeDataUsingWriter(file, data) // Read a random sequence of segments and verify read data val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + val reader = new WriteAheadLogRandomReader(file, hadoopConf) dataAndSegments.foreach { case(data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -134,54 +147,59 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogManager - write rotating logs") { // Write data using manager val dataToWrite = generateRandomData(10) - writeDataUsingManager(tempDirectory, dataToWrite) + val dir = hdfsUrl + getRandomString() + writeDataUsingManager(dir, dataToWrite) // Read data manually to verify the written data - val logFiles = getLogFilesInDirectory(tempDirectory) + val logFiles = getLogFilesInDirectory(dir) assert(logFiles.size > 1) val writtenData = logFiles.flatMap { file => readDataManually(file) } - assert(writtenData.toList === dataToWrite.toList) + assert(writtenData.toSet === dataToWrite.toSet) } - test("WriteAheadLogManager - read rotating logs") { + // This one is 
failing right now -- commenting out for now. + ignore("WriteAheadLogManager - read rotating logs") { // Write data manually for testing reading through manager + val dir = hdfsUrl + getRandomString() val writtenData = (1 to 10).map { i => val data = generateRandomData(10) - val file = new File(tempDirectory, s"log-$i-${i + 1}") + val file = dir + "/" + getRandomString() writeDataManually(data, file) data }.flatten // Read data using manager and verify - val readData = readDataUsingManager(tempDirectory) + val readData = readDataUsingManager(dir) assert(readData.toList === writtenData.toList) } test("WriteAheadLogManager - recover past logs when creating new manager") { // Write data with manager, recover with new manager and verify val dataToWrite = generateRandomData(100) - writeDataUsingManager(tempDirectory, dataToWrite) - val logFiles = getLogFilesInDirectory(tempDirectory) + val dir = hdfsUrl + getRandomString() + writeDataUsingManager(dir, dataToWrite) + val logFiles = getLogFilesInDirectory(dir) assert(logFiles.size > 1) - val readData = readDataUsingManager(tempDirectory) + val readData = readDataUsingManager(dir) assert(dataToWrite.toList === readData.toList) } test("WriteAheadLogManager - cleanup old logs") { // Write data with manager, recover with new manager and verify + val dir = hdfsUrl + getRandomString() val dataToWrite = generateRandomData(100) val fakeClock = new ManualClock - val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf, + val manager = new WriteAheadLogManager(dir, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) dataToWrite.foreach { item => fakeClock.addToTime(500) // half second for each manager.writeToLog(item) } - val logFiles = getLogFilesInDirectory(tempDirectory) + val logFiles = getLogFilesInDirectory(dir) assert(logFiles.size > 1) manager.cleanupOldLogs(fakeClock.currentTime() / 2) eventually(timeout(1 second), interval(10 milliseconds)) { - assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size) + assert(getLogFilesInDirectory(dir).size < logFiles.size) } } @@ -197,22 +215,26 @@ object WriteAheadLogSuite { * Write data to the file and returns the an array of the bytes written. * This is used to test the WAL reader independently of the WAL writer. 
*/ - def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = { + def writeDataManually(data: Seq[String], file: String): Seq[FileSegment] = { val segments = new ArrayBuffer[FileSegment]() - val writer = new RandomAccessFile(file, "rw") + val writer = HdfsUtils.getOutputStream(file, hadoopConf) data.foreach { item => - val offset = writer.getFilePointer() + val offset = writer.getPos val bytes = Utils.serialize(item) writer.writeInt(bytes.size) writer.write(bytes) - segments += FileSegment(file.toString, offset, bytes.size) + segments += FileSegment(file, offset, bytes.size) } writer.close() segments } - def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = { - val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + def getRandomString(): String = { + new String(Random.alphanumeric.take(6).toArray) + } + + def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[FileSegment] = { + val writer = new WriteAheadLogWriter(filePath, hadoopConf) val segments = data.map { item => writer.write(item) } @@ -220,9 +242,9 @@ object WriteAheadLogSuite { segments } - def writeDataUsingManager(logDirectory: File, data: Seq[String]) { + def writeDataUsingManager(logDirectory: String, data: Seq[String]) { val fakeClock = new ManualClock - val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + val manager = new WriteAheadLogManager(logDirectory, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) data.foreach { item => fakeClock.addToTime(500) @@ -235,8 +257,8 @@ object WriteAheadLogSuite { * Read data from the given segments of log file and returns the read list of byte buffers. * This is used to test the WAL writer independently of the WAL reader. */ - def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = { - val reader = new RandomAccessFile(file, "r") + def readDataManually(file: String, segments: Seq[FileSegment]): Seq[String] = { + val reader = HdfsUtils.getInputStream(file, hadoopConf) segments.map { x => reader.seek(x.offset) val data = new Array[Byte](x.length) @@ -246,24 +268,26 @@ object WriteAheadLogSuite { } } - def readDataManually(file: File): Seq[String] = { - val reader = new DataInputStream(new FileInputStream(file)) + def readDataManually(file: String): Seq[String] = { + val reader = HdfsUtils.getInputStream(file, hadoopConf) val buffer = new ArrayBuffer[String] try { - while (reader.available > 0) { + while (true) { // Read till EOF is thrown val length = reader.readInt() val bytes = new Array[Byte](length) reader.read(bytes) buffer += Utils.deserialize[String](bytes) } + } catch { + case ex: EOFException => } finally { reader.close() } buffer } - def readDataUsingManager(logDirectory: File): Seq[String] = { - val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + def readDataUsingManager(logDirectory: String): Seq[String] = { + val manager = new WriteAheadLogManager(logDirectory, hadoopConf, callerName = "WriteAheadLogSuite") val data = manager.readFromLog().map(byteBufferToString).toSeq manager.stop() @@ -274,10 +298,14 @@ object WriteAheadLogSuite { (1 to numItems).map { _.toString } } - def getLogFilesInDirectory(directory: File): Seq[File] = { - if (directory.exists) { - directory.listFiles().filter(_.getName().startsWith("log-")) - .sortBy(_.getName.split("-")(1).toLong) + def getLogFilesInDirectory(directory: String): Seq[String] = { + val logDirectoryPath = new Path(directory) + val fileSystem = 
logDirectoryPath.getFileSystem(hadoopConf) + + if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + fileSystem.listStatus(logDirectoryPath).map { + _.getPath.toString + } } else { Seq.empty } From b4be0c11f4df23948060c68c1d57acfae6a0d57f Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 00:36:13 -0700 Subject: [PATCH 07/16] Remove unused method --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 9a4694d15b976..1f172b79ca70b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -311,11 +311,6 @@ object WriteAheadLogSuite { } } - def printData(data: Seq[String]) { - println("# items in data = " + data.size) - println(data.mkString("\n")) - } - implicit def stringToByteBuffer(str: String): ByteBuffer = { ByteBuffer.wrap(Utils.serialize(str)) } From 587b876fc36d9cefb1a668ea8698ecd2de31980d Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 13:47:45 -0700 Subject: [PATCH 08/16] Fix broken test. Call getFileSystem only from synchronized method. --- .../spark/streaming/util/HdfsUtils.scala | 16 ++--- .../streaming/util/WriteAheadLogManager.scala | 24 ++++--- .../streaming/util/WriteAheadLogReader.scala | 2 +- .../streaming/util/WriteAheadLogSuite.scala | 68 +++++++++---------- 4 files changed, 55 insertions(+), 55 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 5c6bcb0cba025..5449b87e65b8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -25,9 +25,7 @@ private[streaming] object HdfsUtils { // HDFS is not thread-safe when getFileSystem is called, so synchronize on that val dfsPath = new Path(path) - val dfs = this.synchronized { - dfsPath.getFileSystem(conf) - } + val dfs = getFileSystemForPath(dfsPath, conf) // If the file exists and we have append support, append instead of creating a new file val stream: FSDataOutputStream = { if (dfs.isFile(dfsPath)) { @@ -45,9 +43,7 @@ private[streaming] object HdfsUtils { def getInputStream(path: String, conf: Configuration): FSDataInputStream = { val dfsPath = new Path(path) - val dfs = this.synchronized { - dfsPath.getFileSystem(conf) - } + val dfs = getFileSystemForPath(dfsPath, conf) val instream = dfs.open(dfsPath) instream } @@ -60,11 +56,13 @@ private[streaming] object HdfsUtils { def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { val dfsPath = new Path(path) - val dfs = this.synchronized { - dfsPath.getFileSystem(conf) - } + val dfs = getFileSystemForPath(dfsPath, conf) val fileStatus = dfs.getFileStatus(dfsPath) val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) blockLocs.map(_.flatMap(_.getHosts)) } + + def getFileSystemForPath(path: Path, conf: Configuration) = synchronized { + path.getFileSystem(conf) + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala 
index b6f274e4cb948..2dc2507b33cb5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -63,14 +63,18 @@ private[streaming] class WriteAheadLogManager( Utils.newDaemonFixedThreadPool(1, threadpoolName)) override protected val logName = s"WriteAheadLogManager $callerNameTag" - private var currentLogPath: String = null + private var currentLogPath: Option[String] = None private var currentLogWriter: WriteAheadLogWriter = null private var currentLogWriterStartTime: Long = -1L private var currentLogWriterStopTime: Long = -1L initializeOrRecover() - /** Write a byte buffer to the log file */ + /** + * Write a byte buffer to the log file. This method synchronously writes the data in the + * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed + * to HDFS, and will be available for readers to read. + */ def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized { var fileSegment: FileSegment = null var failures = 0 @@ -99,13 +103,13 @@ private[streaming] class WriteAheadLogManager( * Read all the existing logs from the log directory. * * Note that this is typically called when the caller is initializing and wants - * to recover past state from the write ahead logs (that is, before making any writes). + * to recover past state from the write ahead logs (that is, before making any writes). * If this is called after writes have been made using this manager, then it may not return * the latest the records. This does not deal with currently active log files, and * hence the implementation is kept simple. */ def readFromLog(): Iterator[ByteBuffer] = synchronized { - val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath) + val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) logFilesToRead.iterator.map { file => logDebug(s"Creating log reader with $file") @@ -130,7 +134,7 @@ private[streaming] class WriteAheadLogManager( oldLogFiles.foreach { logInfo => try { val path = new Path(logInfo.path) - val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) } + val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf) fs.delete(path, true) synchronized { pastLogs -= logInfo } logDebug(s"Cleared log file $logInfo") @@ -159,15 +163,15 @@ private[streaming] class WriteAheadLogManager( private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized { if (currentLogWriter == null || currentTime > currentLogWriterStopTime) { resetWriter() - if (currentLogPath != null) { - pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath) + currentLogPath.foreach { + pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _) } currentLogWriterStartTime = currentTime currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000) val newLogPath = new Path(logDirectory, timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime)) - currentLogPath = newLogPath.toString - currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf) + currentLogPath = Some(newLogPath.toString) + currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf) } currentLogWriter } @@ -175,7 +179,7 @@ private[streaming] class WriteAheadLogManager( /** Initialize the log directory or recover existing logs inside the directory */ private def initializeOrRecover(): Unit = 
synchronized { val logDirectoryPath = new Path(logDirectory) - val fileSystem = logDirectoryPath.getFileSystem(hadoopConf) + val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala index adc2160fdf130..2afc0d1551acf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala @@ -56,7 +56,7 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration) close() false case e: Exception => - logDebug("Error reading next item, EOF reached", e) + logWarning("Error while trying to read data from HDFS.", e) close() throw e } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 1f172b79ca70b..03761ca49ac07 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -39,13 +39,13 @@ import org.scalatest.concurrent.Eventually._ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { val hadoopConf = new Configuration() - var tempDirectory: File = null - lazy val dfsDir = Files.createTempDir() - lazy val TEST_BUILD_DATA_KEY: String = "test.build.data" - lazy val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY) - lazy val cluster = new MiniDFSCluster(new Configuration, 2, true, null) - lazy val nnPort = cluster.getNameNode.getNameNodeAddress.getPort - lazy val hdfsUrl = "hdfs://localhost:" + nnPort+ "/" + getRandomString() + "/" + val dfsDir = Files.createTempDir() + val TEST_BUILD_DATA_KEY: String = "test.build.data" + val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY) + val cluster = new MiniDFSCluster(new Configuration, 2, true, null) + val nnPort = cluster.getNameNode.getNameNodeAddress.getPort + val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/" + var pathForTest: String = null override def beforeAll() { System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString) @@ -53,7 +53,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte } before { - tempDirectory = Files.createTempDir() + pathForTest = hdfsUrl + getRandomString() } override def afterAll() { @@ -62,23 +62,21 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte } test("WriteAheadLogWriter - writing data") { - val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter(file, hadoopConf) + val writer = new WriteAheadLogWriter(pathForTest, hadoopConf) val segments = dataToWrite.map(data => writer.write(data)) writer.close() - val writtenData = readDataManually(file, segments) + val writtenData = readDataManually(pathForTest, segments) assert(writtenData.toArray === dataToWrite.toArray) } test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " + "Minicluster") { - val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() - val 
writer = new WriteAheadLogWriter(file, hadoopConf) + val writer = new WriteAheadLogWriter(pathForTest, hadoopConf) dataToWrite.foreach { data => val segment = writer.write(ByteBuffer.wrap(data.getBytes())) - val reader = new WriteAheadLogRandomReader(file, hadoopConf) + val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) val dataRead = reader.read(segment) assert(data === new String(dataRead.array())) } @@ -87,10 +85,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogReader - sequentially reading data") { // Write data manually for testing the sequential reader - val file = hdfsUrl + getRandomString() val writtenData = generateRandomData() - writeDataManually(writtenData, file) - val reader = new WriteAheadLogReader(file, hadoopConf) + writeDataManually(writtenData, pathForTest) + val reader = new WriteAheadLogReader(pathForTest, hadoopConf) val readData = reader.toSeq.map(byteBufferToString) assert(readData.toList === writtenData.toList) assert(reader.hasNext === false) @@ -102,11 +99,10 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") { // Write data manually for testing the sequential reader - val file = hdfsUrl + getRandomString() val dataToWrite = generateRandomData() - writeDataUsingWriter(file, dataToWrite) + writeDataUsingWriter(pathForTest, dataToWrite) val iter = dataToWrite.iterator - val reader = new WriteAheadLogReader(file, hadoopConf) + val reader = new WriteAheadLogReader(pathForTest, hadoopConf) reader.foreach { byteBuffer => assert(byteBufferToString(byteBuffer) === iter.next()) } @@ -115,13 +111,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogRandomReader - reading data using random reader") { // Write data manually for testing the random reader - val file = hdfsUrl + getRandomString() val writtenData = generateRandomData() - val segments = writeDataManually(writtenData, file) + val segments = writeDataManually(writtenData, pathForTest) // Get a random order of these segments and read them back val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(file, hadoopConf) + val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) writtenDataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -131,14 +126,13 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogRandomReader - reading data using random reader written with writer using " + "Minicluster") { // Write data using writer for testing the random reader - val file = hdfsUrl + getRandomString() val data = generateRandomData() - val segments = writeDataUsingWriter(file, data) + val segments = writeDataUsingWriter(pathForTest, data) // Read a random sequence of segments and verify read data val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(file, hadoopConf) - dataAndSegments.foreach { case(data, segment) => + val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) + dataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } reader.close() @@ -147,7 +141,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte 
test("WriteAheadLogManager - write rotating logs") { // Write data using manager val dataToWrite = generateRandomData(10) - val dir = hdfsUrl + getRandomString() + val dir = pathForTest writeDataUsingManager(dir, dataToWrite) // Read data manually to verify the written data @@ -158,25 +152,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte } // This one is failing right now -- commenting out for now. - ignore("WriteAheadLogManager - read rotating logs") { + test("WriteAheadLogManager - read rotating logs") { // Write data manually for testing reading through manager - val dir = hdfsUrl + getRandomString() + val dir = pathForTest val writtenData = (1 to 10).map { i => val data = generateRandomData(10) - val file = dir + "/" + getRandomString() + val file = dir + "/log-" + i writeDataManually(data, file) data }.flatten + val logDirectoryPath = new Path(dir) + val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) + assert(fileSystem.exists(logDirectoryPath) === true) + // Read data using manager and verify val readData = readDataUsingManager(dir) - assert(readData.toList === writtenData.toList) +// assert(readData.toList === writtenData.toList) } test("WriteAheadLogManager - recover past logs when creating new manager") { // Write data with manager, recover with new manager and verify val dataToWrite = generateRandomData(100) - val dir = hdfsUrl + getRandomString() + val dir = pathForTest writeDataUsingManager(dir, dataToWrite) val logFiles = getLogFilesInDirectory(dir) assert(logFiles.size > 1) @@ -186,7 +184,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - cleanup old logs") { // Write data with manager, recover with new manager and verify - val dir = hdfsUrl + getRandomString() + val dir = pathForTest val dataToWrite = generateRandomData(100) val fakeClock = new ManualClock val manager = new WriteAheadLogManager(dir, hadoopConf, @@ -300,7 +298,7 @@ object WriteAheadLogSuite { def getLogFilesInDirectory(directory: String): Seq[String] = { val logDirectoryPath = new Path(directory) - val fileSystem = logDirectoryPath.getFileSystem(hadoopConf) + val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { fileSystem.listStatus(logDirectoryPath).map { From 7e40e56a506f48268c529a0352ec29d7bae805b4 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 15:13:11 -0700 Subject: [PATCH 09/16] Restore old build directory after tests --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 03761ca49ac07..d93db9995fda2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -59,6 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte override def afterAll() { cluster.shutdown() FileUtils.deleteDirectory(dfsDir) + System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp) } test("WriteAheadLogWriter - writing data") { From 5ff90ee6842483d8fc26501a7154379015d0bf3e Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 16:12:04 -0700 Subject: [PATCH 
10/16] Fix tests to not ignore ordering and also assert all data is present --- .../streaming/util/WriteAheadLogSuite.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index d93db9995fda2..f6a622a3b1f93 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.util import java.io._ import java.nio.ByteBuffer -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte val hadoopConf = new Configuration() val dfsDir = Files.createTempDir() val TEST_BUILD_DATA_KEY: String = "test.build.data" - val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY) + val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY)) + System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString) val cluster = new MiniDFSCluster(new Configuration, 2, true, null) val nnPort = cluster.getNameNode.getNameNodeAddress.getPort - val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/" + val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/" var pathForTest: String = null override def beforeAll() { - System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString) cluster.waitActive() } @@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte override def afterAll() { cluster.shutdown() FileUtils.deleteDirectory(dfsDir) - System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp) + oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _)) } test("WriteAheadLogWriter - writing data") { @@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte assert(writtenData.toArray === dataToWrite.toArray) } - test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " + - "Minicluster") { + test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { val dataToWrite = generateRandomData() val writer = new WriteAheadLogWriter(pathForTest, hadoopConf) dataToWrite.foreach { data => @@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte reader.close() } - test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") { + test("WriteAheadLogReader - sequentially reading data written with writer") { // Write data manually for testing the sequential reader val dataToWrite = generateRandomData() writeDataUsingWriter(pathForTest, dataToWrite) @@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte reader.close() } - test("WriteAheadLogRandomReader - reading data using random reader written with writer using " + - "Minicluster") { + test("WriteAheadLogRandomReader - reading data using random reader written with writer") { // Write data using writer for testing the random reader val data = generateRandomData() val segments = writeDataUsingWriter(pathForTest, data) @@ -148,17 +146,16 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte // Read 
data manually to verify the written data val logFiles = getLogFilesInDirectory(dir) assert(logFiles.size > 1) - val writtenData = logFiles.flatMap { file => readDataManually(file) } - assert(writtenData.toSet === dataToWrite.toSet) + val writtenData = logFiles.flatMap { file => readDataManually(file)} + assert(writtenData.toList === dataToWrite.toList) } - // This one is failing right now -- commenting out for now. test("WriteAheadLogManager - read rotating logs") { // Write data manually for testing reading through manager val dir = pathForTest val writtenData = (1 to 10).map { i => val data = generateRandomData(10) - val file = dir + "/log-" + i + val file = dir + s"/log-$i-$i" writeDataManually(data, file) data }.flatten @@ -169,7 +166,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte // Read data using manager and verify val readData = readDataUsingManager(dir) -// assert(readData.toList === writtenData.toList) + assert(readData.toList === writtenData.toList) } test("WriteAheadLogManager - recover past logs when creating new manager") { @@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte assert(getLogFilesInDirectory(dir).size < logFiles.size) } } - // TODO (Hari, TD): Test different failure conditions of writers and readers. // - Failure while reading incomplete/corrupt file } @@ -271,7 +267,8 @@ object WriteAheadLogSuite { val reader = HdfsUtils.getInputStream(file, hadoopConf) val buffer = new ArrayBuffer[String] try { - while (true) { // Read till EOF is thrown + while (true) { + // Read till EOF is thrown val length = reader.readInt() val bytes = new Array[Byte](length) reader.read(bytes) @@ -294,15 +291,20 @@ object WriteAheadLogSuite { } def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = { - (1 to numItems).map { _.toString } + (1 to numItems).map { + _.toString + } } def getLogFilesInDirectory(directory: String): Seq[String] = { val logDirectoryPath = new Path(directory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) + implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering + .by(f => f.getModificationTime) + if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { - fileSystem.listStatus(logDirectoryPath).map { + fileSystem.listStatus(logDirectoryPath).sorted.map { _.getPath.toString } } else { From 82ce56e8e5c8bed36fb6ea7a61b5b143775f0143 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 16:40:59 -0700 Subject: [PATCH 11/16] Fix file ordering issue in WALManager tests --- .../streaming/util/WriteAheadLogSuite.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index f6a622a3b1f93..21a2c6a58b0c1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -139,7 +139,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - write rotating logs") { // Write data using manager - val dataToWrite = generateRandomData(10) + val dataToWrite = generateRandomData() val dir = pathForTest writeDataUsingManager(dir, dataToWrite) @@ -154,7 +154,7 @@ class WriteAheadLogSuite extends FunSuite 
with BeforeAndAfter with BeforeAndAfte // Write data manually for testing reading through manager val dir = pathForTest val writtenData = (1 to 10).map { i => - val data = generateRandomData(10) + val data = generateRandomData() val file = dir + s"/log-$i-$i" writeDataManually(data, file) data @@ -171,7 +171,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - recover past logs when creating new manager") { // Write data with manager, recover with new manager and verify - val dataToWrite = generateRandomData(100) + val dataToWrite = generateRandomData() val dir = pathForTest writeDataUsingManager(dir, dataToWrite) val logFiles = getLogFilesInDirectory(dir) @@ -183,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - cleanup old logs") { // Write data with manager, recover with new manager and verify val dir = pathForTest - val dataToWrite = generateRandomData(100) + val dataToWrite = generateRandomData() val fakeClock = new ManualClock val manager = new WriteAheadLogManager(dir, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) @@ -239,8 +239,10 @@ object WriteAheadLogSuite { def writeDataUsingManager(logDirectory: String, data: Seq[String]) { val fakeClock = new ManualClock + fakeClock.setTime(1000000) val manager = new WriteAheadLogManager(logDirectory, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) + // Ensure that 500 does not get sorted after 2000, so put a high base value. data.foreach { item => fakeClock.addToTime(500) manager.writeToLog(item) @@ -290,8 +292,8 @@ object WriteAheadLogSuite { data } - def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = { - (1 to numItems).map { + def generateRandomData(): Seq[String] = { + (1 to 50).map { _.toString } } @@ -300,11 +302,8 @@ object WriteAheadLogSuite { val logDirectoryPath = new Path(directory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering - .by(f => f.getModificationTime) - if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { - fileSystem.listStatus(logDirectoryPath).sorted.map { + fileSystem.listStatus(logDirectoryPath).map { _.getPath.toString } } else { From 4705fff01ae6c8c244685ecc0c03a3f496e22f39 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 22 Oct 2014 22:40:49 -0700 Subject: [PATCH 12/16] Sort listed files by name. Use local files for WAL tests. 
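A minimal illustration of the ordering problem that the previous commit's high clock base value and this commit's name-based sort address, assuming the log-$startTime-$stopTime file naming used by WriteAheadLogManager; the timestamp values here are made up:

// Log files are named "log-$startTime-$stopTime" and the suite sorts them by
// name, which is a lexicographic (string) comparison. Illustrative only.
val unsorted = Seq("log-2000-2500", "log-500-1000")
// String sort: "log-2000-2500" < "log-500-1000" because '2' < '5',
// so the older file would come after the newer one.
assert(unsorted.sorted == Seq("log-2000-2500", "log-500-1000"))
// With a large common base (the test clock starts at 1000000), all encoded
// timestamps have the same number of digits and the name sort is chronological.
val based = Seq("log-1002000-1002500", "log-1000500-1001000")
assert(based.sorted == Seq("log-1000500-1001000", "log-1002000-1002500"))

Sorting by name is therefore only chronological when the encoded timestamps share a common magnitude, which is why writeDataUsingManager below seeds the ManualClock at 1000000 before adding 500 ms per record.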
--- pom.xml | 6 -- streaming/pom.xml | 5 -- .../spark/streaming/util/HdfsUtils.scala | 9 +- .../streaming/util/WriteAheadLogWriter.scala | 1 + .../streaming/util/WriteAheadLogSuite.scala | 88 ++++++++----------- 5 files changed, 44 insertions(+), 65 deletions(-) diff --git a/pom.xml b/pom.xml index 1adff2243e816..288bbf1114bea 100644 --- a/pom.xml +++ b/pom.xml @@ -406,12 +406,6 @@ akka-slf4j_${scala.binary.version} ${akka.version} - - org.apache.hadoop - hadoop-minicluster - ${hadoop.version} - test - ${akka.group} akka-testkit_${scala.binary.version} diff --git a/streaming/pom.xml b/streaming/pom.xml index 5bb5f3e159e3f..12f900c91eb98 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -68,11 +68,6 @@ junit-interface test - - org.apache.hadoop - hadoop-minicluster - test - target/scala-${scala.binary.version}/classes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 5449b87e65b8e..4a6f3006922cd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -17,13 +17,12 @@ package org.apache.spark.streaming.util import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path} private[streaming] object HdfsUtils { def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = { // HDFS is not thread-safe when getFileSystem is called, so synchronize on that - val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) // If the file exists and we have append support, append instead of creating a new file @@ -63,6 +62,10 @@ private[streaming] object HdfsUtils { } def getFileSystemForPath(path: Path, conf: Configuration) = synchronized { - path.getFileSystem(conf) + val fs = path.getFileSystem(conf) + fs match { + case localFs: LocalFileSystem => localFs.getRawFileSystem + case _ => fs + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala index 47f1e2544caea..d8341f0b1c936 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala @@ -73,6 +73,7 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura } private def flush() { + stream.getWrappedStream.flush() hadoopFlushMethod.foreach { _.invoke(stream) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 21a2c6a58b0c1..bd21f462b46b4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.util import java.io._ import java.nio.ByteBuffer -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -31,54 +31,44 @@ import WriteAheadLogSuite._ import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration 
-import org.apache.hadoop.hdfs.MiniDFSCluster -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.util.Utils import org.scalatest.concurrent.Eventually._ -class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val hadoopConf = new Configuration() - val dfsDir = Files.createTempDir() - val TEST_BUILD_DATA_KEY: String = "test.build.data" - val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY)) - System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString) - val cluster = new MiniDFSCluster(new Configuration, 2, true, null) - val nnPort = cluster.getNameNode.getNameNodeAddress.getPort - val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/" - var pathForTest: String = null - - override def beforeAll() { - cluster.waitActive() - } + var tempDir: File = null + var dirForTest: String = null + var fileForTest: String = null before { - pathForTest = hdfsUrl + getRandomString() + tempDir = Files.createTempDir() + dirForTest = "file:///" + tempDir.toString + fileForTest = "file:///" + new File(tempDir, getRandomString()).toString } - override def afterAll() { - cluster.shutdown() - FileUtils.deleteDirectory(dfsDir) - oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _)) + after { + FileUtils.deleteDirectory(tempDir) } test("WriteAheadLogWriter - writing data") { val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter(pathForTest, hadoopConf) + val writer = new WriteAheadLogWriter(fileForTest, hadoopConf) val segments = dataToWrite.map(data => writer.write(data)) writer.close() - val writtenData = readDataManually(pathForTest, segments) + val writtenData = readDataManually(fileForTest, segments) assert(writtenData.toArray === dataToWrite.toArray) } test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter(pathForTest, hadoopConf) + val writer = new WriteAheadLogWriter(fileForTest, hadoopConf) dataToWrite.foreach { data => - val segment = writer.write(ByteBuffer.wrap(data.getBytes())) - val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) + val segment = writer.write(stringToByteBuffer(data)) + val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) val dataRead = reader.read(segment) - assert(data === new String(dataRead.array())) + assert(data === byteBufferToString(dataRead)) } writer.close() } @@ -86,8 +76,8 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogReader - sequentially reading data") { // Write data manually for testing the sequential reader val writtenData = generateRandomData() - writeDataManually(writtenData, pathForTest) - val reader = new WriteAheadLogReader(pathForTest, hadoopConf) + writeDataManually(writtenData, fileForTest) + val reader = new WriteAheadLogReader(fileForTest, hadoopConf) val readData = reader.toSeq.map(byteBufferToString) assert(readData.toList === writtenData.toList) assert(reader.hasNext === false) @@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogReader - sequentially reading data written with writer") { // Write data manually for testing the sequential reader val dataToWrite = generateRandomData() - writeDataUsingWriter(pathForTest, dataToWrite) + 
writeDataUsingWriter(fileForTest, dataToWrite) val iter = dataToWrite.iterator - val reader = new WriteAheadLogReader(pathForTest, hadoopConf) + val reader = new WriteAheadLogReader(fileForTest, hadoopConf) reader.foreach { byteBuffer => assert(byteBufferToString(byteBuffer) === iter.next()) } @@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogRandomReader - reading data using random reader") { // Write data manually for testing the random reader val writtenData = generateRandomData() - val segments = writeDataManually(writtenData, pathForTest) + val segments = writeDataManually(writtenData, fileForTest) // Get a random order of these segments and read them back val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) + val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) writtenDataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogRandomReader - reading data using random reader written with writer") { // Write data using writer for testing the random reader val data = generateRandomData() - val segments = writeDataUsingWriter(pathForTest, data) + val segments = writeDataUsingWriter(fileForTest, data) // Read a random sequence of segments and verify read data val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf) + val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) dataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -140,11 +130,10 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - write rotating logs") { // Write data using manager val dataToWrite = generateRandomData() - val dir = pathForTest - writeDataUsingManager(dir, dataToWrite) + writeDataUsingManager(dirForTest, dataToWrite) // Read data manually to verify the written data - val logFiles = getLogFilesInDirectory(dir) + val logFiles = getLogFilesInDirectory(dirForTest) assert(logFiles.size > 1) val writtenData = logFiles.flatMap { file => readDataManually(file)} assert(writtenData.toList === dataToWrite.toList) @@ -152,50 +141,47 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte test("WriteAheadLogManager - read rotating logs") { // Write data manually for testing reading through manager - val dir = pathForTest val writtenData = (1 to 10).map { i => val data = generateRandomData() - val file = dir + s"/log-$i-$i" + val file = dirForTest + s"/log-$i-$i" writeDataManually(data, file) data }.flatten - val logDirectoryPath = new Path(dir) + val logDirectoryPath = new Path(dirForTest) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) assert(fileSystem.exists(logDirectoryPath) === true) // Read data using manager and verify - val readData = readDataUsingManager(dir) + val readData = readDataUsingManager(dirForTest) assert(readData.toList === writtenData.toList) } test("WriteAheadLogManager - recover past logs when creating new manager") { // Write data with manager, recover with new manager and verify val dataToWrite = generateRandomData() - val dir = pathForTest - 
writeDataUsingManager(dir, dataToWrite) - val logFiles = getLogFilesInDirectory(dir) + writeDataUsingManager(dirForTest, dataToWrite) + val logFiles = getLogFilesInDirectory(dirForTest) assert(logFiles.size > 1) - val readData = readDataUsingManager(dir) + val readData = readDataUsingManager(dirForTest) assert(dataToWrite.toList === readData.toList) } test("WriteAheadLogManager - cleanup old logs") { // Write data with manager, recover with new manager and verify - val dir = pathForTest val dataToWrite = generateRandomData() val fakeClock = new ManualClock - val manager = new WriteAheadLogManager(dir, hadoopConf, + val manager = new WriteAheadLogManager(dirForTest, hadoopConf, rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) dataToWrite.foreach { item => fakeClock.addToTime(500) // half second for each manager.writeToLog(item) } - val logFiles = getLogFilesInDirectory(dir) + val logFiles = getLogFilesInDirectory(dirForTest) assert(logFiles.size > 1) manager.cleanupOldLogs(fakeClock.currentTime() / 2) eventually(timeout(1 second), interval(10 milliseconds)) { - assert(getLogFilesInDirectory(dir).size < logFiles.size) + assert(getLogFilesInDirectory(dirForTest).size < logFiles.size) } } // TODO (Hari, TD): Test different failure conditions of writers and readers. @@ -305,7 +291,7 @@ object WriteAheadLogSuite { if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { fileSystem.listStatus(logDirectoryPath).map { _.getPath.toString - } + }.sorted } else { Seq.empty } From 9514dc833c9c30be12eeb64fb4580c2e6f1adb4f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Oct 2014 02:25:48 -0700 Subject: [PATCH 13/16] Added unit tests to test reading of corrupted data and other minor edits --- ...t.scala => WriteAheadLogFileSegment.scala} | 3 +- .../streaming/util/WriteAheadLogManager.scala | 9 +- .../util/WriteAheadLogRandomReader.scala | 4 +- .../streaming/util/WriteAheadLogWriter.scala | 29 +-- .../streaming/util/WriteAheadLogSuite.scala | 222 +++++++++++------- 5 files changed, 158 insertions(+), 109 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/util/{FileSegment.scala => WriteAheadLogFileSegment.scala} (83%) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala similarity index 83% rename from streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala index fb33507768e6d..1005a2c8ec303 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala @@ -16,4 +16,5 @@ */ package org.apache.spark.streaming.util -private[streaming] case class FileSegment (path: String, offset: Long, length: Int) +/** Class for representing a segment of data in a write ahead log file */ +private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 2dc2507b33cb5..f0c552e9593c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -75,8 +75,8 @@ private[streaming] class WriteAheadLogManager( * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed * to HDFS, and will be available for readers to read. */ - def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized { - var fileSegment: FileSegment = null + def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized { + var fileSegment: WriteAheadLogFileSegment = null var failures = 0 var lastException: Exception = null var succeeded = false @@ -112,8 +112,8 @@ private[streaming] class WriteAheadLogManager( val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) logFilesToRead.iterator.map { file => - logDebug(s"Creating log reader with $file") - new WriteAheadLogReader(file, hadoopConf) + logDebug(s"Creating log reader with $file") + new WriteAheadLogReader(file, hadoopConf) } flatMap { x => x } } @@ -208,6 +208,7 @@ private[util] object WriteAheadLogManager { s"log-$startTime-$stopTime" } + /** Convert a sequence of files to a sequence of sorted LogInfo objects */ def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = { files.flatMap { file => logFileRegex.findFirstIn(file.getName()) match { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala index 16ad8279528aa..92bad7a882a65 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala @@ -32,12 +32,12 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura private val instream = HdfsUtils.getInputStream(path, conf) private var closed = false - def read(segment: FileSegment): ByteBuffer = synchronized { + def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized { assertOpen() instream.seek(segment.offset) val nextLength = instream.readInt() HdfsUtils.checkState(nextLength == segment.length, - "Expected message length to be " + segment.length + ", " + "but was " + nextLength) + s"Expected message length to be ${segment.length}, but was $nextLength") val buffer = new Array[Byte](nextLength) instream.readFully(buffer) ByteBuffer.wrap(buffer) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala index d8341f0b1c936..679f6a6dfd7c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala @@ -34,49 +34,46 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) private lazy val hadoopFlushMethod = { + // Use reflection to get the right flush operation val cls = classOf[FSDataOutputStream] Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption } - private var nextOffset = getPosition() + private var nextOffset = stream.getPos() private var closed = false - /** Write the bytebuffer to the log file */ - def write(data: ByteBuffer): FileSegment = synchronized { + def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized 
{ assertOpen() data.rewind() // Rewind to ensure all data in the buffer is retrieved val lengthToWrite = data.remaining() - val segment = new FileSegment(path, nextOffset, lengthToWrite) + val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite) stream.writeInt(lengthToWrite) if (data.hasArray) { stream.write(data.array()) } else { - // If the buffer is not backed by an array we need to write the data byte by byte + // If the buffer is not backed by an array, we transfer using temp array + // Note that despite the extra array copy, this should be faster than byte-by-byte copy while (data.hasRemaining) { - stream.write(data.get()) + val array = new Array[Byte](data.remaining) + data.get(array) + stream.write(array) } } flush() - nextOffset = getPosition() + nextOffset = stream.getPos() segment } - override private[streaming] def close(): Unit = synchronized { + override def close(): Unit = synchronized { closed = true stream.close() } - - private def getPosition(): Long = { - stream.getPos() - } - private def flush() { + hadoopFlushMethod.foreach { _.invoke(stream) } + // Useful for local file system where hflush/sync does not work (HADOOP-7844) stream.getWrappedStream.flush() - hadoopFlushMethod.foreach { - _.invoke(stream) - } } private def assertOpen() { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index bd21f462b46b4..f86998b5a66b9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -19,33 +19,36 @@ package org.apache.spark.streaming.util import java.io._ import java.nio.ByteBuffer -import org.apache.hadoop.fs.Path - import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.language.implicitConversions -import scala.language.postfixOps +import scala.language.{implicitConversions, postfixOps} import scala.util.Random import WriteAheadLogSuite._ import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.hadoop.fs.Path import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually._ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val hadoopConf = new Configuration() var tempDir: File = null - var dirForTest: String = null - var fileForTest: String = null + var testDir: String = null + var testFile: String = null + var manager: WriteAheadLogManager = null before { tempDir = Files.createTempDir() - dirForTest = "file:///" + tempDir.toString - fileForTest = "file:///" + new File(tempDir, getRandomString()).toString + testDir = tempDir.toString + testFile = new File(tempDir, Random.nextString(10)).toString + if (manager != null) { + manager.stop() + manager = null + } } after { @@ -54,32 +57,28 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogWriter - writing data") { val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter(fileForTest, hadoopConf) - val segments = dataToWrite.map(data => writer.write(data)) - writer.close() - val writtenData = readDataManually(fileForTest, segments) - assert(writtenData.toArray === dataToWrite.toArray) + val segments = writeDataUsingWriter(testFile, dataToWrite) + val writtenData = 
readDataManually(testFile, segments) + assert(writtenData === dataToWrite) } test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { val dataToWrite = generateRandomData() - val writer = new WriteAheadLogWriter(fileForTest, hadoopConf) + val writer = new WriteAheadLogWriter(testFile, hadoopConf) dataToWrite.foreach { data => val segment = writer.write(stringToByteBuffer(data)) - val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) - val dataRead = reader.read(segment) - assert(data === byteBufferToString(dataRead)) + val dataRead = readDataManually(testFile, Seq(segment)).head + assert(data === dataRead) } writer.close() } test("WriteAheadLogReader - sequentially reading data") { - // Write data manually for testing the sequential reader val writtenData = generateRandomData() - writeDataManually(writtenData, fileForTest) - val reader = new WriteAheadLogReader(fileForTest, hadoopConf) + writeDataManually(writtenData, testFile) + val reader = new WriteAheadLogReader(testFile, hadoopConf) val readData = reader.toSeq.map(byteBufferToString) - assert(readData.toList === writtenData.toList) + assert(readData === writtenData) assert(reader.hasNext === false) intercept[Exception] { reader.next() @@ -88,25 +87,43 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { } test("WriteAheadLogReader - sequentially reading data written with writer") { + val dataToWrite = generateRandomData() + writeDataUsingWriter(testFile, dataToWrite) + val readData = readDataUsingReader(testFile) + assert(readData === dataToWrite) + } + + test("WriteAheadLogReader - reading data written with writer after corrupted write") { // Write data manually for testing the sequential reader val dataToWrite = generateRandomData() - writeDataUsingWriter(fileForTest, dataToWrite) - val iter = dataToWrite.iterator - val reader = new WriteAheadLogReader(fileForTest, hadoopConf) - reader.foreach { byteBuffer => - assert(byteBufferToString(byteBuffer) === iter.next()) - } - reader.close() + writeDataUsingWriter(testFile, dataToWrite) + val fileLength = new File(testFile).length() + + // Append some garbage data to get the effect of a corrupted write + val fw = new FileWriter(testFile, true) + fw.append("This line appended to file!") + fw.close() + + // Verify the data can be read and is same as the one correctly written + assert(readDataUsingReader(testFile) === dataToWrite) + + // Corrupt the last correctly written file + val raf = new FileOutputStream(testFile, true).getChannel() + raf.truncate(fileLength - 1) + raf.close() + + // Verify all the data except the last can be read + assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1))) } test("WriteAheadLogRandomReader - reading data using random reader") { // Write data manually for testing the random reader val writtenData = generateRandomData() - val segments = writeDataManually(writtenData, fileForTest) + val segments = writeDataManually(writtenData, testFile) // Get a random order of these segments and read them back val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) + val reader = new WriteAheadLogRandomReader(testFile, hadoopConf) writtenDataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -116,11 +133,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogRandomReader - reading data using random reader written 
with writer") { // Write data using writer for testing the random reader val data = generateRandomData() - val segments = writeDataUsingWriter(fileForTest, data) + val segments = writeDataUsingWriter(testFile, data) // Read a random sequence of segments and verify read data val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten - val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf) + val reader = new WriteAheadLogRandomReader(testFile, hadoopConf) dataAndSegments.foreach { case (data, segment) => assert(data === byteBufferToString(reader.read(segment))) } @@ -130,91 +147,112 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogManager - write rotating logs") { // Write data using manager val dataToWrite = generateRandomData() - writeDataUsingManager(dirForTest, dataToWrite) + writeDataUsingManager(testDir, dataToWrite) // Read data manually to verify the written data - val logFiles = getLogFilesInDirectory(dirForTest) + val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) val writtenData = logFiles.flatMap { file => readDataManually(file)} - assert(writtenData.toList === dataToWrite.toList) + assert(writtenData === dataToWrite) } test("WriteAheadLogManager - read rotating logs") { // Write data manually for testing reading through manager val writtenData = (1 to 10).map { i => val data = generateRandomData() - val file = dirForTest + s"/log-$i-$i" + val file = testDir + s"/log-$i-$i" writeDataManually(data, file) data }.flatten - val logDirectoryPath = new Path(dirForTest) + val logDirectoryPath = new Path(testDir) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) assert(fileSystem.exists(logDirectoryPath) === true) // Read data using manager and verify - val readData = readDataUsingManager(dirForTest) - assert(readData.toList === writtenData.toList) + val readData = readDataUsingManager(testDir) + assert(readData === writtenData) } test("WriteAheadLogManager - recover past logs when creating new manager") { // Write data with manager, recover with new manager and verify val dataToWrite = generateRandomData() - writeDataUsingManager(dirForTest, dataToWrite) - val logFiles = getLogFilesInDirectory(dirForTest) + writeDataUsingManager(testDir, dataToWrite) + val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - val readData = readDataUsingManager(dirForTest) - assert(dataToWrite.toList === readData.toList) + val readData = readDataUsingManager(testDir) + assert(dataToWrite === readData) } test("WriteAheadLogManager - cleanup old logs") { // Write data with manager, recover with new manager and verify + val manualClock = new ManualClock val dataToWrite = generateRandomData() - val fakeClock = new ManualClock - val manager = new WriteAheadLogManager(dirForTest, hadoopConf, - rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) - dataToWrite.foreach { item => - fakeClock.addToTime(500) // half second for each - manager.writeToLog(item) - } - val logFiles = getLogFilesInDirectory(dirForTest) + manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false) + val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - manager.cleanupOldLogs(fakeClock.currentTime() / 2) + manager.cleanupOldLogs(manualClock.currentTime() / 2) eventually(timeout(1 second), interval(10 milliseconds)) { - assert(getLogFilesInDirectory(dirForTest).size < logFiles.size) + assert(getLogFilesInDirectory(testDir).size < 
logFiles.size) + } + } + + test("WriteAheadLogManager - handling file errors while reading rotating logs") { + // Generate a set of log files + val manualClock = new ManualClock + val dataToWrite1 = generateRandomData() + writeDataUsingManager(testDir, dataToWrite1, manualClock) + val logFiles1 = getLogFilesInDirectory(testDir) + assert(logFiles1.size > 1) + + + // Recover old files and generate a second set of log files + val dataToWrite2 = generateRandomData() + manualClock.addToTime(100000) + writeDataUsingManager(testDir, dataToWrite2, manualClock) + val logFiles2 = getLogFilesInDirectory(testDir) + assert(logFiles2.size > logFiles1.size) + + // Read the files and verify that all the written data can be read + val readData1 = readDataUsingManager(testDir) + assert(readData1 === (dataToWrite1 ++ dataToWrite2)) + + // Corrupt the first set of files so that they are basically unreadable + logFiles1.foreach { f => + val raf = new FileOutputStream(f, true).getChannel() + raf.truncate(1) + raf.close() } + + // Verify that the corrupted files do not prevent reading of the second set of data + val readData = readDataUsingManager(testDir) + assert(readData === dataToWrite2) } - // TODO (Hari, TD): Test different failure conditions of writers and readers. - // - Failure while reading incomplete/corrupt file } object WriteAheadLogSuite { private val hadoopConf = new Configuration() - /** - * Write data to the file and returns the an array of the bytes written. - * This is used to test the WAL reader independently of the WAL writer. - */ - def writeDataManually(data: Seq[String], file: String): Seq[FileSegment] = { - val segments = new ArrayBuffer[FileSegment]() + /** Write data to a file directly and return an array of the file segments written. */ + def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = { + val segments = new ArrayBuffer[WriteAheadLogFileSegment]() val writer = HdfsUtils.getOutputStream(file, hadoopConf) data.foreach { item => val offset = writer.getPos val bytes = Utils.serialize(item) writer.writeInt(bytes.size) writer.write(bytes) - segments += FileSegment(file, offset, bytes.size) + segments += WriteAheadLogFileSegment(file, offset, bytes.size) } writer.close() segments } - def getRandomString(): String = { - new String(Random.alphanumeric.take(6).toArray) - } - - def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[FileSegment] = { + /** + * Write data to a file using the writer class and return an array of the file segments written. + */ + def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = { val writer = new WriteAheadLogWriter(filePath, hadoopConf) val segments = data.map { item => writer.write(item) @@ -223,24 +261,27 @@ object WriteAheadLogSuite { segments } - def writeDataUsingManager(logDirectory: String, data: Seq[String]) { - val fakeClock = new ManualClock - fakeClock.setTime(1000000) + /** Write data to rotating files in log directory using the manager class. 
*/
+  def writeDataUsingManager(
+      logDirectory: String,
+      data: Seq[String],
+      manualClock: ManualClock = new ManualClock,
+      stopManager: Boolean = true
+    ): WriteAheadLogManager = {
+    if (manualClock.currentTime < 100000) manualClock.setTime(10000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
-      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
-      fakeClock.addToTime(500)
+      manualClock.addToTime(500)
       manager.writeToLog(item)
     }
-    manager.stop()
+    if (stopManager) manager.stop()
+    manager
   }
 
-  /**
-   * Read data from the given segments of log file and returns the read list of byte buffers.
-   * This is used to test the WAL writer independently of the WAL reader.
-   */
-  def readDataManually(file: String, segments: Seq[FileSegment]): Seq[String] = {
+  /** Read data from the given segments of a log file directly and return the read items. */
+  def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     segments.map { x =>
       reader.seek(x.offset)
@@ -251,6 +292,7 @@ object WriteAheadLogSuite {
     }
   }
 
+  /** Read all the data from a log file directly and return the read items. */
   def readDataManually(file: String): Seq[String] = {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     val buffer = new ArrayBuffer[String]
@@ -270,6 +312,15 @@ object WriteAheadLogSuite {
     buffer
   }
 
+  /** Read all the data from a log file using the reader class and return the read items. */
+  def readDataUsingReader(file: String): Seq[String] = {
+    val reader = new WriteAheadLogReader(file, hadoopConf)
+    val readData = reader.toList.map(byteBufferToString)
+    reader.close()
+    readData
+  }
+
+  /** Read all the data in the log files in a directory using the manager class. */
   def readDataUsingManager(logDirectory: String): Seq[String] = {
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       callerName = "WriteAheadLogSuite")
@@ -278,25 +329,24 @@ object WriteAheadLogSuite {
     data
   }
 
-  def generateRandomData(): Seq[String] = {
-    (1 to 50).map {
-      _.toString
-    }
-  }
-
+  /** Get the log files in a directory. */
   def getLogFilesInDirectory(directory: String): Seq[String] = {
     val logDirectoryPath = new Path(directory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       fileSystem.listStatus(logDirectoryPath).map {
-        _.getPath.toString
+        _.getPath.toString.stripPrefix("file:")
       }.sorted
     } else {
       Seq.empty
     }
   }
 
+  def generateRandomData(): Seq[String] = {
+    (1 to 100).map { _.toString }
+  }
+
   implicit def stringToByteBuffer(str: String): ByteBuffer = {
     ByteBuffer.wrap(Utils.serialize(str))
   }

From a317a4d4696e758607dd059785b303640cfdd149 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 23 Oct 2014 09:26:02 -0700
Subject: [PATCH 14/16] Directory deletion should not fail tests

---
 .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index bd21f462b46b4..67e5e7a729c3a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -49,7 +49,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   }
 
   after {
-    FileUtils.deleteDirectory(tempDir)
+    FileUtils.deleteQuietly(tempDir)
   }
 
   test("WriteAheadLogWriter - writing data") {

From 55514e28a662cc4f49d84492c6d2a59f901f5d5d Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 23 Oct 2014 18:19:38 -0700
Subject: [PATCH 15/16] Minor changes based on PR comments.
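
For context, the effect of returning the raw local file system, together with the
reflective hflush()/sync() lookup that WriteAheadLogWriter already uses, can be
sketched in isolation roughly as below. This is illustrative only and not code added
by this patch; the object name and the /tmp path are made up.

    // Sketch: resolve a FileSystem for a path, unwrap LocalFileSystem to its raw
    // form so that flushes are not swallowed by the checksumming wrapper, and flush
    // with hflush() when available, falling back to sync() on older Hadoop versions.
    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, LocalFileSystem, Path}

    object LocalFlushSketch {
      def fileSystemFor(path: Path, conf: Configuration): FileSystem = {
        path.getFileSystem(conf) match {
          case localFs: LocalFileSystem => localFs.getRawFileSystem
          case otherFs => otherFs
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val path = new Path("file:///tmp/wal-sketch/log-0-0") // hypothetical location
        val out: FSDataOutputStream = fileSystemFor(path, conf).create(path)
        out.writeInt(4)                    // length-prefixed record, as in the WAL writer
        out.write(Array[Byte](1, 2, 3, 4))
        // Prefer hflush() (newer Hadoop), fall back to sync() (older releases).
        val flushMethod = {
          val cls = classOf[FSDataOutputStream]
          Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
        }
        flushMethod.foreach { _.invoke(out) }
        out.getWrappedStream.flush()       // helps on local file systems (HADOOP-7844)
        out.close()
      }
    }
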
---
 .../scala/org/apache/spark/streaming/util/HdfsUtils.scala      | 7 ++++---
 .../apache/spark/streaming/util/WriteAheadLogManager.scala     | 4 ++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 4a6f3006922cd..3f3143b820247 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -17,12 +17,11 @@
 package org.apache.spark.streaming.util
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs._
 
 private[streaming] object HdfsUtils {
 
   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
-    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
@@ -61,7 +60,9 @@ private[streaming] object HdfsUtils {
     blockLocs.map(_.flatMap(_.getHosts))
   }
 
-  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
+  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = synchronized {
+    // For local file systems, return the raw local file system, so that calls to flush()
+    // actually flush the stream.
     val fs = path.getFileSystem(conf)
     fs match {
       case localFs: LocalFileSystem => localFs.getRawFileSystem
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index f0c552e9593c4..70d234320be7c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -37,7 +37,7 @@ import WriteAheadLogManager._
  * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
  * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
  *
- *@param logDirectory Directory when rotating log files will be created.
+ * @param logDirectory Directory where rotating log files will be created.
  * @param hadoopConf Hadoop configuration for reading/writing log files.
  * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
  *                            Default is one minute.
@@ -57,7 +57,7 @@ private[streaming] class WriteAheadLogManager(
 
   private val pastLogs = new ArrayBuffer[LogInfo]
   private val callerNameTag =
-    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
+    if (callerName.nonEmpty) s" for $callerName" else ""
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
   implicit private val executionContext = ExecutionContext.fromExecutorService(
     Utils.newDaemonFixedThreadPool(1, threadpoolName))

From e4bee2065293d7373c43fe5636dd9971dede257e Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 23 Oct 2014 18:27:31 -0700
Subject: [PATCH 16/16] Removed synchronized, Path.getFileSystem is threadsafe

---
 .../main/scala/org/apache/spark/streaming/util/HdfsUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 3f3143b820247..491f1175576e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -60,7 +60,7 @@ private[streaming] object HdfsUtils {
     blockLocs.map(_.flatMap(_.getHosts))
   }
 
-  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = synchronized {
+  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
     // For local file systems, return the raw local file system, so that calls to flush()
     // actually flush the stream.
     val fs = path.getFileSystem(conf)
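
Taken together, WriteAheadLogWriter, WriteAheadLogReader, and the suite's manual
read/write helpers all agree on one simple on-disk framing: a 4-byte length prefix
followed by that many payload bytes, repeated until end of file. A minimal standalone
sketch of that framing follows; it is illustrative only, uses plain java.io streams in
place of the Hadoop stream classes, and the object and helper names are made up.

    // Sketch of the WAL record framing: length prefix, then payload bytes.
    import java.io.{DataInputStream, DataOutputStream, EOFException, FileInputStream, FileOutputStream}

    object WalFramingSketch {
      def writeRecords(file: String, records: Seq[Array[Byte]]): Unit = {
        val out = new DataOutputStream(new FileOutputStream(file))
        try {
          records.foreach { bytes =>
            out.writeInt(bytes.length)   // length prefix, as WriteAheadLogWriter does
            out.write(bytes)
          }
        } finally {
          out.close()
        }
      }

      def readRecords(file: String): Seq[Array[Byte]] = {
        val in = new DataInputStream(new FileInputStream(file))
        val records = collection.mutable.ArrayBuffer[Array[Byte]]()
        try {
          while (true) {
            val length = in.readInt()    // EOFException here means a clean end of file
            val bytes = new Array[Byte](length)
            in.readFully(bytes)          // EOFException here means a truncated record
            records += bytes
          }
        } catch {
          case _: EOFException => // stop, like WriteAheadLogReader.hasNext returning false
        } finally {
          in.close()
        }
        records.toSeq
      }
    }

With this framing, truncating the tail of a log file drops only the last record, and
appended garbage fails to parse as a record, which is roughly what the corrupted-write
tests earlier in the series rely on.
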