Skip to content

Commit f205fe4

Browse files
jerryshao authored and tdas committed
[SPARK-4537][Streaming] Expand StreamingSource to add more metrics
Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` for counting received records.

Author: jerryshao <[email protected]>

Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits:

00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics
1 parent ac82785 commit f205fe4

File tree

2 files changed

+57
-15
lines changed

2 files changed

+57
-15
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
2929
private val streamingListener = ssc.progressListener
3030

3131
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
32-
defaultValue: T) {
32+
defaultValue: T): Unit = {
33+
registerGaugeWithOption[T](name,
34+
(l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
35+
}
36+
37+
private def registerGaugeWithOption[T](
38+
name: String,
39+
f: StreamingJobProgressListener => Option[T],
40+
defaultValue: T): Unit = {
3341
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
34-
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
42+
override def getValue: T = f(streamingListener).getOrElse(defaultValue)
3543
})
3644
}
3745

@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
4149
// Gauge for number of total completed batches
4250
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
4351

52+
// Gauge for number of total received records
53+
registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
54+
55+
// Gauge for number of total processed records
56+
registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
57+
4458
// Gauge for number of unprocessed batches
4559
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
4660

@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
5569

5670
// Gauge for last completed batch, useful for monitoring the streaming job's running status,
5771
// displayed data -1 for any abnormal condition.
58-
registerGauge("lastCompletedBatch_submissionTime",
59-
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
60-
registerGauge("lastCompletedBatch_processStartTime",
61-
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
62-
registerGauge("lastCompletedBatch_processEndTime",
63-
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
72+
registerGaugeWithOption("lastCompletedBatch_submissionTime",
73+
_.lastCompletedBatch.map(_.submissionTime), -1L)
74+
registerGaugeWithOption("lastCompletedBatch_processingStartTime",
75+
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
76+
registerGaugeWithOption("lastCompletedBatch_processingEndTime",
77+
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
78+
79+
// Gauge for last completed batch's delay information.
80+
registerGaugeWithOption("lastCompletedBatch_processingDelay",
81+
_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
82+
registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
83+
_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
84+
registerGaugeWithOption("lastCompletedBatch_totalDelay",
85+
_.lastCompletedBatch.flatMap(_.totalDelay), -1L)
6486

6587
// Gauge for last received batch, useful for monitoring the streaming job's running status,
6688
// displayed data -1 for any abnormal condition.
67-
registerGauge("lastReceivedBatch_submissionTime",
68-
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
69-
registerGauge("lastReceivedBatch_processStartTime",
70-
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
71-
registerGauge("lastReceivedBatch_processEndTime",
72-
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
89+
registerGaugeWithOption("lastReceivedBatch_submissionTime",
90+
_.lastCompletedBatch.map(_.submissionTime), -1L)
91+
registerGaugeWithOption("lastReceivedBatch_processingStartTime",
92+
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
93+
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
94+
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
95+
96+
// Gauge for last received batch records.
97+
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
7398
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
2525
import org.apache.spark.streaming.scheduler.BatchInfo
2626
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
2727
import org.apache.spark.util.Distribution
28-
import org.apache.spark.Logging
2928

3029

3130
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -36,6 +35,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3635
private val completedaBatchInfos = new Queue[BatchInfo]
3736
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
3837
private var totalCompletedBatches = 0L
38+
private var totalReceivedRecords = 0L
39+
private var totalProcessedRecords = 0L
3940
private val receiverInfos = new HashMap[Int, ReceiverInfo]
4041

4142
val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -65,6 +66,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
6566
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
6667
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
6768
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
69+
70+
batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
71+
totalReceivedRecords += infos.map(_.numRecords).sum
72+
}
6873
}
6974

7075
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -73,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
7378
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
7479
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
7580
totalCompletedBatches += 1L
81+
82+
batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
83+
totalProcessedRecords += infos.map(_.numRecords).sum
84+
}
7685
}
7786

7887
def numReceivers = synchronized {
@@ -83,6 +92,14 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
8392
totalCompletedBatches
8493
}
8594

95+
def numTotalReceivedRecords: Long = synchronized {
96+
totalReceivedRecords
97+
}
98+
99+
def numTotalProcessedRecords: Long = synchronized {
100+
totalProcessedRecords
101+
}
102+
86103
def numUnprocessedBatches: Long = synchronized {
87104
waitingBatchInfos.size + runningBatchInfos.size
88105
}

0 commit comments

Comments
 (0)