
Conversation

@maropu (Member) commented Jan 14, 2020

What changes were proposed in this pull request?

This PR intends to add filter information to the explain output of aggregates (a follow-up of #26656).

Without this PR:

scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true)
== Parsed Logical Plan ==
'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)]
+- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
k: int, sum(v): bigint
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L]
+- SubqueryAlias `default`.`t`
   +- Relation[k#0,v#1] parquet

== Optimized Logical Plan ==
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L]
+- Relation[k#0,v#1] parquet

== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v)#3L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#20]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint))], output=[k#0, sum#7L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>


scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show()
+---+------+                                                                    
|  k|sum(v)|
+---+------+
+---+------+

With this PR:

scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true)
== Parsed Logical Plan ==
'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)]
+- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
k: int, sum(v) FILTER (v > 3): bigint
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L]
+- SubqueryAlias `default`.`t`
   +- Relation[k#0,v#1] parquet

== Optimized Logical Plan ==
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L]
+- Relation[k#0,v#1] parquet

== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v) FILTER (v > 3)#5L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#20]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint)) filter (v#1 > 3)], output=[k#0, sum#9L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>


scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show()
+---+---------------------+                                                     
|  k|sum(v) FILTER (v > 3)|
+---+---------------------+
+---+---------------------+

Why are the changes needed?

For better usability.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually.
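
For reference, the core of the change is a small tweak to how AggregateExpression renders itself. Below is a minimal sketch of the final shape, pieced together from the review hunks later in this thread; the prefix handling is assumed surrounding code rather than part of this diff, and the rendered syntax follows the filter (where ...) form settled on during review:

override def toString: String = {
  val prefix = mode match {
    case Partial => "partial_"
    case PartialMerge => "merge_"
    case Final | Complete => ""
  }
  val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
  filter match {
    // e.g. "sum(cast(v#1 as bigint)) filter (where (v#1 > 3))"
    case Some(predicate) => s"$aggFuncStr filter (where $predicate)"
    case _ => aggFuncStr
  }
}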

@maropu (Member Author) commented Jan 14, 2020

How about this? @beliefer @cloud-fan

- prefix + aggregateFunction.toAggString(isDistinct)
+ val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
+ mode match {
+   case Partial | Complete if filter.isDefined =>

Contributor:

Although we made the filter evaluated in the first Aggregate, I think this should just check whether the filter is defined.

Member Author:

Doing so shows the filter in the second aggregate of physical plans, like this:

== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint)) filter ((v#1 > 3))], output=[k#0, sum(v) FILTER ((v > 3))#44L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#154]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint)) filter ((v#1 > 3))], output=[k#0, sum#48L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>

Contributor:

I mean the case where the mode is PartialMerge.
If we only need to show the filter in physical plans after the rewrite, this is OK.

@SparkQA commented Jan 14, 2020
Test build #116684 has finished for PR 27198 at commit e54839b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 14, 2020

Test build #116689 has finished for PR 27198 at commit cc658c9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member Author) commented Jan 14, 2020

retest this please

override def sql: String = {
  val aggFuncStr = aggregateFunction.sql(isDistinct)
  mode match {
    case Partial | Complete if filter.isDefined =>
Contributor:

another idea: if the filter is not used under some modes, can we drop it when we do things like aggExprs.map(_.copy(mode = ABC))? Then here we can blindly print the filter if it's there.
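
A minimal sketch of that idea (the copy site and the PartialMerge value below are illustrative, not lines from this PR): drop the filter wherever aggregate expressions are copied into a mode that never evaluates it, so printing can be unconditional.

// Hypothetical copy site in the planner: strip the filter when switching
// to a mode (e.g. PartialMerge) that never evaluates it.
aggExprs.map(_.copy(mode = PartialMerge, filter = None))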

@dongjoon-hyun (Member) commented Jan 15, 2020

Yes, it might be cleaner, but the AS-IS PR also looks safer because it is a read-only update.

Contributor:

Yes, it's safe, but the part that confuses me is why only the partial and complete modes respect the filter. I still need to look at AggUtils, where we set the mode.

Member Author:

Yea, ok. I'll check the approach, too.

@SparkQA commented Jan 14, 2020

Test build #116693 has finished for PR 27198 at commit cc658c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 15, 2020

Test build #116756 has finished for PR 27198 at commit 753e9e1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member Author) commented Jan 15, 2020

retest this please

- prefix + aggregateFunction.toAggString(isDistinct)
+ val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
+ filter match {
+   case Some(predicate) => s"$aggFuncStr filter $predicate"

Contributor:

shall we follow the SQL syntax? FILTER (WHERE $predicate)

Member Author:

ok

override def sql: String = {
  val aggFuncStr = aggregateFunction.sql(isDistinct)
  filter match {
    case Some(predicate) => s"$aggFuncStr FILTER ${predicate.sql}"

Contributor:

ditto

// Aggregate filters are applicable only in the partial/complete modes;
// this method removes them otherwise.
case Partial | Complete => ae
case _ => ae.copy(filter = None)

Contributor:

We can also simplify AggregateExpression.references now
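
Presumably something like the sketch below, assuming the pre-PR references special-cased the filter by mode (the bodies here are assumptions, not code from this thread):

override def references: AttributeSet = {
  val aggAttributes = mode match {
    case Partial | Complete => aggregateFunction.references
    case PartialMerge | Final => AttributeSet(aggregateFunction.aggBufferAttributes)
  }
  // Filters are now stripped in the modes where they do not apply, so their
  // references can be added unconditionally (empty when filter is None).
  aggAttributes ++ filter.map(_.references).getOrElse(AttributeSet.empty)
}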

@SparkQA commented Jan 15, 2020

Test build #116767 has finished for PR 27198 at commit 753e9e1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

LGTM

@SparkQA commented Jan 15, 2020

Test build #116783 has finished for PR 27198 at commit 9d4ba8b.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Retest this please.

- prefix + aggregateFunction.toAggString(isDistinct)
+ val aggFuncStr = prefix + aggregateFunction.toAggString(isDistinct)
+ filter match {
+   case Some(predicate) => s"$aggFuncStr filter (where $predicate)"

Member:

If you don't mind, filter (where -> FILTER (WHERE?

Currently, the generated result mixes filter (where NOT and FILTER (WHERE (NOT, as below. Maybe consistent output would be better?

sum(salary#x) filter (where NOT exists#x [dept_id#x]) AS sum(salary) FILTER (WHERE (NOT exists(dept_id)))#x]

Member Author:

yea, ok.

@SparkQA commented Jan 15, 2020

Test build #116796 has finished for PR 27198 at commit 9d4ba8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member Author) commented Jan 16, 2020

@dongjoon-hyun

@dongjoon-hyun (Member) left a comment

+1, LGTM.

@SparkQA commented Jan 16, 2020

Test build #116799 has finished for PR 27198 at commit 275ca45.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member Author) commented Jan 16, 2020

retest this please

@SparkQA commented Jan 16, 2020

Test build #116800 has finished for PR 27198 at commit f43c981.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu closed this in a3a42b3 on Jan 16, 2020.

@maropu (Member Author) commented Jan 16, 2020

Merged to master. Thanks, @cloud-fan & @dongjoon-hyun !

@SparkQA commented Jan 16, 2020

Test build #116801 has finished for PR 27198 at commit f43c981.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
