
Commit 5f13759

[SPARK-4029][Streaming] Update streaming driver to reliably save and recover received block metadata on driver failures
As part of the initiative to prevent data loss on driver failure, this JIRA tracks the subtask of modifying the streaming driver to reliably save received block metadata and recover it on driver restart.

This is solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e., `ReceivedBlockInfo`) and any actions on them (e.g., allocating blocks to batches). All actions on block info are written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replayed to recreate the pre-failure state of the `ReceivedBlockTracker`, which includes the batch-to-block allocations and the unallocated blocks.

Furthermore, `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After all the block info has been recovered (through the recovered `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s are recreated with the recovered info and jobs are submitted. The block data is pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.

This is still a WIP. Things that are missing here:

- *End-to-end integration tests:* Unit tests that test driver recovery by killing and restarting the streaming context and verifying that all the input data gets processed. This has been implemented but is not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25. I can either include it in this PR, or submit it as a separate PR after this gets in.
- *WAL cleanup:* Cleaning up the received data write ahead log by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.

Author: Tathagata Das <[email protected]>

Closes #3026 from tdas/driver-ha-rbt and squashes the following commits:

a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7f [Tathagata Das] Transferred changes from driver-ha-working branch
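
The recovery scheme described above is essentially a write-ahead-log/replay design: every action on received-block metadata is persisted to a log before it is applied, so replaying the log after a restart rebuilds the tracker's pre-failure state. The following self-contained Scala sketch illustrates the idea under simplified assumptions; `TrackerAction`, `BlockAdded`, `BatchAllocated`, and `SimpleTracker` are illustrative stand-ins rather than Spark's actual classes, and an in-memory buffer stands in for the on-disk log that `WriteAheadLogManager` manages.

import scala.collection.mutable

sealed trait TrackerAction
case class BlockAdded(streamId: Int, blockId: Long) extends TrackerAction
case class BatchAllocated(batchTime: Long, blocks: Map[Int, Seq[Long]]) extends TrackerAction

class SimpleTracker(log: mutable.Buffer[TrackerAction]) {
  private val unallocated = new mutable.HashMap[Int, mutable.Buffer[Long]]
  private val byBatch = new mutable.HashMap[Long, Map[Int, Seq[Long]]]

  // On construction, replay previously logged actions to recover pre-failure state.
  log.foreach(applyAction)

  def addBlock(streamId: Int, blockId: Long): Unit =
    writeAndApply(BlockAdded(streamId, blockId))

  // Atomically move all currently unallocated blocks into the given batch.
  def allocateBlocksToBatch(batchTime: Long): Unit = {
    val allocation = unallocated.map { case (id, blocks) => (id, blocks.toList) }.toMap
    writeAndApply(BatchAllocated(batchTime, allocation))
  }

  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[Long]] =
    byBatch.getOrElse(batchTime, Map.empty)

  // Persist the action first (the buffer stands in for the real on-disk
  // write ahead log), then mutate the in-memory state.
  private def writeAndApply(action: TrackerAction): Unit = {
    log += action
    applyAction(action)
  }

  private def applyAction(action: TrackerAction): Unit = action match {
    case BlockAdded(streamId, blockId) =>
      unallocated.getOrElseUpdate(streamId, mutable.Buffer.empty) += blockId
    case BatchAllocated(batchTime, blocks) =>
      byBatch(batchTime) = blocks
      blocks.keys.foreach(unallocated.remove)
  }
}

object TrackerDemo {
  def main(args: Array[String]): Unit = {
    val log = mutable.Buffer.empty[TrackerAction]
    val tracker = new SimpleTracker(log)
    tracker.addBlock(streamId = 0, blockId = 42L)
    tracker.allocateBlocksToBatch(batchTime = 1000L)

    // A "restarted" tracker replays the same log and recovers the allocation.
    val recovered = new SimpleTracker(log)
    assert(recovered.getBlocksOfBatch(1000L) == Map(0 -> List(42L)))
  }
}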
1 parent c8abddc commit 5f13759

File tree

8 files changed (+597, −89 lines)


streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 41 additions & 28 deletions
@@ -17,15 +17,14 @@
 
 package org.apache.spark.streaming.dstream
 
-import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
+import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-import org.apache.spark.SparkException
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -40,9 +39,6 @@ import org.apache.spark.SparkException
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
-  /** Keeps all received blocks information */
-  private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
-
   /** This is a unique identifier for the network input stream. */
   val id = ssc.getNewReceiverStreamId()
 
@@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
 
   def stop() {}
 
-  /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
+  /**
+   * Generates RDDs with blocks received by the receiver of this stream. */
   override def compute(validTime: Time): Option[RDD[T]] = {
-    // If this is called for any time before the start time of the context,
-    // then this returns an empty RDD. This may happen when recovering from a
-    // master failure
-    if (validTime >= graph.startTime) {
-      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
-      receivedBlockInfo(validTime) = blockInfo
-      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
-      Some(new BlockRDD[T](ssc.sc, blockIds))
-    } else {
-      Some(new BlockRDD[T](ssc.sc, Array.empty))
-    }
-  }
+    val blockRDD = {
 
-  /** Get information on received blocks. */
-  private[streaming] def getReceivedBlockInfo(time: Time) = {
-    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
+      if (validTime < graph.startTime) {
+        // If this is called for any time before the start time of the context,
+        // then this returns an empty RDD. This may happen when recovering from a
+        // driver failure without any write ahead log to recover pre-failure data.
+        new BlockRDD[T](ssc.sc, Array.empty)
+      } else {
+        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
+        // for this batch
+        val blockInfos =
+          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
+        val blockStoreResults = blockInfos.map { _.blockStoreResult }
+        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
+
+        // Check whether all the results are of the same type
+        val resultTypes = blockStoreResults.map { _.getClass }.distinct
+        if (resultTypes.size > 1) {
+          logWarning("Multiple result types in block information, WAL information will be ignored.")
+        }
+
+        // If all the results are of type WriteAheadLogBasedStoreResult, then create
+        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
+        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
+          val logSegments = blockStoreResults.map {
+            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
+          }.toArray
+          // Since storeInBlockManager = true, the recovered blocks also get cached in the block manager.
+          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
+            blockIds, logSegments, storeInBlockManager = true, StorageLevel.MEMORY_ONLY_SER)
+        } else {
+          new BlockRDD[T](ssc.sc, blockIds)
+        }
+      }
+    }
+    Some(blockRDD)
   }
 
   /**
@@ -86,10 +103,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
    */
   private[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
-    receivedBlockInfo --= oldReceivedBlocks.keys
-    logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
-      (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+    ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration)
   }
 }
-
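
The heart of the new compute method is the dispatch on the runtime type of the block store results: only when every block in the batch carries write-ahead-log segment info can a WriteAheadLogBackedBlockRDD be built. Below is a condensed, runnable sketch of that decision; StoreResult, BmResult, WalResult, and the ChosenRdd hierarchy are hypothetical stand-ins for Spark's store-result and RDD classes.

object RddChoiceSketch {
  // Stand-ins for BlockManagerBasedStoreResult / WriteAheadLogBasedStoreResult.
  sealed trait StoreResult { def blockId: Long }
  case class BmResult(blockId: Long) extends StoreResult
  case class WalResult(blockId: Long, segment: String) extends StoreResult

  // Simplified models of the two RDD flavors compute can produce.
  sealed trait ChosenRdd
  case class PlainBlockRdd(blockIds: Seq[Long]) extends ChosenRdd
  case class WalBackedRdd(blockIds: Seq[Long], segments: Seq[String]) extends ChosenRdd

  def chooseRdd(results: Seq[StoreResult]): ChosenRdd = {
    val resultTypes = results.map(_.getClass).distinct
    if (resultTypes.size == 1 && resultTypes.head == classOf[WalResult]) {
      // Every block has a log segment, so the data can be re-read from the WAL.
      val wal = results.map(_.asInstanceOf[WalResult])
      WalBackedRdd(wal.map(_.blockId), wal.map(_.segment))
    } else {
      // Mixed or block-manager-only results (or none at all): WAL info is ignored.
      PlainBlockRdd(results.map(_.blockId))
    }
  }
}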

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -48,7 +48,6 @@ class WriteAheadLogBackedBlockRDDPartition(
  * If it does not find them, it looks up the corresponding file segment.
  *
  * @param sc SparkContext
- * @param hadoopConfig Hadoop configuration
  * @param blockIds Ids of the blocks that contain this RDD's data
  * @param segments Segments in write ahead logs that contain this RDD's data
  * @param storeInBlockManager Whether to store in the block manager after reading from the segment
@@ -58,7 +57,6 @@ class WriteAheadLogBackedBlockRDDPartition(
 private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
-    @transient hadoopConfig: Configuration,
     @transient blockIds: Array[BlockId],
     @transient segments: Array[WriteAheadLogFileSegment],
     storeInBlockManager: Boolean,
@@ -71,6 +69,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     s"the same as number of segments (${segments.length})!")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable wrapper.
+  @transient private val hadoopConfig = sc.hadoopConfiguration
   private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
   override def getPartitions: Array[Partition] = {
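
The relocation of hadoopConfig above leans on a standard Spark 1.x trick: Hadoop's Configuration is not java.io.Serializable, but it does implement Hadoop's Writable, so SerializableWritable can carry it inside task closures. A minimal sketch of the pattern follows; the ConfCarrier class is hypothetical, and it assumes Spark 1.x, where SerializableWritable is public API.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

class ConfCarrier(@transient sc: SparkContext) extends Serializable {
  // Resolved on the driver; @transient so the raw, non-serializable
  // Configuration is never dragged into the serialized object graph.
  @transient private val hadoopConfig: Configuration = sc.hadoopConfiguration

  // The wrapper serializes the Configuration via the Writable interface,
  // so this field can safely travel to executors with the closure.
  private val serializableConf = new SerializableWritable(hadoopConfig)

  // On an executor, unwrap to get a usable Configuration back.
  def conf: Configuration = serializableConf.value
}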

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 13 additions & 8 deletions
@@ -112,7 +112,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Wait until all the received blocks in the network input tracker have
     // been consumed by network input DStreams, and jobs have been generated with them
     logInfo("Waiting for all received blocks to be consumed for job generation")
-    while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) {
+    while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
       Thread.sleep(pollTime)
     }
     logInfo("Waited for all received blocks to be consumed for job generation")
@@ -217,14 +217,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   /** Generate jobs and perform checkpoint for the given `time`. */
   private def generateJobs(time: Time) {
-    Try(graph.generateJobs(time)) match {
+    // Set the SparkEnv in this thread, so that job generation code can access the environment
+    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
+    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
+    SparkEnv.set(ssc.env)
+    Try {
+      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
+      graph.generateJobs(time) // generate jobs using allocated blocks
+    } match {
       case Success(jobs) =>
-        val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
-          val streamId = stream.id
-          val receivedBlockInfo = stream.getReceivedBlockInfo(time)
-          (streamId, receivedBlockInfo)
-        }.toMap
-        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
+        val receivedBlockInfos =
+          jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
+        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
@@ -234,6 +238,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   /** Clear DStream metadata for the given `time`. */
   private def clearMetadata(time: Time) {
     ssc.graph.clearMetadata(time)
+    jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration)
 
     // If checkpointing is enabled, then checkpoint,
     // else mark batch to be fully processed
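
The reshaped generateJobs folds two fallible steps, block allocation and job generation, into a single Try, so a failure in either step reaches the same reporting path. Here is a stand-alone sketch of that control flow in plain Scala; the allocateBlocksToBatch, generateJobs, and reportError helpers are hypothetical stand-ins for the tracker, graph, and scheduler calls.

import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  def allocateBlocksToBatch(time: Long): Unit =
    require(time > 0, s"bad batch time $time") // may throw, like the real tracker call

  def generateJobs(time: Long): Seq[String] = Seq(s"job-$time")

  def reportError(msg: String, e: Throwable): Unit = println(s"$msg: $e")

  def generateAndSubmit(time: Long): Unit = {
    Try {
      allocateBlocksToBatch(time) // step 1: allocate received blocks to this batch
      generateJobs(time)          // step 2: generate jobs from the allocated blocks
    } match {
      case Success(jobs) => println(s"submitting ${jobs.size} job(s) for time $time")
      case Failure(e)    => reportError("Error generating jobs for time " + time, e)
    }
  }

  def main(args: Array[String]): Unit = {
    generateAndSubmit(1000L) // succeeds and "submits" one job
    generateAndSubmit(-1L)   // allocation fails; the error is reported, not thrown
  }
}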
