diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 15d9710d37cd..79b3fd1107d7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.scheduler.InputInfo import org.apache.spark.streaming.util.WriteAheadLogUtils /** @@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray + // Register the input blokcs information into InputInfoTracker + val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) + // Are WAL record handles present with all the blocks val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }