@@ -68,16 +68,16 @@ trait StateStoreWriter extends StatefulOperator {
6868}
6969
7070/** An operator that supports watermark. */
71- trait WatermarkSupport extends SparkPlan {
71+ trait WatermarkSupport extends UnaryExecNode {
7272
7373 /** The keys that may have a watermark attribute. */
7474 def keyExpressions : Seq [Attribute ]
7575
7676 /** The watermark value. */
7777 def eventTimeWatermark : Option [Long ]
7878
79- /** Generate a predicate that matches data older than the watermark */
80- lazy val watermarkPredicate : Option [Predicate ] = {
79+ /** Generate an expression that matches data older than the watermark */
80+ lazy val watermarkExpression : Option [Expression ] = {
8181 val optionalWatermarkAttribute =
8282 keyExpressions.find(_.metadata.contains(EventTimeWatermark .delayKey))
8383
@@ -96,9 +96,19 @@ trait WatermarkSupport extends SparkPlan {
9696 }
9797
9898 logInfo(s " Filtering state store on: $evictionExpression" )
99- newPredicate( evictionExpression, keyExpressions)
99+ evictionExpression
100100 }
101101 }
102+
103+ /** Generate a predicate based on keys that matches data older than the watermark */
104+ lazy val watermarkPredicateForKeys : Option [Predicate ] =
105+ watermarkExpression.map(newPredicate(_, keyExpressions))
106+
107+ /**
108+ * Generate a predicate based on the child output that matches data older than the watermark.
109+ */
110+ lazy val watermarkPredicate : Option [Predicate ] =
111+ watermarkExpression.map(newPredicate(_, child.output))
102112}
103113
104114/**
@@ -192,7 +202,7 @@ case class StateStoreSaveExec(
192202 }
193203
194204 // Assumption: Append mode can be done only when watermark has been specified
195- store.remove(watermarkPredicate .get.eval _)
205+ store.remove(watermarkPredicateForKeys .get.eval _)
196206 store.commit()
197207
198208 numTotalStateRows += store.numKeys()
@@ -215,7 +225,9 @@ case class StateStoreSaveExec(
215225 override def hasNext : Boolean = {
216226 if (! baseIterator.hasNext) {
217227 // Remove old aggregates if watermark specified
218- if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval _)
228+ if (watermarkPredicateForKeys.nonEmpty) {
229+ store.remove(watermarkPredicateForKeys.get.eval _)
230+ }
219231 store.commit()
220232 numTotalStateRows += store.numKeys()
221233 false
@@ -361,7 +373,7 @@ case class StreamingDeduplicateExec(
361373 val numUpdatedStateRows = longMetric(" numUpdatedStateRows" )
362374
363375 val baseIterator = watermarkPredicate match {
364- case Some (predicate) => iter.filter(( row : InternalRow ) => ! predicate.eval(row))
376+ case Some (predicate) => iter.filter(row => ! predicate.eval(row))
365377 case None => iter
366378 }
367379
@@ -381,7 +393,7 @@ case class StreamingDeduplicateExec(
381393 }
382394
383395 CompletionIterator [InternalRow , Iterator [InternalRow ]](result, {
384- watermarkPredicate .foreach(f => store.remove(f.eval _))
396+ watermarkPredicateForKeys .foreach(f => store.remove(f.eval _))
385397 store.commit()
386398 numTotalStateRows += store.numKeys()
387399 })
0 commit comments