Skip to content

Conversation

@prakharjain09
Copy link
Contributor

What changes were proposed in this pull request?

This PR tries to reduce the number of physical aggregation nodes by collapsing the PARTIAL and the FINAL aggregation nodes together when there is no Exchange between them.

Example - consider the following query:

SELECT sum(t2.col1), max(t2.col2), t1.col1, t1.col2
FROM t1, t2
WHERE t1.col1 = t2.col1
GROUP BY t1.col1, t1.col2

Current plan:

  == Physical Plan ==
  *(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#140L, max(col2)#141, col1#7, col2#8])
  +- *(5) HashAggregate(keys=[col1#7, col2#8], functions=[partial_sum(cast(col1#18 as bigint)), partial_max(col2#19)], output=[col1#7, col2#8, sum#148L, max#149])
     +- *(5) SortMergeJoin [col1#7], [col1#18], Inner
        :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
        :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#644]
        :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
        :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
        :           +- Scan[obj#1]
        +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
           +- Exchange hashpartitioning(col1#18, 5), true, [id=#653]
              +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
                 +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
                    +- Scan[obj#12]

The above plan can be optimized to following:

  == Physical Plan ==
  *(5) HashAggregate(keys=[col1#7, col2#8], functions=[sum(cast(col1#18 as bigint)), max(col2#19)], output=[sum(col1)#157L, max(col2)#158, col1#7, col2#8])
  +- *(5) SortMergeJoin [col1#7], [col1#18], Inner
     :- *(2) Sort [col1#7 ASC NULLS FIRST], false, 0
     :  +- Exchange hashpartitioning(col1#7, 5), true, [id=#727]
     :     +- *(1) Project [value#2 AS col1#7, (value#2 % 10) AS col2#8]
     :        +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
     :           +- Scan[obj#1]
     +- *(4) Sort [col1#18 ASC NULLS FIRST], false, 0
        +- Exchange hashpartitioning(col1#18, 5), true, [id=#736]
           +- *(3) Project [value#13 AS col1#18, (value#13 % 10) AS col2#19]
              +- *(3) SerializeFromObject [input[0, int, false] AS value#13]
                 +- Scan[obj#12]

Why are the changes needed?

This change removed the unrequired Aggregation node and so will help in improving performance.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added UTs.

@github-actions github-actions bot added the SQL label Nov 19, 2020
@maropu
Copy link
Member

maropu commented Nov 19, 2020

ok to test

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Test build #131346 has finished for PR 30426 at commit 5965fb9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09
Copy link
Contributor Author

cc - @maropu @cloud-fan @dongjoon-hyun

@maropu
Copy link
Member

maropu commented Nov 19, 2020

I remember SPARK-12978 (#15945 and #10896) and is this related to it? cc: @cloud-fan Btw, have you checked if this optimization could make some queries (e.g., TPCDS) faster? (I just want to know actual performance numbers)

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35950/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35950/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35951/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35951/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Test build #131347 has finished for PR 30426 at commit 2c68fe3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09
Copy link
Contributor Author

@maropu Thanks for pointing out to old PR and jirs - Yes SPARK-12978 seems related to SPARK-33486.

Btw, have you checked if this optimization could make some queries (e.g., TPCDS) faster?

I did impact analysis on TPCDS 100 scale and didn't find noticeable improvement - In TPCDS at most of the places, the 1st HashAggregate (HA) reduces rows significantly and the 2nd HA doesn't take a lot of time after that.

But we have seen some good improvements in some customer queries - Specifically when HA-1 doesn't reduce rows significantly.

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35964/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35964/

@SparkQA
Copy link

SparkQA commented Nov 19, 2020

Test build #131360 has finished for PR 30426 at commit e9a25d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Test build #131425 has finished for PR 30426 at commit dfad4fc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09 prakharjain09 force-pushed the SPARK-33486-collapse-aggregates branch from dfad4fc to a56846d Compare November 20, 2020 14:07
@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36031/

@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36033/

@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36031/

@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36033/

@SparkQA
Copy link

SparkQA commented Nov 20, 2020

Test build #131427 has finished for PR 30426 at commit e7f326a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
  • case class RemoveShufflePushMergerLocation(host: String) extends ToBlockManagerMaster
  • abstract class LikeAllBase extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant
  • case class LikeAll(child: Expression, patterns: Seq[UTF8String]) extends LikeAllBase
  • case class NotLikeAll(child: Expression, patterns: Seq[UTF8String]) extends LikeAllBase
  • case class ParseUrl(children: Seq[Expression], failOnError: Boolean = SQLConf.get.ansiEnabled)

@prakharjain09
Copy link
Contributor Author

@maropu @cloud-fan Gentle reminder - Please review the changes and provide your feedback.

@maropu
Copy link
Member

maropu commented Nov 24, 2020

But we have seen some good improvements in some customer queries - Specifically when HA-1 doesn't reduce rows significantly.

Yea, I've checked TPCDS performances w/this change again by myself, but I couldn't find any improvement. So, could you give us a concrete example of how much it will improve performance? This change can make rules complicated, so I think we need to consider the tradeoff between complexity and performance improvements.

@prakharjain09
Copy link
Contributor Author

So, could you give us a concrete example of how much it will improve performance?

@maropu We have seen customer queries where Aggregation happens on close to primary keys. In those scenarios, it makes complete sense to remove redundant Aggregation operator as it will unnecessarily increase the execution time.

@abmodi
Copy link

abmodi commented Dec 4, 2020

We have also seen the use case with customers when they do aggregation on close to primary keys.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 15, 2021
@github-actions github-actions bot closed this Mar 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants