diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 1799f0d1336e5..faa5224235333 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -22,7 +22,7 @@ from pyspark.sql import Row from pyspark.sql.functions import lit -from pyspark.sql.types import StructType, StructField, IntegerType, StringType +from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType from pyspark.testing.sqlutils import ReusedSQLTestCase @@ -392,6 +392,30 @@ def test_streaming_with_temporary_view(self): set([Row(value="view_a"), Row(value="view_b"), Row(value="view_c")]), set(result) ) + def test_streaming_drop_duplicate_within_watermark(self): + """ + This verifies dropDuplicatesWithinWatermark works with a streaming dataframe. + """ + user_schema = StructType().add("time", TimestampType()).add("id", "integer") + df = ( + self.spark.readStream.option("sep", ";") + .schema(user_schema) + .csv("python/test_support/sql/streaming/time") + ) + q1 = ( + df.withWatermark("time", "2 seconds") + .dropDuplicatesWithinWatermark(["id"]) + .writeStream.outputMode("update") + .format("memory") + .queryName("test_streaming_drop_duplicates_within_wm") + .start() + ) + self.assertTrue(q1.isActive) + q1.processAllAvailable() + q1.stop() + result = self.spark.sql("SELECT * FROM test_streaming_drop_duplicates_within_wm").collect() + self.assertTrue(len(result) >= 6 and len(result) <= 9) + class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase): def _assert_exception_tree_contains_msg(self, exception, msg): diff --git a/python/test_support/sql/streaming/time/text-with-time-test.txt b/python/test_support/sql/streaming/time/text-with-time-test.txt new file mode 100644 index 0000000000000..cf9edcafe5a49 --- /dev/null +++ b/python/test_support/sql/streaming/time/text-with-time-test.txt @@ -0,0 +1,10 @@ +2024-05-24 15:03:20;1 +2024-05-24 15:03:21;2 +2024-05-24 15:03:24;3 +2024-05-24 15:03:25;3 +2024-05-24 15:03:31;4 +2024-05-24 15:03:31;1 +2024-05-24 15:03:32;3 +2024-05-24 15:03:45;2 +2024-05-24 15:03:46;5 +2024-05-24 15:03:50;6 \ No newline at end of file