
Commit f537632

zsxwing authored and tdas committed
[SPARK-18670][SS] Limit the number of StreamingQueryListener.QueryProgressEvent when there is no data
## What changes were proposed in this pull request?

This PR adds a SQL conf `spark.sql.streaming.noDataProgressEventInterval` to control how long to wait before posting the next QueryProgressEvent when there is no data.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <[email protected]>

Closes #16108 from zsxwing/SPARK-18670.

(cherry picked from commit 56a503d)
Signed-off-by: Tathagata Das <[email protected]>
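For context, a minimal sketch of how an application might tune this interval and observe the throttled events. The conf key and listener callbacks match the diffs below; the 30-second value and the println body are illustrative assumptions (the conf is marked internal, but it can still be set by key):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Report at most one "no data" progress event every 30 seconds.
spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "30s")

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // While the query is idle, this now fires at most once per interval.
    println(event.progress.prettyJson)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})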
1 parent f915f81 · commit f537632

File tree

3 files changed: +71 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 17 additions & 1 deletion

@@ -63,6 +63,9 @@ class StreamExecution(

   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */

@@ -196,6 +199,9 @@ class StreamExecution(
     // While active, repeatedly attempt to run batches.
     SparkSession.setActiveSession(sparkSession)

+    // The timestamp when we last reported a progress event that had no input data
+    var lastNoDataProgressEventTime = Long.MinValue
+
     triggerExecutor.execute(() => {
       startTrigger()

@@ -218,7 +224,17 @@ class StreamExecution(

         // Report trigger as finished and construct progress object.
         finishTrigger(dataAvailable)
-        postEvent(new QueryProgressEvent(lastProgress))
+        if (dataAvailable) {
+          // Reset lastNoDataProgressEventTime if we processed any data
+          lastNoDataProgressEventTime = Long.MinValue
+          postEvent(new QueryProgressEvent(lastProgress))
+        } else {
+          val now = triggerClock.getTimeMillis()
+          if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+            lastNoDataProgressEventTime = now
+            postEvent(new QueryProgressEvent(lastProgress))
+          }
+        }

         if (dataAvailable) {
           // We'll increase currentBatchId after we complete processing current batch's data

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

@@ -577,6 +577,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)

+  val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
+      .internal()
+      .doc("How long to wait between two progress events when there is no data")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10000L)
+
   val STREAMING_METRICS_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")

@@ -658,6 +665,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)

+  def streamingNoDataProgressEventInterval: Long =
+    getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)

   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
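The new entry follows the surrounding timeConf pattern: the value may be written with a time unit and is resolved to milliseconds, with a 10-second default. A hedged sketch of how the getter resolves; since SQLConf is private[sql], this would only compile inside Spark's sql package (the "30s" value is an assumption):

import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
// timeConf parses "30s" and stores it in TimeUnit.MILLISECONDS.
conf.setConfString("spark.sql.streaming.noDataProgressEventInterval", "30s")
assert(conf.streamingNoDataProgressEventInterval == 30000L)
// Left unset, the getter falls back to the 10000L (10s) default declared above.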

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 44 additions & 0 deletions

@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.util.JsonProtocol

@@ -46,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }

   testQuietly("single listener, check trigger events are generated correctly") {

@@ -191,6 +193,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }

+  test("only one progress event per interval when no data") {
+    // This test will start a query but not push any data, and then check if we push too many events
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
+      @volatile var numProgressEvent = 0
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          numProgressEvent += 1
+        }
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+      try {
+        val input = new MemoryStream[Int](0, sqlContext) {
+          @volatile var numTriggers = 0
+          override def getOffset: Option[Offset] = {
+            numTriggers += 1
+            super.getOffset
+          }
+        }
+        val clock = new StreamManualClock()
+        val actions = mutable.ArrayBuffer[StreamAction]()
+        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+        }
+        actions += AssertOnQuery { _ =>
+          eventually(timeout(streamingTimeout)) {
+            assert(input.numTriggers > 100) // at least 100 triggers have occurred
+          }
+          true
+        }
+        testStream(input.toDS)(actions: _*)
+        spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+        // 11 is the maximum possible number of events.
+        assert(numProgressEvent > 1 && numProgressEvent <= 11)
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.
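A note on the bound asserted in the new test: the manual clock advances 100 × 10 ms = 1000 ms in total and the no-data interval is set to 100 ms, so idle progress events can fire at most at t = 0, 100, ..., 1000 ms, giving the 1000 / 100 + 1 = 11 maximum; the numProgressEvent > 1 half of the assertion checks that throttled events still flow at all.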
