
Commit ca849ac

[SPARK-19841][SS] watermarkPredicate should filter based on keys
## What changes were proposed in this pull request?

`StreamingDeduplicateExec.watermarkPredicate` should filter based on keys. Otherwise, it may generate a wrong answer if the watermark column in `keyExpression` has a different position in the row. `StateStoreSaveExec` has the same code, but its parent can make sure the watermark column positions in `keyExpression` and `row` are the same.

## How was this patch tested?

The added test.

Author: Shixiong Zhu <[email protected]>

Closes #17183 from zsxwing/SPARK-19841.
1 parent b9783a9 commit ca849ac
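
A minimal, self-contained sketch of the bug the title describes (plain Scala with a hypothetical `BoundPredicate` type, not Spark's `newPredicate`): a predicate whose column ordinal was resolved against the key layout gives a wrong answer when it is evaluated against a full input row whose columns are in a different order.

```scala
// Hypothetical illustration of the ordinal mismatch described above (not Spark code).
object KeyOrdinalSketch {
  // A predicate "bound" to a schema simply reads a fixed column ordinal.
  final case class BoundPredicate(ordinal: Int, watermarkMs: Long) {
    def eval(row: IndexedSeq[Long]): Boolean = row(ordinal) <= watermarkMs
  }

  def main(args: Array[String]): Unit = {
    // Suppose the key layout is (id, time) while the input row layout is (time, id),
    // as in the added test; the watermark column then sits at key ordinal 1.
    val olderThanWatermark = BoundPredicate(ordinal = 1, watermarkMs = 1000L)

    val keyRow  = IndexedSeq(7L, 2000L)  // (id = 7, time = 2000 ms): newer than the watermark
    val dataRow = IndexedSeq(2000L, 7L)  // full input row in (time, id) order

    println(olderThanWatermark.eval(keyRow))  // false: correct, this row must be kept
    println(olderThanWatermark.eval(dataRow)) // true: wrong, ordinal 1 now holds the id (7)
  }
}
```

This is why the patch splits the binding: `watermarkPredicateForKeys` is bound against `keyExpressions` (used when evicting state store keys), while `watermarkPredicate` is bound against `child.output` (used when filtering incoming rows).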

2 files changed (+39, -8 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 20 additions & 8 deletions
@@ -68,16 +68,16 @@ trait StateStoreWriter extends StatefulOperator {
 }
 
 /** An operator that supports watermark. */
-trait WatermarkSupport extends SparkPlan {
+trait WatermarkSupport extends UnaryExecNode {
 
   /** The keys that may have a watermark attribute. */
   def keyExpressions: Seq[Attribute]
 
   /** The watermark value. */
   def eventTimeWatermark: Option[Long]
 
-  /** Generate a predicate that matches data older than the watermark */
-  lazy val watermarkPredicate: Option[Predicate] = {
+  /** Generate an expression that matches data older than the watermark */
+  lazy val watermarkExpression: Option[Expression] = {
     val optionalWatermarkAttribute =
       keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
 
@@ -96,9 +96,19 @@ trait WatermarkSupport extends SparkPlan {
         }
 
       logInfo(s"Filtering state store on: $evictionExpression")
-      newPredicate(evictionExpression, keyExpressions)
+      evictionExpression
     }
   }
+
+  /** Generate a predicate based on keys that matches data older than the watermark */
+  lazy val watermarkPredicateForKeys: Option[Predicate] =
+    watermarkExpression.map(newPredicate(_, keyExpressions))
+
+  /**
+   * Generate a predicate based on the child output that matches data older than the watermark.
+   */
+  lazy val watermarkPredicate: Option[Predicate] =
+    watermarkExpression.map(newPredicate(_, child.output))
 }
 
 /**
@@ -192,7 +202,7 @@ case class StateStoreSaveExec(
             }
 
             // Assumption: Append mode can be done only when watermark has been specified
-            store.remove(watermarkPredicate.get.eval _)
+            store.remove(watermarkPredicateForKeys.get.eval _)
            store.commit()
 
             numTotalStateRows += store.numKeys()
@@ -215,7 +225,9 @@
               override def hasNext: Boolean = {
                 if (!baseIterator.hasNext) {
                   // Remove old aggregates if watermark specified
-                  if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval _)
+                  if (watermarkPredicateForKeys.nonEmpty) {
+                    store.remove(watermarkPredicateForKeys.get.eval _)
+                  }
                   store.commit()
                   numTotalStateRows += store.numKeys()
                   false
@@ -361,7 +373,7 @@ case class StreamingDeduplicateExec(
       val numUpdatedStateRows = longMetric("numUpdatedStateRows")
 
       val baseIterator = watermarkPredicate match {
-        case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+        case Some(predicate) => iter.filter(row => !predicate.eval(row))
         case None => iter
       }
 
@@ -381,7 +393,7 @@
       }
 
       CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
-        watermarkPredicate.foreach(f => store.remove(f.eval _))
+        watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
        store.commit()
        numTotalStateRows += store.numKeys()
      })
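
A rough sketch of the shape of this refactoring (toy `WatermarkExpr`/`bind` helpers, not `SparkPlan.newPredicate`): the watermark condition is now kept as an unbound expression (`watermarkExpression`) and materialized into two predicates, one per row layout, so each call site can use the binding that matches the rows it actually sees.

```scala
// Toy model of watermarkExpression / watermarkPredicateForKeys / watermarkPredicate (not Spark code).
object TwoBindingsSketch {
  type Row = IndexedSeq[Long]

  // The "expression" only records the column name and the threshold; nothing is bound yet.
  final case class WatermarkExpr(column: String, watermarkMs: Long)

  // Binding resolves the column name to an ordinal in one concrete schema.
  def bind(expr: WatermarkExpr, schema: Seq[String]): Row => Boolean = {
    val ordinal = schema.indexOf(expr.column)
    row => row(ordinal) <= expr.watermarkMs
  }

  def main(args: Array[String]): Unit = {
    val expr = WatermarkExpr(column = "time", watermarkMs = 1000L)

    val keySchema   = Seq("id", "time")  // stands in for keyExpressions
    val childSchema = Seq("time", "id")  // stands in for child.output

    val forKeys = bind(expr, keySchema)    // analogue of watermarkPredicateForKeys
    val forRows = bind(expr, childSchema)  // analogue of watermarkPredicate

    println(forKeys(IndexedSeq(7L, 500L)))  // true: state key (id = 7, time = 500 ms) can be evicted
    println(forRows(IndexedSeq(500L, 7L)))  // true: input row (time = 500 ms, id = 7) is dropped as late
  }
}
```

With this split, `store.remove` (which iterates key rows) uses the keys binding, while the late-data filter in `StreamingDeduplicateExec` (which iterates input rows) keeps using the child-output binding, as the hunks above show.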

sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -249,4 +249,23 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-19841: watermarkPredicate should filter based on keys") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("time", "id")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id", "time") // Change the column positions
+      .select($"id")
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
+      CheckLastBatch(1, 2),
+      AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
+      CheckLastBatch(3, 4),
+      AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
+      CheckLastBatch(5, 6),
+      AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
+      CheckLastBatch(7)
+    )
+  }
 }
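
To trace the expected batches: with a delay of "1 second" the watermark used for a batch is roughly the largest event time seen so far minus 1s. After the second batch the largest time is 2s, so the watermark is 1s and the later inputs 1 -> 0 and 1 -> 1 (time <= 1s) are dropped; after the third batch the largest time is 3s, the watermark advances to 2s, and 1 -> 0 in the last batch is dropped while 4 -> 7 is emitted.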
