Wrong aggregation result in Spark SQL tests after enabling columnar shuffle #260

@viirya

Description

Describe the bug

While trying to enable columnar shuffle by default, I found that some Spark SQL tests fail. Some produce wrong aggregate results, e.g.

SQLQuerySuite: SPARK-8828 sum should return null if all input values are null

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5228]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]                        +- Scan[obj#135]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5117]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]               +- Scan[obj#135]
...
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):bigint,avg(a):double>
[info]   ![null,null]                [null,NaN] (QueryTest.scala:243)
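The `[null,NaN]` answer suggests the null flag of the partial `avg` state (sum, count) is lost across the columnar shuffle: a null sum with count 0 should finalize to NULL, but a zero-initialized sum divided by a zero count yields NaN under IEEE-754 arithmetic. A minimal sketch of the two finalization behaviors, in Python for illustration only (the function names are hypothetical, not Comet's actual code):

```python
import math

def ieee_div(a: float, b: float) -> float:
    """IEEE-754-style division: 0.0 / 0.0 gives NaN (Python's / would raise)."""
    if b == 0.0:
        return math.nan if a == 0.0 else math.copysign(math.inf, a)
    return a / b

def finalize_avg(sum_, count):
    """Correct Spark semantics: avg over an all-null input finalizes to NULL."""
    if count == 0:
        return None
    return sum_ / count

def finalize_avg_lost_null_flag(sum_, count):
    """If the shuffle drops the null flag, the partial state degrades to
    (0.0, 0) and a raw IEEE division produces NaN -- the observed [null,NaN]."""
    return ieee_div(float(sum_ or 0.0), float(count))
```

Under this reading, the fix would be to preserve the validity/null buffers of the aggregation buffer columns through `CometColumnarExchange`, not to special-case the division.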

aggregation with codegen:

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10168]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]                     +- *(1) SerializeFromObject
[info]                        +- Scan[obj#12]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10060]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]            +- SerializeFromObject
[info]               +- Scan[obj#12]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):double,avg(a):double,count(NULL):bigint>
[info]   ![null,null,0]              [null,NaN,0] (QueryTest.scala:243)

SPARK-3176 Added Parser of SQL LAST():

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]                        +- Scan[obj#92]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10279]
[info]         +- HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]               +- Scan[obj#92]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<last(n):int>
[info]   ![4]                        [2] (QueryTest.scala:243)
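The wrong `last()` result (2 instead of 4) is consistent with the columnar shuffle delivering rows or batches to the final aggregate in a different order than the row-based exchange does; `last` is order-sensitive, so any reordering changes its answer. A sketch of that sensitivity in Python (the reordered batch layout below is a hypothetical illustration, not the actual shuffle output):

```python
def last_agg(rows):
    """last(col, ignoreNulls = false): keeps the value of the final row seen."""
    result = None
    for v in rows:
        result = v
    return result

# LowerCaseData is (1,"a"), (2,"b"), (3,"c"), (4,"d"); in input order last(n) = 4.
in_order = [1, 2, 3, 4]
# If the shuffle delivers the batch ending in 2 last, e.g. [3, 4] then [1, 2],
# the final aggregate sees 2 as the last value -- matching the observed answer.
reordered = [3, 4, 1, 2]
```

Note that Spark itself only guarantees a deterministic `last` under an explicit ordering, but the test relies on the single-partition row order being preserved, which the row-based shuffle happens to do.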

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Metadata

Labels

bug (Something isn't working)
