[SPARK-27986][SQL][FOLLOWUP] Respect filter in sql/toString of AggregateExpression #27198
How about this? @beliefer @cloud-fan
prefix + aggregateFunction.toAggString(isDistinct)
val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
mode match {
  case Partial | Complete if filter.isDefined =>
Although we evaluate the filter in the first Aggregate, I think we should just check whether the filter is defined.
Doing so shows a filter in the second aggregate in physical plans, like this:
== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint)) filter ((v#1 > 3))], output=[k#0, sum(v) FILTER ((v > 3))#44L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#154]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint)) filter ((v#1 > 3))], output=[k#0, sum#48L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>
I mean the case where the mode is PartialMerge.
If we only need to show the filter in physical plans after the rewrite, this is OK.
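The mode-gated rendering discussed above can be illustrated with a minimal, self-contained sketch. It does not use Spark's classes: `AggExpr`, its `funcStr` field, and the string-typed `filter` are hypothetical stand-ins for `AggregateExpression` and its members, and `render` is an assumed name. The sketch shows only that the guard prints the filter in Partial and Complete modes and hides it in the merge modes.

```scala
// Hypothetical, simplified stand-ins for the aggregate modes (not Spark's real classes).
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode
case object Complete extends AggMode

// Hypothetical model: funcStr plays the role of aggregateFunction.toAggString(isDistinct).
final case class AggExpr(funcStr: String, mode: AggMode, filter: Option[String]) {
  def render: String = mode match {
    // Print the filter only in the modes where it is actually evaluated.
    case Partial | Complete if filter.isDefined => s"$funcStr filter ${filter.get}"
    // In every other case (merge modes, or no filter) print the function alone.
    case _ => funcStr
  }
}

object ModeGatedDemo {
  def main(args: Array[String]): Unit = {
    val pred = Some("(v > 3)")
    println(AggExpr("partial_sum(v)", Partial, pred).render)
    println(AggExpr("sum(v)", Final, pred).render)
  }
}
```

With this guard, a filter that is still attached to a Final or PartialMerge expression is silently omitted from the output, which is the behavior questioned in the thread.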
Test build #116684 has finished for PR 27198 at commit
Test build #116689 has finished for PR 27198 at commit
retest this please
override def sql: String = {
  val aggFuncStr = aggregateFunction.sql(isDistinct)
  mode match {
    case Partial | Complete if filter.isDefined =>
Another idea: if the filter is not used under some modes, can we drop it when we do things like aggExprs.map(_.copy(mode = ABC))? Then here we can blindly print the filter if it's there.
Yes. That might be cleaner, but the PR as-is also looks safer because it is a read-only update.
Yes, it's safe, but the part that confuses me is why only the Partial and Complete modes respect the filter. I still need to look at AggUtils, where we set the mode.
Yea, ok. I'll check the approach, too.
Test build #116693 has finished for PR 27198 at commit
Test build #116756 has finished for PR 27198 at commit
retest this please
prefix + aggregateFunction.toAggString(isDistinct)
val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
filter match {
  case Some(predicate) => s"$aggFuncStr filter $predicate"
Shall we follow the SQL syntax? FILTER (WHERE $predicate)
ok
override def sql: String = {
  val aggFuncStr = aggregateFunction.sql(isDistinct)
  filter match {
    case Some(predicate) => s"$aggFuncStr FILTER ${predicate.sql}"
ditto
// Aggregate filters are applicable only in partial/complete modes;
// this method filters out them, otherwise.
case Partial | Complete => ae
case _ => ae.copy(filter = None)
We can also simplify AggregateExpression.references now
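As a rough illustration of why dropping the filter simplifies the references logic, here is a self-contained sketch. Everything in it is hypothetical: `Pred`, `AggExpr`, `AggFilters.dropUnusedFilter`, and the `Set[String]` attribute sets are stand-ins rather than Spark's API, and the function's own references are modelled as a fixed set regardless of mode, which is a simplification.

```scala
// Hypothetical, simplified stand-ins (not Spark's real classes).
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode
case object Complete extends AggMode

// A filter predicate together with the attribute names it reads.
final case class Pred(sql: String, refs: Set[String])

final case class AggExpr(
    funcStr: String,
    mode: AggMode,
    funcRefs: Set[String],
    filter: Option[Pred]) {
  // Once unused filters are dropped up front, no mode check is needed here:
  // a filter that is present is always one that gets evaluated.
  def references: Set[String] = funcRefs ++ filter.map(_.refs).getOrElse(Set.empty[String])
}

object AggFilters {
  // Mirrors the quoted snippet: keep the filter only in Partial/Complete modes.
  def dropUnusedFilter(ae: AggExpr): AggExpr = ae.mode match {
    case Partial | Complete => ae
    case _ => ae.copy(filter = None)
  }
}
```

Under these assumptions, an expression copied to `Final` mode and passed through `dropUnusedFilter` no longer reports the filter's attributes, while a `Partial` one still does.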
Test build #116767 has finished for PR 27198 at commit
LGTM
Test build #116783 has finished for PR 27198 at commit
Retest this please.
prefix + aggregateFunction.toAggString(isDistinct)
val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
filter match {
  case Some(predicate) => s"$aggFuncStr filter (where $predicate)"
If you don't mind, filter (where -> FILTER (WHERE ?
Currently, in the generated result, filter (where NOT and FILTER (WHERE (NOT are used together. Maybe the same output would be better?
sum(salary#x) filter (where NOT exists#x [dept_id#x]) AS sum(salary) FILTER (WHERE (NOT exists(dept_id)))#x]
yea, ok.
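A minimal sketch of the consistent output suggested above, with both string forms using the upper-case `FILTER (WHERE ...)` syntax. The types are hypothetical stand-ins, not Spark's: `Pred` carries a plan-style string and a SQL string, and `funcStr` / `funcSql` play the roles of `aggregateFunction.toAggString(isDistinct)` and `aggregateFunction.sql(isDistinct)`.

```scala
// Hypothetical predicate with its plan-style and SQL-style renderings.
final case class Pred(str: String, sql: String)

// Hypothetical stand-in for an aggregate expression with an optional filter.
final case class AggExpr(funcStr: String, funcSql: String, filter: Option[Pred]) {
  // Plan-style string: same keyword casing as the SQL form.
  override def toString: String = filter match {
    case Some(p) => s"$funcStr FILTER (WHERE ${p.str})"
    case None => funcStr
  }

  // SQL-style string.
  def sql: String = filter match {
    case Some(p) => s"$funcSql FILTER (WHERE ${p.sql})"
    case None => funcSql
  }
}

object FilterSyntaxDemo {
  def main(args: Array[String]): Unit = {
    val ae = AggExpr(
      "sum(salary#x)",
      "sum(salary)",
      Some(Pred("NOT exists#x [dept_id#x]", "(NOT exists(dept_id))")))
    println(ae.toString)
    println(ae.sql)
  }
}
```

Both renderings then differ only in how the predicate itself is printed, not in the keyword casing around it.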
Test build #116796 has finished for PR 27198 at commit
+1, LGTM.
Test build #116799 has finished for PR 27198 at commit
retest this please
Test build #116800 has finished for PR 27198 at commit
Merged to master. Thanks, @cloud-fan & @dongjoon-hyun!
Test build #116801 has finished for PR 27198 at commit
What changes were proposed in this pull request?
This PR adds filter information to the explain output of an aggregate (this is a follow-up of #26656).
Without this PR:
With this PR:
Why are the changes needed?
For better usability.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually.