Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class Dataset[T] private[sql](
* @param eventTime the name of the column that contains the event time of the row.
* @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
* record that has been processed in the form of an interval
* (e.g. "1 minute" or "5 hours").
* (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
*
* @group streaming
* @since 2.1.0
Expand All @@ -576,6 +576,8 @@ class Dataset[T] private[sql](
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0) to make delayThreshold more reasonable and significative.

EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}

test("delay threshold should not be negative.") {
val inputData = MemoryStream[Int].toDF()
var e = intercept[IllegalArgumentException] {
inputData.withWatermark("value", "-1 year")
}
assert(e.getMessage contains "should not be negative.")

e = intercept[IllegalArgumentException] {
inputData.withWatermark("value", "1 year -13 months")
}
assert(e.getMessage contains "should not be negative.")

e = intercept[IllegalArgumentException] {
inputData.withWatermark("value", "1 month -40 days")
}
assert(e.getMessage contains "should not be negative.")

e = intercept[IllegalArgumentException] {
inputData.withWatermark("value", "-10 seconds")
}
assert(e.getMessage contains "should not be negative.")
}

test("the new watermark should override the old one") {
val df = MemoryStream[(Long, Long)].toDF()
.withColumn("first", $"_1".cast("timestamp"))
Expand Down