1717
1818package org .apache .spark .streaming .dstream
1919
20- import java .util .concurrent .ArrayBlockingQueue
2120import java .nio .ByteBuffer
21+ import java .util .concurrent .ArrayBlockingQueue
2222
23- import scala .collection .mutable .ArrayBuffer
23+ import scala .collection .mutable .{ ArrayBuffer , HashMap }
2424import scala .concurrent .Await
2525import scala .concurrent .duration ._
2626import scala .reflect .ClassTag
2727
28- import akka .actor .{Props , Actor }
28+ import akka .actor .{Actor , Props }
2929import akka .pattern .ask
3030
31- import org .apache .spark .streaming .util .{RecurringTimer , SystemClock }
32- import org .apache .spark .streaming ._
3331import org .apache .spark .{Logging , SparkEnv }
34- import org .apache .spark .rdd .{RDD , BlockRDD }
32+ import org .apache .spark .rdd .{BlockRDD , RDD }
3533import org .apache .spark .storage .{BlockId , StorageLevel , StreamBlockId }
36- import org .apache .spark .streaming .scheduler .{DeregisterReceiver , AddBlocks , RegisterReceiver }
34+ import org .apache .spark .streaming ._
35+ import org .apache .spark .streaming .scheduler .{ReceivedBlockInfo , AddBlocks , DeregisterReceiver , RegisterReceiver }
36+ import org .apache .spark .streaming .util .{RecurringTimer , SystemClock }
37+ import org .apache .spark .util .Utils
3738
3839/**
3940 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream ]]
@@ -48,8 +49,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
4849abstract class NetworkInputDStream [T : ClassTag ](@ transient ssc_ : StreamingContext )
4950 extends InputDStream [T ](ssc_) {
5051
51- // This is an unique identifier that is used to match the network receiver with the
52- // corresponding network input stream.
52+ /** Keeps all received blocks information */
53+ private val receivedBlockInfo = new HashMap [Time , Array [ReceivedBlockInfo ]]
54+
55+ /** This is an unique identifier for the network input stream. */
5356 val id = ssc.getNewNetworkStreamId()
5457
5558 /**
@@ -64,23 +67,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
6467
6568 def stop () {}
6669
70+ /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
6771 override def compute (validTime : Time ): Option [RDD [T ]] = {
6872 // If this is called for any time before the start time of the context,
6973 // then this returns an empty RDD. This may happen when recovering from a
7074 // master failure
7175 if (validTime >= graph.startTime) {
72- val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)
76+ val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
77+ receivedBlockInfo(validTime) = blockInfo
78+ val blockIds = blockInfo.map(_.blockId.asInstanceOf [BlockId ])
7379 Some (new BlockRDD [T ](ssc.sc, blockIds))
7480 } else {
7581 Some (new BlockRDD [T ](ssc.sc, Array [BlockId ]()))
7682 }
7783 }
84+
85+ /** Get information on received blocks. */
86+ private [streaming] def getReceivedBlockInfo (time : Time ) = {
87+ receivedBlockInfo(time)
88+ }
89+
90+ /**
91+ * Clear metadata that are older than `rememberDuration` of this DStream.
92+ * This is an internal method that should not be called directly. This
93+ * implementation overrides the default implementation to clear received
94+ * block information.
95+ */
96+ private [streaming] override def clearMetadata (time : Time ) {
97+ super .clearMetadata(time)
98+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
99+ receivedBlockInfo --= oldReceivedBlocks.keys
100+ logDebug(" Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
101+ (time - rememberDuration) + " : " + oldReceivedBlocks.keys.mkString(" , " ))
102+ }
78103}
79104
80105
81106private [streaming] sealed trait NetworkReceiverMessage
82107private [streaming] case class StopReceiver (msg : String ) extends NetworkReceiverMessage
83- private [streaming] case class ReportBlock (blockId : BlockId , metadata : Any )
108+ private [streaming] case class ReportBlock (blockId : StreamBlockId , numRecords : Long , metadata : Any )
84109 extends NetworkReceiverMessage
85110private [streaming] case class ReportError (msg : String ) extends NetworkReceiverMessage
86111
@@ -155,21 +180,30 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
155180 actor ! ReportError (e.toString)
156181 }
157182
158-
159183 /**
160184 * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
161185 */
162- def pushBlock (blockId : BlockId , arrayBuffer : ArrayBuffer [T ], metadata : Any , level : StorageLevel ) {
186+ def pushBlock (
187+ blockId : StreamBlockId ,
188+ arrayBuffer : ArrayBuffer [T ],
189+ metadata : Any ,
190+ level : StorageLevel
191+ ) {
163192 env.blockManager.put(blockId, arrayBuffer.asInstanceOf [ArrayBuffer [Any ]], level)
164- actor ! ReportBlock (blockId, metadata)
193+ actor ! ReportBlock (blockId, arrayBuffer.size, metadata)
165194 }
166195
167196 /**
168197 * Pushes a block (as bytes) into the block manager.
169198 */
170- def pushBlock (blockId : BlockId , bytes : ByteBuffer , metadata : Any , level : StorageLevel ) {
199+ def pushBlock (
200+ blockId : StreamBlockId ,
201+ bytes : ByteBuffer ,
202+ metadata : Any ,
203+ level : StorageLevel
204+ ) {
171205 env.blockManager.putBytes(blockId, bytes, level)
172- actor ! ReportBlock (blockId, metadata)
206+ actor ! ReportBlock (blockId, - 1 , metadata)
173207 }
174208
175209 /** A helper actor that communicates with the NetworkInputTracker */
@@ -182,13 +216,15 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
182216 val timeout = 5 .seconds
183217
184218 override def preStart () {
185- val future = tracker.ask(RegisterReceiver (streamId, self))(timeout)
219+ val msg = RegisterReceiver (
220+ streamId, NetworkReceiver .this .getClass.getSimpleName, Utils .localHostName(), self)
221+ val future = tracker.ask(msg)(timeout)
186222 Await .result(future, timeout)
187223 }
188224
189225 override def receive () = {
190- case ReportBlock (blockId, metadata) =>
191- tracker ! AddBlocks (streamId, Array ( blockId), metadata)
226+ case ReportBlock (blockId, numRecords, metadata) =>
227+ tracker ! AddBlocks (ReceivedBlockInfo ( streamId, blockId, numRecords, metadata) )
192228 case ReportError (msg) =>
193229 tracker ! DeregisterReceiver (streamId, msg)
194230 case StopReceiver (msg) =>
@@ -210,7 +246,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
210246 class BlockGenerator (storageLevel : StorageLevel )
211247 extends Serializable with Logging {
212248
213- case class Block (id : BlockId , buffer : ArrayBuffer [T ], metadata : Any = null )
249+ case class Block (id : StreamBlockId , buffer : ArrayBuffer [T ], metadata : Any = null )
214250
215251 val clock = new SystemClock ()
216252 val blockInterval = env.conf.getLong(" spark.streaming.blockInterval" , 200 )
0 commit comments