@@ -105,8 +105,13 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
105105 val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
106106 val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
107107 (0 until numNetworkReceivers).map { receiverId =>
108- val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array .empty))
109- val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
108+ val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
109+ batchInfo.get(receiverId).getOrElse(Array .empty)
110+ }
111+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
112+ // calculate records per second for each batch
113+ blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
114+ }
110115 val distributionOption = Distribution (recordsOfParticularReceiver)
111116 (receiverId, distributionOption)
112117 }.toMap
@@ -231,16 +236,24 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
231236 val numBatches = listener.completedBatches.size
232237 val lastCompletedBatch = listener.lastCompletedBatch
233238 val table = if (numBatches > 0 ) {
234- val processingDelayQuantilesRow =
235- Seq (" Processing Time" , msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++
236- getQuantiles(listener.processingDelayDistribution)
237- val schedulingDelayQuantilesRow =
238- Seq (" Scheduling Delay" , msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++
239- getQuantiles(listener.schedulingDelayDistribution)
240- val totalDelayQuantilesRow =
241- Seq (" Total Delay" , msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++
242- getQuantiles(listener.totalDelayDistribution)
243-
239+ val processingDelayQuantilesRow = {
240+ Seq (
241+ " Processing Time" ,
242+ msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))
243+ ) ++ getQuantiles(listener.processingDelayDistribution)
244+ }
245+ val schedulingDelayQuantilesRow = {
246+ Seq (
247+ " Scheduling Delay" ,
248+ msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))
249+ ) ++ getQuantiles(listener.schedulingDelayDistribution)
250+ }
251+ val totalDelayQuantilesRow = {
252+ Seq (
253+ " Total Delay" ,
254+ msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))
255+ ) ++ getQuantiles(listener.totalDelayDistribution)
256+ }
244257 val headerRow = Seq (" Metric" , " Last batch" , " Minimum" , " 25th percentile" ,
245258 " Median" , " 75th percentile" , " Maximum" )
246259 val dataRows : Seq [Seq [String ]] = Seq (
0 commit comments