Skip to content

Commit ffe65b0

Browse files
uncleGen authored and zsxwing committed
[SPARK-19861][SS] watermark should not be a negative time.
## What changes were proposed in this pull request?

The `watermark` delay threshold should not be negative. A negative delay is invalid, so it is now checked before the query actually runs.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <[email protected]>
Author: dylon <[email protected]>

Closes #17202 from uncleGen/SPARK-19861.

(cherry picked from commit 30b18e6)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 2a76e24 commit ffe65b0

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ class Dataset[T] private[sql](
559559
* @param eventTime the name of the column that contains the event time of the row.
560560
* @param delayThreshold the minimum delay to wait for late data to arrive, relative to the latest
561561
* record that has been processed in the form of an interval
562-
* (e.g. "1 minute" or "5 hours").
562+
* (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
563563
*
564564
* @group streaming
565565
* @since 2.1.0
@@ -572,6 +572,8 @@ class Dataset[T] private[sql](
572572
val parsedDelay =
573573
Option(CalendarInterval.fromString("interval " + delayThreshold))
574574
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
575+
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
576+
s"delay threshold ($delayThreshold) should not be negative.")
575577
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
576578
}
577579

sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
306306
)
307307
}
308308

309+
test("delay threshold should not be negative.") {
310+
val inputData = MemoryStream[Int].toDF()
311+
var e = intercept[IllegalArgumentException] {
312+
inputData.withWatermark("value", "-1 year")
313+
}
314+
assert(e.getMessage contains "should not be negative.")
315+
316+
e = intercept[IllegalArgumentException] {
317+
inputData.withWatermark("value", "1 year -13 months")
318+
}
319+
assert(e.getMessage contains "should not be negative.")
320+
321+
e = intercept[IllegalArgumentException] {
322+
inputData.withWatermark("value", "1 month -40 days")
323+
}
324+
assert(e.getMessage contains "should not be negative.")
325+
326+
e = intercept[IllegalArgumentException] {
327+
inputData.withWatermark("value", "-10 seconds")
328+
}
329+
assert(e.getMessage contains "should not be negative.")
330+
}
331+
309332
test("the new watermark should override the old one") {
310333
val df = MemoryStream[(Long, Long)].toDF()
311334
.withColumn("first", $"_1".cast("timestamp"))

0 commit comments

Comments
 (0)