-
Couldn't load subscription status.
- Fork 28.9k
[SPARK-4029][Streaming] Update streaming driver to reliably save and recover received block metadata on driver failures #3026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7ae0a7f
25611d6
cda62ee
f66d277
19aec7d
59496d3
fce2b21
af63655
9a7e3e4
47fc1e3
2ee2484
1d704bb
a8009ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,15 +17,14 @@ | |
|
|
||
| package org.apache.spark.streaming.dstream | ||
|
|
||
| import scala.collection.mutable.HashMap | ||
| import scala.reflect.ClassTag | ||
|
|
||
| import org.apache.spark.rdd.{BlockRDD, RDD} | ||
| import org.apache.spark.storage.BlockId | ||
| import org.apache.spark.storage.{BlockId, StorageLevel} | ||
| import org.apache.spark.streaming._ | ||
| import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver} | ||
| import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD | ||
| import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult} | ||
| import org.apache.spark.streaming.scheduler.ReceivedBlockInfo | ||
| import org.apache.spark.SparkException | ||
|
|
||
| /** | ||
| * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] | ||
|
|
@@ -40,9 +39,6 @@ import org.apache.spark.SparkException | |
| abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) | ||
| extends InputDStream[T](ssc_) { | ||
|
|
||
| /** Keeps all received blocks information */ | ||
| private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] | ||
|
|
||
| /** This is an unique identifier for the network input stream. */ | ||
| val id = ssc.getNewReceiverStreamId() | ||
|
|
||
|
|
@@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont | |
|
|
||
| def stop() {} | ||
|
|
||
| /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */ | ||
| /** | ||
| * Generates RDDs with blocks received by the receiver of this stream. */ | ||
| override def compute(validTime: Time): Option[RDD[T]] = { | ||
| // If this is called for any time before the start time of the context, | ||
| // then this returns an empty RDD. This may happen when recovering from a | ||
| // master failure | ||
| if (validTime >= graph.startTime) { | ||
| val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) | ||
| receivedBlockInfo(validTime) = blockInfo | ||
| val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] } | ||
| Some(new BlockRDD[T](ssc.sc, blockIds)) | ||
| } else { | ||
| Some(new BlockRDD[T](ssc.sc, Array.empty)) | ||
| } | ||
| } | ||
| val blockRDD = { | ||
|
|
||
| /** Get information on received blocks. */ | ||
| private[streaming] def getReceivedBlockInfo(time: Time) = { | ||
| receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo]) | ||
| if (validTime < graph.startTime) { | ||
| // If this is called for any time before the start time of the context, | ||
| // then this returns an empty RDD. This may happen when recovering from a | ||
| // driver failure without any write ahead log to recover pre-failure data. | ||
| new BlockRDD[T](ssc.sc, Array.empty) | ||
| } else { | ||
| // Otherwise, ask the tracker for all the blocks that have been allocated to this stream | ||
| // for this batch | ||
| val blockInfos = | ||
| ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does the receiver tracker read from HDFS each time `getBlocksOfBatch` is called (sorry, I don't remember if it does)? If it does, then this call incurs more HDFS reads than required when there are several streams in the same app, correct? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ignore this. Verified it does not. |
||
| val blockStoreResults = blockInfos.map { _.blockStoreResult } | ||
| val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need because |
||
|
|
||
| // Check whether all the results are of the same type | ||
| val resultTypes = blockStoreResults.map { _.getClass }.distinct | ||
| if (resultTypes.size > 1) { | ||
| logWarning("Multiple result types in block information, WAL information will be ignored.") | ||
| } | ||
|
|
||
| // If all the results are of type WriteAheadLogBasedStoreResult, then create | ||
| // WriteAheadLogBackedBlockRDD else create simple BlockRDD. | ||
| if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) { | ||
| val logSegments = blockStoreResults.map { | ||
| _.asInstanceOf[WriteAheadLogBasedStoreResult].segment | ||
| }.toArray | ||
| // Since storeInBlockManager = false, the storage level does not matter. | ||
| new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, | ||
| blockIds, logSegments, storeInBlockManager = true, StorageLevel.MEMORY_ONLY_SER) | ||
| } else { | ||
| new BlockRDD[T](ssc.sc, blockIds) | ||
| } | ||
| } | ||
| } | ||
| Some(blockRDD) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -86,10 +103,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont | |
| */ | ||
| private[streaming] override def clearMetadata(time: Time) { | ||
| super.clearMetadata(time) | ||
| val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) | ||
| receivedBlockInfo --= oldReceivedBlocks.keys | ||
| logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + | ||
| (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) | ||
| ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration) | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -112,7 +112,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { | |
| // Wait until all the received blocks in the network input tracker has | ||
| // been consumed by network input DStreams, and jobs have been generated with them | ||
| logInfo("Waiting for all received blocks to be consumed for job generation") | ||
| while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) { | ||
| while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) { | ||
| Thread.sleep(pollTime) | ||
| } | ||
| logInfo("Waited for all received blocks to be consumed for job generation") | ||
|
|
@@ -217,14 +217,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { | |
|
|
||
| /** Generate jobs and perform checkpoint for the given `time`. */ | ||
| private def generateJobs(time: Time) { | ||
| Try(graph.generateJobs(time)) match { | ||
| // Set the SparkEnv in this thread, so that job generation code can access the environment | ||
| // Example: BlockRDDs are created in this thread, and it needs to access BlockManager | ||
| // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. | ||
| SparkEnv.set(ssc.env) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just wondering - why does this need to be set here? Who consumes this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was added when SparkEnv needed to be set for launching jobs on non-main threads. Since the JobGenerator is a background thread that actually submits the jobs, the SparkEnv needed to be set. But since we have removed the whole thread-local stuff from SparkEnv, this is probably not needed any more. We can either remove this (scary) or document it as potentially removable. |
||
| Try { | ||
| jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch | ||
| graph.generateJobs(time) // generate jobs using allocated block | ||
| } match { | ||
| case Success(jobs) => | ||
| val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => | ||
| val streamId = stream.id | ||
| val receivedBlockInfo = stream.getReceivedBlockInfo(time) | ||
| (streamId, receivedBlockInfo) | ||
| }.toMap | ||
| jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) | ||
| val receivedBlockInfos = | ||
| jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray } | ||
| jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos)) | ||
| case Failure(e) => | ||
| jobScheduler.reportError("Error generating jobs for time " + time, e) | ||
| } | ||
|
|
@@ -234,6 +238,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { | |
| /** Clear DStream metadata for the given `time`. */ | ||
| private def clearMetadata(time: Time) { | ||
| ssc.graph.clearMetadata(time) | ||
| jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration) | ||
|
|
||
| // If checkpointing is enabled, then checkpoint, | ||
| // else mark batch to be fully processed | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the functionality to keep track of block-to-batch allocations has been moved from
`ReceiverInputDStream` to `ReceivedBlockTracker`, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.