From d06fa21f80eeda04c11e454a461902717a2e5c7d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 27 Apr 2015 20:33:59 -0700 Subject: [PATCH 1/6] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD --- .../scala/org/apache/spark/rdd/BlockRDD.scala | 15 +++- .../dstream/ReceiverInputDStream.scala | 35 ++++---- .../rdd/WriteAheadLogBackedBlockRDD.scala | 80 +++++++++++++------ .../receiver/ReceiverSupervisorImpl.scala | 2 +- .../scheduler/ReceivedBlockInfo.scala | 30 ++++++- .../scheduler/ReceivedBlockTracker.scala | 20 ++--- .../streaming/ReceivedBlockTrackerSuite.scala | 29 +++---- .../WriteAheadLogBackedBlockRDDSuite.scala | 4 +- .../StreamingJobProgressListenerSuite.scala | 8 +- 9 files changed, 134 insertions(+), 89 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 71578d1210fde..6d52bbd2e6e68 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -31,8 +31,9 @@ private[spark] class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId]) extends RDD[T](sc, Nil) { - @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) + @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) @volatile private var _isValid = true + @volatile private var _setInvalid = true override def getPartitions: Array[Partition] = { assertValid() @@ -54,7 +55,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds override def getPreferredLocations(split: Partition): Seq[String] = { assertValid() - locations_(split.asInstanceOf[BlockRDDPartition].blockId) + _locations(split.asInstanceOf[BlockRDDPartition].blockId) } /** @@ -66,7 +67,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds blockIds.foreach { blockId => sc.env.blockManager.master.removeBlock(blockId) } - _isValid = false + if (_setInvalid) { + _isValid = false + } } /** @@ -85,8 +88,12 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds } } + protected def setInvalidIfBlocksRemoved(setInvalid: Boolean): Unit = { + _setInvalid = setInvalid + } + protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = { - locations_ + _locations } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 8be04314c4285..65faea23fd390 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -67,27 +67,26 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont } else { // Otherwise, ask the tracker for all the blocks that have been allocated to this stream // for this batch - val blockInfos = - ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty) - val blockStoreResults = blockInfos.map { _.blockStoreResult } - val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray + val receiverTracker = ssc.scheduler.receiverTracker + val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) + val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray - // 
Check whether all the results are of the same type - val resultTypes = blockStoreResults.map { _.getClass }.distinct - if (resultTypes.size > 1) { - logWarning("Multiple result types in block information, WAL information will be ignored.") - } + // Is WAL segment info present with all the blocks + val isWALSegmentInfoPresent = blockInfos.forall { _.writeAheadLogSegmentOption.nonEmpty } - // If all the results are of type WriteAheadLogBasedStoreResult, then create - // WriteAheadLogBackedBlockRDD else create simple BlockRDD. - if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) { - val logSegments = blockStoreResults.map { - _.asInstanceOf[WriteAheadLogBasedStoreResult].segment - }.toArray - // Since storeInBlockManager = false, the storage level does not matter. - new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, - blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER) + if (isWALSegmentInfoPresent) { + // If all the blocks have WAL segment info, then create a WALBackedBlockRDD + val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray + val blockWALSegments = blockInfos.map { _.writeAheadLogSegmentOption.get }.toArray + new WriteAheadLogBackedBlockRDD[T]( + ssc.sparkContext, blockIds, blockWALSegments, isBlockIdValid) } else { + // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others + // then that is unexpected and log a warning accordingly. + if (blockInfos.find(_.writeAheadLogSegmentOption.nonEmpty).nonEmpty) { + logWarning("Could not find Write Ahead Log information on some of the blocks, " + + "data may not be recoverable after driver failures") + } new BlockRDD[T](ssc.sc, blockIds) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 93caa4ba35c7f..a1caf6ed52f4b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark._ import org.apache.spark.rdd.BlockRDD -import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.storage.{StreamBlockId, BlockId, StorageLevel} import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader} /** @@ -37,6 +37,7 @@ private[streaming] class WriteAheadLogBackedBlockRDDPartition( val index: Int, val blockId: BlockId, + val isBlockIdValid: Boolean, val segment: WriteAheadLogFileSegment) extends Partition @@ -45,11 +46,19 @@ class WriteAheadLogBackedBlockRDDPartition( * This class represents a special case of the BlockRDD where the data blocks in * the block manager are also backed by segments in write ahead logs. For reading * the data, this RDD first looks up the blocks by their ids in the block manager. - * If it does not find them, it looks up the corresponding file segment. + * If it does not find them, it looks up the corresponding file segment. The finding + * of the blocks by their ids can be skipped by setting the corresponding element in + * isBlockIdValid to false. This is a performance optimization which does not affect + * correctness, and it can be used in situations where it is known that the block + * does not exist in the Spark executors (e.g. after a failed driver is restarted). 
+ * * * @param sc SparkContext * @param blockIds Ids of the blocks that contains this RDD's data * @param segments Segments in write ahead logs that contain this RDD's data + * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark + * executors). If not, then block lookups by the block ids will be skipped. + * By default, this is an empty array signifying true for all the blocks. * @param storeInBlockManager Whether to store in the block manager after reading from the segment * @param storageLevel storage level to store when storing in block manager * (applicable when storeInBlockManager = true) @@ -59,23 +68,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, @transient blockIds: Array[BlockId], @transient segments: Array[WriteAheadLogFileSegment], - storeInBlockManager: Boolean, - storageLevel: StorageLevel) + @transient isBlockIdValid: Array[Boolean] = Array.empty, + storeInBlockManager: Boolean = false, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER) extends BlockRDD[T](sc, blockIds) { require( blockIds.length == segments.length, - s"Number of block ids (${blockIds.length}) must be " + - s"the same as number of segments (${segments.length}})!") + s"Number of block Ids (${blockIds.length}) must be " + + s" same as number of segments (${segments.length}})") + + require( + isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length, + s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " + + s" same as number of block Ids (${blockIds.length})") // Hadoop configuration is not serializable, so broadcast it as a serializable. @transient private val hadoopConfig = sc.hadoopConfiguration private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig) + setInvalidIfBlocksRemoved(false) + override def getPartitions: Array[Partition] = { assertValid() - Array.tabulate(blockIds.size) { i => - new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) + Array.tabulate(blockIds.length) { i => + val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i) + new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, segments(i)) } } @@ -90,22 +108,29 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( val blockManager = SparkEnv.get.blockManager val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] val blockId = partition.blockId - blockManager.get(blockId) match { - case Some(block) => // Data is in Block Manager - val iterator = block.data.asInstanceOf[Iterator[T]] - logDebug(s"Read partition data of $this from block manager, block $blockId") - iterator - case None => // Data not found in Block Manager, grab it from write ahead log file - val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) - val dataRead = reader.read(partition.segment) - reader.close() - logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}") - if (storeInBlockManager) { - blockManager.putBytes(blockId, dataRead, storageLevel) - logDebug(s"Stored partition data of $this into block manager with level $storageLevel") - dataRead.rewind() - } - blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + val segment = partition.segment + + def getBlockFromBlockManager(): Option[Iterator[T]] = { + blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]]) + } + + def getBlockFromWriteAheadLog(): Iterator[T] = { + val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf) + 
val dataRead = reader.read(segment) + reader.close() + logDebug(s"Read partition data of $this from write ahead log, segment ${partition.segment}") + if (storeInBlockManager) { + blockManager.putBytes(blockId, dataRead, storageLevel) + logDebug(s"Stored partition data of $this into block manager with level $storageLevel") + dataRead.rewind() + } + blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + } + + if (partition.isBlockIdValid) { + getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() } + } else { + getBlockFromWriteAheadLog() } } @@ -116,7 +141,12 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( */ override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] - val blockLocations = getBlockIdLocations().get(partition.blockId) + val blockLocations = if (partition.isBlockIdValid) { + getBlockIdLocations().get(partition.blockId) + } else { + None + } + blockLocations.getOrElse( HdfsUtils.getFileSegmentLocations( partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 89af40330b9d9..124969b94630d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -145,7 +145,7 @@ private[streaming] class ReceiverSupervisorImpl( val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") - val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) + val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala index 94beb590f52d6..5f7868257cf6e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala @@ -17,12 +17,38 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, ReceivedBlockStoreResult} +import org.apache.spark.streaming.util.WriteAheadLogFileSegment /** Information about blocks received by the receiver */ private[streaming] case class ReceivedBlockInfo( streamId: Int, numRecords: Long, + metadataOption: Option[Any], blockStoreResult: ReceivedBlockStoreResult - ) + ) { + + @volatile private var _isBlockIdValid = true + + def blockId: StreamBlockId = blockStoreResult.blockId + + def writeAheadLogSegmentOption: Option[WriteAheadLogFileSegment] = { + blockStoreResult match { + case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.segment) + case _ => None + } + } + + /** Is the block ID valid, that is, is the block present in the Spark executors. */ + def isBlockIdValid(): Boolean = _isBlockIdValid + + /** + * Set the block ID as invalid. 
This is useful when it is known that the block is not present + * in the Spark executors. + */ + def setBlockIdInvalid(): Unit = { + _isBlockIdValid = false + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 200cf4ef4b0f1..fac4fa43bf748 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -25,10 +25,10 @@ import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkException, Logging, SparkConf} import org.apache.spark.streaming.Time import org.apache.spark.streaming.util.WriteAheadLogManager import org.apache.spark.util.{Clock, Utils} +import org.apache.spark.{Logging, SparkConf, SparkException} /** Trait representing any event in the ReceivedBlockTracker that updates its state. */ private[streaming] sealed trait ReceivedBlockTrackerLogEvent @@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time]) private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) { def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = { - streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty) + streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty) } } @@ -171,6 +171,7 @@ private[streaming] class ReceivedBlockTracker( // Insert the recovered block information def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) { logTrace(s"Recovery: Inserting added block $receivedBlockInfo") + receivedBlockInfo.setBlockIdInvalid() getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } @@ -223,22 +224,13 @@ private[streaming] class ReceivedBlockTracker( /** Optionally create the write ahead log manager only if the feature is enabled */ private def createLogManager(): Option[WriteAheadLogManager] = { - if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { - if (checkpointDirOption.isEmpty) { - throw new SparkException( - "Cannot enable receiver write-ahead log without checkpoint directory set. " + - "Please use streamingContext.checkpoint() to set the checkpoint directory. 
" + - "See documentation for more details.") - } - val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get) + checkpointDirOption.map { checkpointDir => + val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir) val rollingIntervalSecs = conf.getInt( "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) - val logManager = new WriteAheadLogManager(logDir, hadoopConf, + new WriteAheadLogManager(logDir, hadoopConf, rollingIntervalSecs = rollingIntervalSecs, clock = clock, callerName = "ReceivedBlockHandlerMaster") - Some(logManager) - } else { - None } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index b63b37d9f9cef..a28b1930fff40 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -88,7 +88,7 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } - test("block addition, block to batch allocation and cleanup with write ahead log") { + test("recovery and cleanup with write ahead logs") { val manualClock = new ManualClock // Set the time increment level to twice the rotation interval so that every increment creates // a new log file @@ -114,7 +114,6 @@ class ReceivedBlockTrackerSuite } // Start tracker and add blocks - conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1") val tracker1 = createTracker(clock = manualClock) tracker1.isLogManagerEnabled should be (true) @@ -130,7 +129,11 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered list of unallocated blocks incrementTime() val tracker2 = createTracker(clock = manualClock) - tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 + val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList + unallocatedBlocks shouldEqual blockInfos1 + unallocatedBlocks.foreach { block => + block.isBlockIdValid() should be (false) + } // Allocate blocks to batch and verify whether the unallocated blocks got allocated val batchTime1 = manualClock.getTimeMillis() @@ -182,22 +185,10 @@ class ReceivedBlockTrackerSuite tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 } - test("enabling write ahead log but not setting checkpoint dir") { - conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") - intercept[SparkException] { - createTracker(setCheckpointDir = false) - } - } - - test("setting checkpoint dir but not enabling write ahead log") { - // When WAL config is not set, log manager should not be enabled - val tracker1 = createTracker(setCheckpointDir = true) + test("write ahead log disabled when not checkpoint directory is set") { + // When checkpoint is not enabled, then the write ahead log is also disabled + val tracker1 = createTracker(setCheckpointDir = false) tracker1.isLogManagerEnabled should be (false) - - // When WAL is explicitly disabled, log manager should not be enabled - conf.set("spark.streaming.receiver.writeAheadLog.enable", "false") - val tracker2 = createTracker(setCheckpointDir = true) - tracker2.isLogManagerEnabled should be(false) } /** @@ -215,7 +206,7 @@ class ReceivedBlockTrackerSuite /** Generate blocks infos using random ids */ def generateBlockInfos(): 
Seq[ReceivedBlockInfo] = { - List.fill(5)(ReceivedBlockInfo(streamId, 0, + List.fill(5)(ReceivedBlockInfo(streamId, 0, None, BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt))))) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index c3602a5b73732..857bbe85e6435 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -128,12 +128,12 @@ class WriteAheadLogBackedBlockRDDSuite // Create the RDD and verify whether the returned data is correct val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray, - segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY) + segments.toArray, storeInBlockManager = false, storageLevel = StorageLevel.MEMORY_ONLY) assert(rdd.collect() === data.flatten) if (testStoreInBM) { val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray, - segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY) + segments.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY) assert(rdd2.collect() === data.flatten) assert( blockIds.forall(blockManager.get(_).nonEmpty), diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 94b1985116feb..c5520ab5fd25b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -36,8 +36,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val listener = new StreamingJobProgressListener(ssc) val receivedBlockInfo = Map( - 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), - 1 -> Array(ReceivedBlockInfo(1, 300, null)) + 0 -> Array(ReceivedBlockInfo(0, 100, None, null), ReceivedBlockInfo(0, 200, None, null)), + 1 -> Array(ReceivedBlockInfo(1, 300, None, null)) ) // onBatchSubmitted @@ -104,8 +104,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val listener = new StreamingJobProgressListener(ssc) val receivedBlockInfo = Map( - 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), - 1 -> Array(ReceivedBlockInfo(1, 300, null)) + 0 -> Array(ReceivedBlockInfo(0, 100, None, null), ReceivedBlockInfo(0, 200, None, null)), + 1 -> Array(ReceivedBlockInfo(1, 300, None, null)) ) val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) From 1bc5bc384bc5baa9d904588b36dd08393bc3c91f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Apr 2015 01:11:13 -0700 Subject: [PATCH 2/6] Fixed bug on unexpected recovery --- .../scheduler/ReceivedBlockTracker.scala | 7 +++-- .../streaming/scheduler/ReceiverTracker.scala | 1 + .../streaming/ReceivedBlockTrackerSuite.scala | 29 ++++++++++++++----- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index fac4fa43bf748..e029d93334896 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -63,6 +63,7 @@ private[streaming] class ReceivedBlockTracker( hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock, + recoverFromWriteAheadLog: Boolean, checkpointDirOption: Option[String]) extends Logging { @@ -75,7 +76,9 @@ private[streaming] class ReceivedBlockTracker( private var lastAllocatedBatchTime: Time = null // Recover block information from write ahead logs - recoverFromWriteAheadLogs() + if (recoverFromWriteAheadLog) { + recoverPastEvents() + } /** Add received block. This event will get written to the write ahead log (if enabled). */ def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized { @@ -167,7 +170,7 @@ private[streaming] class ReceivedBlockTracker( * Recover all the tracker actions from the write ahead logs to recover the state (unallocated * and allocated block info) prior to failure. */ - private def recoverFromWriteAheadLogs(): Unit = synchronized { + private def recoverPastEvents(): Unit = synchronized { // Insert the recovered block information def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) { logTrace(s"Recovery: Inserting added block $receivedBlockInfo") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index c4ead6f30a63d..a8398e0026bc8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -61,6 +61,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ssc.sparkContext.hadoopConfiguration, receiverInputStreamIds, ssc.scheduler.clock, + ssc.isCheckpointPresent, Option(ssc.checkpointDir) ) private val listenerBus = ssc.scheduler.listenerBus diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index a28b1930fff40..cec6e9a7fd8b6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -67,15 +67,20 @@ class ReceivedBlockTrackerSuite // Verify added blocks are unallocated blocks receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos + receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true) + // Allocate the blocks to a batch and verify that all of them have been allocated receivedBlockTracker.allocateBlocksToBatch(1) receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos + receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos) receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty + receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false) // Allocate no blocks to another batch receivedBlockTracker.allocateBlocksToBatch(2) receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty + receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty) // Verify that older batches have no operation on batch allocation, // will return the same blocks as previously allocated. 
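
As a quick reference for the API exercised in the hunk above, a minimal sketch of the allocate-then-query flow, assuming the code sits alongside the suite (same org.apache.spark.streaming package, so the private[streaming] classes are visible); the helper name allocateAndInspect is illustrative only and not part of the patch:

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, ReceivedBlockTracker}

    // Sketch only: allocate queued blocks to a batch, then read them back per stream,
    // using only tracker methods that appear in this patch series.
    def allocateAndInspect(tracker: ReceivedBlockTracker, streamId: Int, batchTime: Time): Unit = {
      tracker.allocateBlocksToBatch(batchTime)
      // All streams of the batch, keyed by stream id
      val byStream: Map[Int, Seq[ReceivedBlockInfo]] = tracker.getBlocksOfBatch(batchTime)
      // A single stream's blocks for the same batch
      val forStream: Seq[ReceivedBlockInfo] = tracker.getBlocksOfBatchAndStream(batchTime, streamId)
      assert(forStream == byStream.getOrElse(streamId, Seq.empty))
      // Once allocated, nothing should remain queued for this tracker
      assert(!tracker.hasUnallocatedReceivedBlocks)
    }

allocateBlocksToBatch drains the per-stream queues into the batch, which is why the test expects hasUnallocatedReceivedBlocks to flip to false afterwards.
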
@@ -126,19 +131,27 @@ class ReceivedBlockTrackerSuite getWrittenLogData() shouldEqual expectedWrittenData1 getWriteAheadLogFiles() should have size 1 - // Restart tracker and verify recovered list of unallocated blocks incrementTime() - val tracker2 = createTracker(clock = manualClock) + + // Recovery without recovery from WAL and verify list of unallocated blocks is empty + val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false) + tracker1_.getUnallocatedBlocks(streamId) shouldBe empty + tracker1_.hasUnallocatedReceivedBlocks should be (false) + + // Restart tracker and verify recovered list of unallocated blocks + val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList unallocatedBlocks shouldEqual blockInfos1 unallocatedBlocks.foreach { block => block.isBlockIdValid() should be (false) } + // Allocate blocks to batch and verify whether the unallocated blocks got allocated val batchTime1 = manualClock.getTimeMillis() tracker2.allocateBlocksToBatch(batchTime1) tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 + tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1) // Add more blocks and allocate to another batch incrementTime() @@ -156,7 +169,7 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state incrementTime() - val tracker3 = createTracker(clock = manualClock) + val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 tracker3.getUnallocatedBlocks(streamId) shouldBe empty @@ -179,14 +192,14 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch incrementTime() - val tracker4 = createTracker(clock = manualClock) + val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 } - test("write ahead log disabled when not checkpoint directory is set") { - // When checkpoint is not enabled, then the write ahead log is also disabled + test("disable write ahead log when checkpoint directory is not set") { + // When checkpoint is disabled, then the write ahead log is disabled val tracker1 = createTracker(setCheckpointDir = false) tracker1.isLogManagerEnabled should be (false) } @@ -197,9 +210,11 @@ class ReceivedBlockTrackerSuite */ def createTracker( setCheckpointDir: Boolean = true, + recoverFromWriteAheadLog: Boolean = false, clock: Clock = new SystemClock): ReceivedBlockTracker = { val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None - val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption) + val tracker = new ReceivedBlockTracker( + conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption) allReceivedBlockTrackers += tracker tracker } From 5f67a59c3091c774ec3524677f53192a78fbbc20 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 29 Apr 2015 01:30:52 -0700 Subject: [PATCH 3/6] Fixed HdfsUtils to handle append in local file system --- 
.../main/scala/org/apache/spark/streaming/util/HdfsUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 858ba3c9eb4e5..f60688f173c44 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -27,7 +27,7 @@ private[streaming] object HdfsUtils { // If the file exists and we have append support, append instead of creating a new file val stream: FSDataOutputStream = { if (dfs.isFile(dfsPath)) { - if (conf.getBoolean("hdfs.append.support", false)) { + if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) { dfs.append(dfsPath) } else { throw new IllegalStateException("File exists and there is no append support!") From 637bc9c994cd1dbc74f433cb511ba131c85b4ee2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Apr 2015 12:58:03 -0700 Subject: [PATCH 4/6] Changed segment to handle --- .../streaming/dstream/ReceiverInputDStream.scala | 12 ++++++------ .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 13 +++++++------ .../rdd/WriteAheadLogBackedBlockRDDSuite.scala | 2 +- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 059b0cfea06cd..0be8ab8a3887d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -71,15 +71,15 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray - // Is WAL segment info present with all the blocks - val isWALSegmentInfoPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } + // Are WAL record handles present with all the blocks + val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } - if (isWALSegmentInfoPresent) { - // If all the blocks have WAL segment info, then create a WALBackedBlockRDD + if (areWALRecordHandlesPresent) { + // If all the blocks have WAL record handle, then create a WALBackedBlockRDD val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray - val blockWALSegments = blockInfos.map { _.walRecordHandleOption.get }.toArray + val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray new WriteAheadLogBackedBlockRDD[T]( - ssc.sparkContext, blockIds, blockWALSegments, isBlockIdValid) + ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) } else { // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others // then that is unexpected and log a warning accordingly. 
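
To make the optimization concrete before the next file's diff, here is a minimal standalone sketch of the read path this series implements in WriteAheadLogBackedBlockRDD.compute(); readPartition, fetchFromBlockManager and fetchFromWAL are illustrative names standing in for the private helpers getBlockFromBlockManager() and getBlockFromWriteAheadLog() shown in patch 1:

    // Sketch of the lookup order: try the BlockManager only when the block id is
    // still considered valid, otherwise go straight to the write ahead log.
    def readPartition[T](
        isBlockIdValid: Boolean,
        fetchFromBlockManager: () => Option[Iterator[T]],
        fetchFromWAL: () => Iterator[T]): Iterator[T] = {
      if (isBlockIdValid) {
        // The block may still be cached on an executor: probe the BlockManager first,
        // falling back to the write ahead log only on a miss.
        fetchFromBlockManager().getOrElse(fetchFromWAL())
      } else {
        // Known-invalid block ids (e.g. blocks recovered after a driver restart) skip
        // the BlockManager lookup entirely -- the optimization this series introduces.
        fetchFromWAL()
      }
    }

Skipping the probe matters after recovery: blocks restored from the tracker's write ahead log are marked invalid via setBlockIdInvalid(), so every such partition reads straight from the WAL instead of issuing useless block lookups against executors that no longer hold the data.
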
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 2e19bc3fd5f6f..b9fd37b7c2725 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -31,7 +31,7 @@ import org.apache.spark.streaming.util._ /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. * It contains information about the id of the blocks having this partition's data and - * the segment of the write ahead log that backs the partition. + * the corresponding record handle in the write ahead log that backs the partition. * @param index index of the partition * @param blockId id of the block having the partition data * @param walRecordHandle Handle of the record in a write ahead log having the partition data @@ -49,9 +49,9 @@ class WriteAheadLogBackedBlockRDDPartition( * This class represents a special case of the BlockRDD where the data blocks in * the block manager are also backed by data in write ahead logs. For reading * the data, this RDD first looks up the blocks by their ids in the block manager. - * If it does not find them, it looks up the corresponding file segment. The finding - * of the blocks by their ids can be skipped by setting the corresponding element in - * isBlockIdValid to false. This is a performance optimization which does not affect + * If it does not find them, it looks up the WAL using the corresponding record handle. + * The lookup of the blocks from the block manager can be skipped by setting the corresponding + * element in isBlockIdValid to false. This is a performance optimization which does not affect * correctness, and it can be used in situations where it is known that the block * does not exist in the Spark executors (e.g. after a failed driver is restarted). * @@ -62,7 +62,8 @@ class WriteAheadLogBackedBlockRDDPartition( * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark * executors). If not, then block lookups by the block ids will be skipped. * By default, this is an empty array signifying true for all the blocks. 
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment + * @param storeInBlockManager Whether to store a block in the block manager + * after reading it from the WAL * @param storageLevel storage level to store when storing in block manager * (applicable when storeInBlockManager = true) */ @@ -79,7 +80,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( require( blockIds.length == walRecordHandles.length, s"Number of block Ids (${blockIds.length}) must be " + - s" same as number of segments (${walRecordHandles.length}})") + s" same as number of WAL record handles (${walRecordHandles.length}})") require( isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length, diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 836ea73fe773b..66ce52a6086ec 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -100,7 +100,7 @@ class WriteAheadLogBackedBlockRDDSuite blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER) } - // Generate write ahead log file segments + // Generate write ahead log record handles val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++ generateWALRecordHandles(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL)) From 685fab3e40e0c33d1fe819978cfbbce8086d2be6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 2 May 2015 02:16:05 -0700 Subject: [PATCH 5/6] Addressed comments in PR --- .../main/scala/org/apache/spark/rdd/BlockRDD.scala | 11 ++--------- .../streaming/dstream/ReceiverInputDStream.scala | 9 +++++++-- .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 5 ++++- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 6d52bbd2e6e68..922030263756b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -33,7 +33,6 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) @volatile private var _isValid = true - @volatile private var _setInvalid = true override def getPartitions: Array[Partition] = { assertValid() @@ -67,9 +66,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds blockIds.foreach { blockId => sc.env.blockManager.master.removeBlock(blockId) } - if (_setInvalid) { - _isValid = false - } + _isValid = false } /** @@ -82,16 +79,12 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds /** Check if this BlockRDD is valid. If not valid, exception is thrown. 
*/ private[spark] def assertValid() { - if (!_isValid) { + if (!isValid) { throw new SparkException( "Attempted to use %s after its blocks have been removed!".format(toString)) } } - protected def setInvalidIfBlocksRemoved(setInvalid: Boolean): Unit = { - _setInvalid = setInvalid - } - protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = { _locations } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 0be8ab8a3887d..cc36a97329f17 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -25,6 +25,7 @@ import org.apache.spark.streaming._ import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult} import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.streaming.util.WriteAheadLogUtils /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -84,8 +85,12 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others // then that is unexpected and log a warning accordingly. if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { - logWarning("Could not find Write Ahead Log information on some of the blocks, " + - "data may not be recoverable after driver failures") + if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { + logError("Some blocks do not have Write Ahead Log information; " + + "this is unexpected and data may not be recoverable after driver failures") + } else { + logWarning("Some blocks have Write Ahead Log information; this is unexpected") + } } new BlockRDD[T](ssc.sc, blockIds) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index b9fd37b7c2725..e6b0fbcbffe2b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -34,6 +34,9 @@ import org.apache.spark.streaming.util._ * the corresponding record handle in the write ahead log that backs the partition. * @param index index of the partition * @param blockId id of the block having the partition data + * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark + * executors). If not, then block lookups by the block ids will be skipped. + * By default, this is an empty array signifying true for all the blocks. 
* @param walRecordHandle Handle of the record in a write ahead log having the partition data */ private[streaming] @@ -91,7 +94,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( @transient private val hadoopConfig = sc.hadoopConfiguration private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig) - setInvalidIfBlocksRemoved(false) + override def isValid(): Boolean = true override def getPartitions: Array[Partition] = { assertValid() From 575476ef71113d3b2448593f48fd844b9e57e1c5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 4 May 2015 19:28:33 -0700 Subject: [PATCH 6/6] Added more tests to get 100% coverage of the WALBackedBlockRDD --- .../rdd/WriteAheadLogBackedBlockRDD.scala | 10 +- .../WriteAheadLogBackedBlockRDDSuite.scala | 103 ++++++++++++++---- 2 files changed, 91 insertions(+), 22 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index e1ad553027bfe..ffce6a4c3c74c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -187,8 +187,14 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( blockLocations.getOrElse { partition.walRecordHandle match { case fileSegment: FileBasedWriteAheadLogSegment => - HdfsUtils.getFileSegmentLocations( - fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig) + try { + HdfsUtils.getFileSegmentLocations( + fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig) + } catch { + case NonFatal(e) => + logError("Error getting WAL file segment locations", e) + Seq.empty + } case _ => Seq.empty } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 66ce52a6086ec..6859b65c7165f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter} import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext, SparkException} class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { @@ -60,24 +60,35 @@ class WriteAheadLogBackedBlockRDDSuite System.clearProperty("spark.driver.port") } - test("Read data available in block manager and write ahead log") { - testRDD(5, 5) + test("Read data available in both block manager and write ahead log") { + testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5) } test("Read data available only in block manager, not in write ahead log") { - testRDD(5, 0) + testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0) } test("Read data available only in write ahead log, not in block manager") { - testRDD(0, 5) + testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5) } - test("Read data available only in write ahead log, and test storing in block 
manager") { - testRDD(0, 5, testStoreInBM = true) + test("Read data with partially available in block manager, and rest in write ahead log") { + testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2) } - test("Read data with partially available in block manager, and rest in write ahead log") { - testRDD(3, 2) + test("Test isBlockValid skips block fetching from BlockManager") { + testRDD( + numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true) + } + + test("Test whether RDD is valid after removing blocks from block manager") { + testRDD( + numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true) + } + + test("Test storing of blocks recovered from write ahead log back into block manager") { + testRDD( + numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true) } /** @@ -85,23 +96,52 @@ class WriteAheadLogBackedBlockRDDSuite * and the rest to a write ahead log, and then reading reading it all back using the RDD. * It can also test if the partitions that were read from the log were again stored in * block manager. - * @param numPartitionsInBM Number of partitions to write to the Block Manager - * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log - * @param testStoreInBM Test whether blocks read from log are stored back into block manager + * + * + * + * @param numPartitions Number of partitions in RDD + * @param numPartitionsInBM Number of partitions to write to the BlockManager. + * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager + * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log. + * Partitions (numPartitions - 1 - numPartitionsInWAL) to + * (numPartitions - 1) will be written to WAL + * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching + * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with + * reads falling back to the WAL + * @param testStoreInBM Test whether blocks read from log are stored back into block manager + * + * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4 + * + * numPartitionsInBM = 3 + * |------------------| + * | | + * 0 1 2 3 4 + * | | + * |-------------------------| + * numPartitionsInWAL = 4 */ private def testRDD( - numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) { - val numBlocks = numPartitionsInBM + numPartitionsInWAL - val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50)) + numPartitions: Int, + numPartitionsInBM: Int, + numPartitionsInWAL: Int, + testIsBlockValid: Boolean = false, + testBlockRemove: Boolean = false, + testStoreInBM: Boolean = false + ) { + require(numPartitionsInBM <= numPartitions, + "Can't put more partitions in BlockManager than that in RDD") + require(numPartitionsInWAL <= numPartitions, + "Can't put more partitions in write ahead log than that in RDD") + val data = Seq.fill(numPartitions, 10)(scala.util.Random.nextString(50)) // Put the necessary blocks in the block manager - val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt())) + val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt())) data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) => blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER) } // Generate write ahead log record handles - val recordHandles = 
generateFakeRecordHandles(numPartitionsInBM) ++ + val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++ generateWALRecordHandles(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL)) @@ -111,7 +151,7 @@ class WriteAheadLogBackedBlockRDDSuite "Expected blocks not in BlockManager" ) require( - blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty), + blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty), "Unexpected blocks in BlockManager" ) @@ -122,16 +162,39 @@ class WriteAheadLogBackedBlockRDDSuite "Expected blocks not in write ahead log" ) require( - recordHandles.take(numPartitionsInBM).forall(s => + recordHandles.take(numPartitions - numPartitionsInWAL).forall(s => !new File(s.path.stripPrefix("file://")).exists()), "Unexpected blocks in write ahead log" ) // Create the RDD and verify whether the returned data is correct val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray, - recordHandles.toArray, storeInBlockManager = false, storageLevel = StorageLevel.MEMORY_ONLY) + recordHandles.toArray, storeInBlockManager = false) assert(rdd.collect() === data.flatten) + // Verify that the block fetching is skipped when isBlockValid is set to false. + // This is done by using a RDD whose data is only in memory but is set to skip block fetching + // Using that RDD will throw exception, as it skips block fetching even if the blocks are in + // in BlockManager. + if (testIsBlockValid) { + require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") + require(numPartitionsInWAL === 0, "No partitions must be in WAL") + val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray, + recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false)) + intercept[SparkException] { + rdd2.collect() + } + } + + // Verify that the RDD is not invalid after the blocks are removed and can still read data + // from write ahead log + if (testBlockRemove) { + require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test") + require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") + rdd.removeBlocks() + assert(rdd.collect() === data.flatten) + } + if (testStoreInBM) { val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray, recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)