Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 56a503d

Browse files
zsxwingtdas
authored andcommitted
[SPARK-18670][SS] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data
## What changes were proposed in this pull request? This PR adds a sql conf `spark.sql.streaming.noDataReportInterval` to control how long to wait before outputing the next StreamProgressEvent when there is no data. ## How was this patch tested? The added unit test. Author: Shixiong Zhu <[email protected]> Closes apache#16108 from zsxwing/SPARK-18670.
1 parent a985dd8 commit 56a503d

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class StreamExecution(
6363

6464
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
6565

66+
private val noDataProgressEventInterval =
67+
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
68+
6669
/**
6770
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
6871
*/
@@ -196,6 +199,9 @@ class StreamExecution(
196199
// While active, repeatedly attempt to run batches.
197200
SparkSession.setActiveSession(sparkSession)
198201

202+
// The timestamp we report an event that has no input data
203+
var lastNoDataProgressEventTime = Long.MinValue
204+
199205
triggerExecutor.execute(() => {
200206
startTrigger()
201207

@@ -218,7 +224,17 @@ class StreamExecution(
218224

219225
// Report trigger as finished and construct progress object.
220226
finishTrigger(dataAvailable)
221-
postEvent(new QueryProgressEvent(lastProgress))
227+
if (dataAvailable) {
228+
// Reset noDataEventTimestamp if we processed any data
229+
lastNoDataProgressEventTime = Long.MinValue
230+
postEvent(new QueryProgressEvent(lastProgress))
231+
} else {
232+
val now = triggerClock.getTimeMillis()
233+
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
234+
lastNoDataProgressEventTime = now
235+
postEvent(new QueryProgressEvent(lastProgress))
236+
}
237+
}
222238

223239
if (dataAvailable) {
224240
// We'll increase currentBatchId after we complete processing current batch's data

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,13 @@ object SQLConf {
603603
.timeConf(TimeUnit.MILLISECONDS)
604604
.createWithDefault(10L)
605605

606+
val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
607+
SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
608+
.internal()
609+
.doc("How long to wait between two progress events when there is no data")
610+
.timeConf(TimeUnit.MILLISECONDS)
611+
.createWithDefault(10000L)
612+
606613
val STREAMING_METRICS_ENABLED =
607614
SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
608615
.doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -684,6 +691,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
684691

685692
def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
686693

694+
def streamingNoDataProgressEventInterval: Long =
695+
getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
696+
687697
def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
688698

689699
def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
3131
import org.apache.spark.SparkException
3232
import org.apache.spark.scheduler._
3333
import org.apache.spark.sql.execution.streaming._
34+
import org.apache.spark.sql.internal.SQLConf
3435
import org.apache.spark.sql.streaming.StreamingQueryListener._
3536
import org.apache.spark.util.JsonProtocol
3637

@@ -46,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
4647
assert(spark.streams.active.isEmpty)
4748
assert(addedListeners.isEmpty)
4849
// Make sure we don't leak any events to the next test
50+
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
4951
}
5052

5153
testQuietly("single listener, check trigger events are generated correctly") {
@@ -191,6 +193,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
191193
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
192194
}
193195

196+
test("only one progress event per interval when no data") {
197+
// This test will start a query but not push any data, and then check if we push too many events
198+
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
199+
@volatile var numProgressEvent = 0
200+
val listener = new StreamingQueryListener {
201+
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
202+
override def onQueryProgress(event: QueryProgressEvent): Unit = {
203+
numProgressEvent += 1
204+
}
205+
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
206+
}
207+
spark.streams.addListener(listener)
208+
try {
209+
val input = new MemoryStream[Int](0, sqlContext) {
210+
@volatile var numTriggers = 0
211+
override def getOffset: Option[Offset] = {
212+
numTriggers += 1
213+
super.getOffset
214+
}
215+
}
216+
val clock = new StreamManualClock()
217+
val actions = mutable.ArrayBuffer[StreamAction]()
218+
actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
219+
for (_ <- 1 to 100) {
220+
actions += AdvanceManualClock(10)
221+
}
222+
actions += AssertOnQuery { _ =>
223+
eventually(timeout(streamingTimeout)) {
224+
assert(input.numTriggers > 100) // at least 100 triggers have occurred
225+
}
226+
true
227+
}
228+
testStream(input.toDS)(actions: _*)
229+
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
230+
// 11 is the max value of the possible numbers of events.
231+
assert(numProgressEvent > 1 && numProgressEvent <= 11)
232+
} finally {
233+
spark.streams.removeListener(listener)
234+
}
235+
}
236+
}
237+
194238
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
195239
// query-event-logs-version-2.0.0.txt has all types of events generated by
196240
// Structured Streaming in Spark 2.0.0.

0 commit comments

Comments
 (0)