From dcc77eda3d88f8cd5c66b60730c0ada5ae717cc3 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Mar 2017 11:28:29 +0800 Subject: [PATCH 1/6] watermark should not be a negative time. --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1b04623596073..ad0e2dc613c3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -563,7 +563,7 @@ class Dataset[T] private[sql]( * @param eventTime the name of the column that contains the event time of the row. * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval - * (e.g. "1 minute" or "5 hours"). + * (e.g. "1 minute" or "5 hours"). NOTE: This should not be a negative time. * * @group streaming * @since 2.1.0 @@ -576,6 +576,8 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + assert(parsedDelay.microseconds > 0, + s"delay threshold should not be a negative time: $delayThreshold") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c34d119734cc0..1287dec97c5e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -305,6 +305,14 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + test("delay threshold should not be a negative time.") { + val inputData = MemoryStream[Int].toDF() + val e = intercept[AssertionError] { + inputData.withWatermark("value", "-1 minute") + } + assert(e.getMessage contains "delay threshold should not be a negative time") + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) From 00bbadc063e515653af86ab85fc95833bccf727a Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Mar 2017 11:30:13 +0800 Subject: [PATCH 2/6] update --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ad0e2dc613c3e..2303992930e4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -576,7 +576,7 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) - assert(parsedDelay.microseconds > 0, + assert(parsedDelay.microseconds >= 0, s"delay threshold should not be a negative time: $delayThreshold") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } From 93a5d6d357f88e14cb3aeea3153d3bf706415c70 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Mar 2017 18:28:25 +0800 Subject: [PATCH 3/6] fix the wrong delayMs calculation --- .../scala/org/apache/spark/sql/Dataset.scala | 7 +++++-- .../streaming/EventTimeWatermarkSuite.scala | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 2303992930e4f..bdb3c4dd374a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -576,8 +576,11 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) - assert(parsedDelay.microseconds >= 0, - s"delay threshold should not be a negative time: $delayThreshold") + val delayMs = { + val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + parsedDelay.milliseconds + parsedDelay.months * millisPerMonth + } + assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 1287dec97c5e4..4997df6e7d36f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -307,8 +307,23 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin test("delay threshold should not be a negative time.") { val inputData = MemoryStream[Int].toDF() - val e = intercept[AssertionError] { - inputData.withWatermark("value", "-1 minute") + var e = intercept[AssertionError] { + inputData.withWatermark("value", "-1 year") + } + assert(e.getMessage contains "delay threshold should not be a negative time") + + e = intercept[AssertionError] { + inputData.withWatermark("value", "1 year -13 months") + } + assert(e.getMessage contains "delay threshold should not be a negative time") + + e = intercept[AssertionError] { + inputData.withWatermark("value", "1 months -40 days") + } + assert(e.getMessage contains "delay threshold should not be a negative time") + + e = intercept[AssertionError] { + inputData.withWatermark("value", "-10 seconds") } assert(e.getMessage contains "delay threshold should not be a negative time") } From c5e2221f937d074bee0ac15d6fa773bee93b982c Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Mar 2017 18:33:12 +0800 Subject: [PATCH 4/6] fix --- .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index d796bfe661f28..d233a5c005cda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -319,7 +319,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin assert(e.getMessage contains "delay threshold should not be a negative time") e = intercept[AssertionError] { - inputData.withWatermark("value", "1 months -40 days") + inputData.withWatermark("value", "1 month -40 days") } assert(e.getMessage contains "delay threshold should not be a negative time") From b0c8f128cd8936a345a5a68c06bd8ea54160254d Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Mar 2017 23:30:26 +0800 Subject: [PATCH 5/6] address comments from @srowen --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index bdb3c4dd374a0..2ff947055cd97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -580,7 +580,7 @@ class Dataset[T] private[sql]( val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 parsedDelay.milliseconds + parsedDelay.months * millisPerMonth } - assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") + require(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index d233a5c005cda..c6467212dc9e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -308,22 +308,22 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin test("delay threshold should not be a negative time.") { val inputData = MemoryStream[Int].toDF() - var e = intercept[AssertionError] { + var e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "-1 year") } assert(e.getMessage contains "delay threshold should not be a negative time") - e = intercept[AssertionError] { + e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "1 year -13 months") } assert(e.getMessage contains "delay threshold should not be a negative time") - e = intercept[AssertionError] { + e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "1 month -40 days") } assert(e.getMessage contains "delay threshold should not be a negative time") - e = intercept[AssertionError] { + e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "-10 seconds") } assert(e.getMessage contains "delay threshold should not be a negative time") From 10a949d8a1372fd43cf560530fdfc052bb027025 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Thu, 9 Mar 2017 09:25:30 +0800 Subject: [PATCH 6/6] address comments from @srowen and @rxin --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 9 +++------ .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 10 +++++----- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 2ff947055cd97..c64cbf310dfa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -563,7 +563,7 @@ class Dataset[T] private[sql]( * @param eventTime the name of the column that contains the event time of the row. * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval - * (e.g. "1 minute" or "5 hours"). NOTE: This should not be a negative time. + * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative. * * @group streaming * @since 2.1.0 @@ -576,11 +576,8 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) - val delayMs = { - val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 - parsedDelay.milliseconds + parsedDelay.months * millisPerMonth - } - require(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") + require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, + s"delay threshold ($delayThreshold) should not be negative.") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c6467212dc9e5..7614ea5eb3c01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -306,27 +306,27 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } - test("delay threshold should not be a negative time.") { + test("delay threshold should not be negative.") { val inputData = MemoryStream[Int].toDF() var e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "-1 year") } - assert(e.getMessage contains "delay threshold should not be a negative time") + assert(e.getMessage contains "should not be negative.") e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "1 year -13 months") } - assert(e.getMessage contains "delay threshold should not be a negative time") + assert(e.getMessage contains "should not be negative.") e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "1 month -40 days") } - assert(e.getMessage contains "delay threshold should not be a negative time") + assert(e.getMessage contains "should not be negative.") e = intercept[IllegalArgumentException] { inputData.withWatermark("value", "-10 seconds") } - assert(e.getMessage contains "delay threshold should not be a negative time") + assert(e.getMessage contains "should not be negative.") } test("the new watermark should override the old one") {