@@ -17,15 +17,14 @@

package org.apache.spark.streaming.dstream

import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.SparkException

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -40,9 +39,6 @@ import org.apache.spark.SparkException
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

/** Keeps all received blocks information */
private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

/** This is a unique identifier for the network input stream. */
Comment (Contributor Author):
All the functionality for tracking block-to-batch allocations has been moved from ReceiverInputDStream to ReceivedBlockTracker, so that all actions on the block metadata (including block-to-batch allocations) can be logged at a central location.
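(Aside, not part of this diff: a minimal sketch of the centralized bookkeeping described above. The method names allocateBlocksToBatch/getBlocksOfBatch mirror calls that appear elsewhere in this PR; the internal data structures are assumptions for illustration only.)

import scala.collection.mutable

class ReceivedBlockTrackerSketch {
  // Blocks received but not yet assigned to any batch, keyed by stream id.
  private val unallocated = new mutable.HashMap[Int, mutable.ArrayBuffer[String]]
  // Blocks already assigned to a batch, keyed by batch time (ms), then by stream id.
  private val allocated = new mutable.HashMap[Long, Map[Int, Seq[String]]]

  def addBlock(streamId: Int, blockId: String): Unit = synchronized {
    unallocated.getOrElseUpdate(streamId, new mutable.ArrayBuffer[String]) += blockId
  }

  def allocateBlocksToBatch(batchTimeMs: Long): Unit = synchronized {
    // This single state transition is what a central tracker can log (e.g. to a write ahead log).
    allocated(batchTimeMs) = unallocated.map { case (id, blocks) => id -> blocks.toSeq }.toMap
    unallocated.clear()
  }

  def getBlocksOfBatch(batchTimeMs: Long): Map[Int, Seq[String]] = synchronized {
    // Served from the in-memory map; no external reads are needed per call.
    allocated.getOrElse(batchTimeMs, Map.empty)
  }
}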

val id = ssc.getNewReceiverStreamId()

@@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont

def stop() {}

/** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
/**
* Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array.empty))
}
}
val blockRDD = {

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
if (validTime < graph.startTime) {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// driver failure without any write ahead log to recover pre-failure data.
new BlockRDD[T](ssc.sc, Array.empty)
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val blockInfos =
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
Comment (Contributor):
Does the receiver tracker read from HDFS each time getBlocksOfBatch is called (sorry, I don't remember if it does)? If it does, then this call incurs more HDFS reads than required when there are several streams in the same app, correct?

Follow-up (Contributor):
Ignore this. Verified it does not.

val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
Comment (Contributor):
It looks like the blockId here is a StreamBlockId, which is a subclass of BlockId, so I don't think that you need this cast.

Reply (Contributor Author):
You need it because an Array[StreamBlockId] cannot be assigned to a variable of type Array[BlockId].
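(Aside, not part of the diff: a quick illustration of the Scala array invariance point made above, using the StreamBlockId/BlockId types from org.apache.spark.storage; the values are made up.)

import org.apache.spark.storage.{BlockId, StreamBlockId}

val streamBlockIds: Array[StreamBlockId] = Array(StreamBlockId(streamId = 0, uniqueId = 1L))
// val ids: Array[BlockId] = streamBlockIds  // does not compile: Array[T] is invariant in T
val ids: Array[BlockId] = streamBlockIds.map(_.asInstanceOf[BlockId])  // explicit widening, as in the diff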


// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
logWarning("Multiple result types in block information, WAL information will be ignored.")
}

// If all the results are of type WriteAheadLogBasedStoreResult, then create
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].segment
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
} else {
new BlockRDD[T](ssc.sc, blockIds)
}
}
}
Some(blockRDD)
}

/**
Expand All @@ -86,10 +103,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
receivedBlockInfo --= oldReceivedBlocks.keys
logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration)
}
}

@@ -48,7 +48,6 @@ class WriteAheadLogBackedBlockRDDPartition(
* If it does not find them, it looks up the corresponding file segment.
*
* @param sc SparkContext
* @param hadoopConfig Hadoop configuration
* @param blockIds Ids of the blocks that contains this RDD's data
* @param segments Segments in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading from the segment
Expand All @@ -58,7 +57,6 @@ class WriteAheadLogBackedBlockRDDPartition(
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient hadoopConfig: Configuration,
@transient blockIds: Array[BlockId],
@transient segments: Array[WriteAheadLogFileSegment],
storeInBlockManager: Boolean,
@@ -71,6 +69,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
s"the same as number of segments (${segments.length}})!")

// Hadoop configuration is not serializable, so wrap it in a SerializableWritable so it can be shipped to tasks.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

override def getPartitions: Array[Partition] = {
@@ -112,7 +112,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Wait until all the received blocks in the network input tracker have
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) {
while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
Thread.sleep(pollTime)
}
logInfo("Waited for all received blocks to be consumed for job generation")
@@ -217,14 +217,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
Try(graph.generateJobs(time)) match {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Comment (Contributor):
Just wondering - why does this need to be set here? Who consumes this?

Reply (Contributor Author):
This was added when SparkEnv needed to be set for launching jobs on non-main threads. Since the JobGenerator is a background thread that actually submits the jobs, the SparkEnv needed to be set. But since we have removed the thread-local handling from SparkEnv, this is probably not needed any more. We can either remove this (scary) or document it as potentially removable.
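(Aside, not part of the diff: a minimal sketch of the pattern described above, in which the driver's SparkEnv is captured and re-installed on a background thread before that thread builds and submits jobs. SparkEnv.set is the same call used in the file below; it may only be accessible from within Spark's own packages, and per the author's note the whole step is probably unnecessary now that the thread-local handling in SparkEnv has been removed.)

import org.apache.spark.SparkEnv

def runOnBackgroundThread(body: => Unit): Unit = {
  val env = SparkEnv.get                  // capture the driver's SparkEnv on the calling thread
  new Thread("job-generator-sketch") {
    override def run(): Unit = {
      SparkEnv.set(env)                   // re-install it so code on this thread (e.g. BlockRDD
      body                                // creation) can reach the BlockManager through SparkEnv
    }
  }.start()
}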

Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
val receivedBlockInfos =
jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
@@ -234,6 +238,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration)

// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed