Skip to content

Commit 7c86b0e

Browse files
committed
[SPARK-20567] Lazily bind in GenerateExec
1 parent ef3df91 commit 7c86b0e

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ case class GenerateExec(
7878

7979
override def outputPartitioning: Partitioning = child.outputPartitioning
8080

81-
val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
81+
lazy val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
8282

8383
protected override def doExecute(): RDD[InternalRow] = {
8484
// boundGenerator.terminate() should be triggered after all of the rows in the partition

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
6969
)
7070
}
7171

72+
test("count distinct") {
73+
val inputData = MemoryStream[(Int, Seq[Int])]
74+
75+
val aggregated =
76+
inputData.toDF()
77+
.select($"*", explode($"_2") as 'value)
78+
.groupBy($"_1")
79+
.agg(size(collect_set($"value")))
80+
.as[(Int, Int)]
81+
82+
testStream(aggregated, Update)(
83+
AddData(inputData, (1, Seq(1, 2))),
84+
CheckLastBatch((1, 2))
85+
)
86+
}
87+
7288
test("simple count, complete mode") {
7389
val inputData = MemoryStream[Int]
7490

0 commit comments

Comments
 (0)