[SPARK-44571][SQL] Eliminate the Join by combine multiple Aggregates #42223
Conversation
ping @cloud-fan @viirya cc @MaxGekk @gengliangwang
nit: CombineJoinedAggregates
"must not exists filter": what does it mean? Do you mean "contains"?
Yeah, the comment is confusing. I mean it does not support aggregate expressions with a filter.
I will update it.
I think transform up is better. For a multi-join query, we can eliminate the leaf joins first, and transform up to eliminate all joins if possible.
We don't need to handle Join here. If the rule runs bottom-up, leaf joins become an Aggregate and then we can match a simple join again.
Hi @beliefer,
So I would suggest:
Seems like you are trying to merge aggregates in sequential order only. What if the first can't be merged with the rest, but the rest can be merged together?
This rule can merge any contiguous run of mergeable aggregates in sequential order.
If the first one can't be merged with the rest, the rest can still be merged together.
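For example (an illustrative sketch; t1, t2 and the columns are made-up names), in the query below x cannot be merged with y and z because it scans a different table, but y and z can still be merged into a single aggregate over t2:
SELECT m, s, v
FROM (SELECT max(a) AS m FROM t1) x
CROSS JOIN (SELECT sum(b) AS s FROM t2 WHERE c = 1) y
CROSS JOIN (SELECT avg(b) AS v FROM t2 WHERE c = 2) z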
Got it now thanks! Maybe we could try merging an aggregate with all the previous merged and non-merged ones.
But I'm not sure how you can replace the Join with the last merged aggregate only? Nevermind.
I'm not sure this is correct, as you simply ignore the propagated filters here, but I can imagine adjacent Filter nodes in a plan...
Please check how #37630 handles this case, but as I mentioned, IMO the plan merge code shouldn't be duplicated here.
Currently, this PR supports only some of the most basic forms, so it does not need to propagate filters here for now.
I mean this case branch seems to behave incorrectly when you have adjacent filters in both plans.
E.g. if you have:
SELECT avg(a)
FROM (
SELECT * FROM t WHERE c1 = 1
) t
WHERE c2 = 1
and
SELECT sum(b)
FROM (
SELECT * FROM t WHERE c1 = 2
) t
WHERE c2 = 2
and PushDownPredicates is disabled then you are in that situation. When you are merging the parent Filter nodes you shouldn't ignore the propagated filters from child Filter nodes.
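To stay correct, a merge of those two queries would have to keep the propagated child filters in the per-side conditions, roughly like this sketch (same t, c1, c2 names as above):
SELECT
  avg(a) FILTER (WHERE c1 = 1 AND c2 = 1),
  sum(b) FILTER (WHERE c1 = 2 AND c2 = 2)
FROM t
WHERE (c1 = 1 AND c2 = 1) OR (c1 = 2 AND c2 = 2)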
Actually #37630 handles that case and many others and that's why I think splitting this PR into 2 parts would be better:
- This rule would still be useful if you used the existing MergeScalarSubqueries.tryMergePlans() to merge plans. It couldn't deal with Q28 immediately, but could deal with simpler aggregates without differing filters.
- [SPARK-40193][SQL] Merge subquery plans with different filters #37630 could be a good base for implementing the filter merging feature. We could simplify that PR, remove the tricky physical plan checking part, and introduce a simple boolean flag to enable/disable the filter merging feature like this PR does. In this case both Q9 and Q28 could benefit from the new feature...
@peter-toth Thank you for the point. As you said, this situation is only worth considering when PushDownPredicates is disabled. AFAIK, PushDownPredicates is enabled and we can't disable it. This PR places CombineJoinedAggregates after PushDownPredicates in the Optimizer rule list, so we can avoid handling that scenario.
I added the test case test("join two side are Aggregates with subquery").
I think the user can disable PushDownPredicates with the spark.sql.optimizer.excludedRules and spark.sql.adaptive.optimizer.excludedRules configs. And there might be other cases where we end up in that situation; that was just an example...
Yeah, if users arbitrarily exclude optimizer rules, then I believe most rules will behave improperly.
No, optimizer rules might not make plans any better if prerequisites aren't met, but a rule shouldn't create an incorrect plan under any circumstances.
I get what you mean now. Let's fix it.
@peter-toth Thank you for the review. For your second point, I'm OK with passing it; your suggestion is welcome. But what's the chance to reuse them? If #37630 is merged first, I will reuse them directly. Otherwise, I will create a follow-up PR to fix it.
/**
 * The helper class used to merge scalar subqueries.
 */
trait MergeScalarSubqueriesHelper {
@peter-toth I created this trait to share the common functions.
@cloud-fan The CI failure is unrelated.
Instead of showing an example query, can you define the general form of joined aggregates that can be merged?
ne.transform {
  case ae @ AggregateExpression(_, _, _, filterOpt, _) =>
    val newFilter = filterOpt.map { filter =>
      And(filter, propagatedFilter)
The old query evaluates the Filter first; we should keep that order, so here it should be And(propagatedFilter, filter).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM.
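A small SQL sketch of the point (t, a, b and c are made-up names): the plan-level Filter below the original aggregate runs before the aggregate's own FILTER clause, so the merged aggregate expression should keep the propagated predicate first.
SELECT sum(a) FILTER (WHERE b > 0)
FROM t
WHERE c = 1
-- after merging, the propagated predicate stays in front:
-- sum(a) FILTER (WHERE c = 1 AND b > 0)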
Seq(Inner, Cross, LeftOuter, RightOuter, FullOuter).contains(joinType)
// Merge the multiple Aggregates.
private def mergePlan(
The return type is a bit complicated; let's add a comment to explain it.
(Aggregate(Seq.empty, mergedAggregateExprs, newChild), AttributeMap.empty, Seq.empty)
}
case (lp: Project, rp: Project) =>
do you have tests that hit this branch? Ideally Aggregate and Project will be merged.
Yes, this branch is already covered. Please see test("join side is not Aggregate").
| * "benchmarks/CombineJoinedAggregatesBenchmark-results.txt". | ||
| * }}} | ||
| */ | ||
| object CombineJoinedAggregatesBenchmark extends SqlBasedBenchmark { |
We don't really need to add a microbenchmark for optimizer features. The plan change is sufficient to show the benefits. You can keep the benchmark locally to show the perf numbers.
OK
 */
trait MergeScalarSubqueriesHelper {

// If 2 plans are identical return the attribute mapping from the new to the cached version.
what does cached version mean? I think this method will use the left query as the main query and return the attribute map to rewrite the right query.
Oh, the comment is not correct. Let me update it.
 * every [[Join]] are [[Aggregate]]s.
 *
 * Note: this rule doesn't support the following cases:
 * 1. One of the two [[Aggregate]]s to be merged has a child [[Filter]] and the other one does not.
Thinking more about it, I think what really matters is the predicate selectivity. No filter means an always true predicate.
To be safe, I think we should reuse some code of DPP that determines selective predicates, and only merge aggregates if they both have selective predicates.
Good idea.
Thinking more about it, I think what really matters is the predicate selectivity. No filter means an always true predicate.
To be safe, I think we should reuse some code of DPP that determines selective predicates, and only merge aggregates if they both have selective predicates.
Is that so? I think the performance gain of merging queries comes from the fact that we don't need to scan the same data 2 times but we can calculate the 2 aggregates in 1 run.
E.g. If we have 2 queries:
SELECT sum(a) FROM t WHERE <condition 1>
and
SELECT sum(b) FROM t WHERE <condition 2>
and we want to decide when merging the queries into
SELECT
sum(a) FILTER (WHERE <condition 1>),
sum(b) FILTER (WHERE <condition 2>)
FROM t
WHERE <condition 1> OR <condition 2>
makes sense, then I would say:
- if the conditions are true literals (or they don't exist at all)
- or the conditions match
then it definitely does make sense to merge them.
But in the first case the conditions are not selective at all and in the second case condition selectivity doesn't matter. (Actually these are the 2 cases that are already covered by MergeScalarSubqueries.tryMergePlans(): https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L275-L286.)
Now, if the conditions differ then it isn't trivial to calculate if scans overlap, but I think if both conditions are highly selective then we have less chance for scans to overlap, don't we?
BTW, in my #37630 I used a different heuristic to disable merging of aggregates with different filter conditions: if the conditions contain any partitioning or bucketing columns then the aggregates are not merged.
Actually, if we disable this feature by default then do we need to define any heuristics? Can't we just let the user enable the feature if needed?
makes sense, then I would say:
- if the conditions are true literals (or they don't exist at all)
- or the conditions match
I agree with that opinion. This PR supports these two cases:
- The predicates on both sides are true or don't exist at all.
- The predicates on both sides are selective.
Merging can always "inflate" data when the aggregate function columns differ from the filter condition columns, as those condition columns need to travel up to the aggregate in the merged query, while in the original queries they were pruned after the filter. In this case, if we alias the conditions to small boolean columns, then the shuffle data size decreases and the merge is less "stressful".
The worst case you are referring to is when the aggregate function columns are a superset of the filter condition columns, but even in this case only small boolean columns are added, and the gain is that we don't need to evaluate the filters twice.
Take an example: condition 1 is "flag is true" and condition 2 is "flag is false"; putting these conditions into the project list will add two boolean columns.
If a condition is f(c) and c is not used in the aggregate function (e.g. it is sum(a)), then the project also removes c besides adding a new boolean column (we can write sum(a) FILTER (WHERE <boolean column>) instead of sum(a) FILTER (WHERE f(c))).
If f(c1, c2, ..., c10) is the condition and none of these columns are in the aggregate function, then the project removes 10 columns and adds only 1 boolean.
What you are referring to is the worst case, but IMO, on average, the extra project helps.
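A minimal SQL sketch of this idea (t, a, b, c1 and c2 are placeholder names): the projection below the aggregate evaluates each condition once as a small boolean column and drops the original filter columns, so only the booleans travel up to the aggregate.
SELECT
  sum(a) FILTER (WHERE cond1),
  sum(b) FILTER (WHERE cond2)
FROM (
  SELECT a, b, (c1 = 1) AS cond1, (c2 = 2) AS cond2
  FROM t
  WHERE c1 = 1 OR c2 = 2
) projected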
In most cases, the f(c)s are different, e.g. f1(c), f2(c), and so on.
That's OK. It is not the conditions' (the f(c)s') relation to each other that matters, but the filter column set's (c1 ... c10) relation to the aggregate function column set (a).
@peter-toth Could you tell me more? I can't find the handling of partitioning or bucketing columns.
That heuristic made the whole PR complex. You can follow the logic there. BTW, I'm not saying that it is the right heuristic to decide if we should merge aggregates with different filters, it is just the one I was able to come up with...
@beliefer, I've updated my PR with comments to elaborate on how it handles this.
I think the above code is not needed. Generally, we concat the predicates with ...
Please comment on the other PR regarding the code of that PR.
In theory, whether it is data filters or partition filters, there is a possibility of data overlap when the filters are connected. The main reason for filter merging is the amount of overlapping data.
Sorry @beliefer, I didn't explain all the reasoning behind my heuristics in #37630. I've updated #42223 (comment), please see the details there.
For merging: assuming there is no overlapped scan (so almost no benefit) and the table has N rows, the merged query has to evaluate both predicates over all N rows, while previously each query only evaluated its own predicate over its own scan. Given that it's hard to estimate the overlapped scan size, the only idea I can think of is to estimate the cost of the predicate: we only do the merge if the predicate is cheap to evaluate, i.e. it only contains simple comparisons and the expression tree size is small. We can also define some patterns for which the overlapped scan size must be large, e.g. one side has no filter, or the filters in the two aggregates are the same. For these cases, we can always apply the merge.
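Two SQL sketches of the "always safe" patterns mentioned above (illustrative table and column names):
-- the filters on the two sides are the same:
--   SELECT sum(a) FROM t WHERE c = 1   and   SELECT avg(b) FROM t WHERE c = 1
-- can always be merged into:
SELECT sum(a), avg(b) FROM t WHERE c = 1;
-- one side has no filter:
--   SELECT sum(a) FROM t   and   SELECT avg(b) FROM t WHERE c = 1
-- can always be merged into:
SELECT sum(a), avg(b) FILTER (WHERE c = 1) FROM t;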
Sorry, my first comment about the extra project (#42223 (comment)) was confusing. I got that argument, and I agreed with you (#42223 (comment)). But if we decide to merge the queries (based on any heuristics), then the extra project can help to avoid evaluating the filters twice in the merged query and can actually decrease the data to be shuffled for the aggregation: #42223 (comment). IMO the extra project between the filter and the scan won't prevent the merged condition from being pushed down to the scan, so I don't see any drawbacks of it.
That makes sense to me and I don't have any better idea. Although I still think we can and should check whether any filters are pushed down to the scans in the original queries. If there are no pushed-down filters in any of the queries, or the pushed-down filters fully match, then we are safe to merge as the scans fully overlap. If there is a non-matching pushed-down filter, then we can use the suggested "expression is cheap" heuristic. (By "pushed-down filter" I mean all partition, bucket and data filters in the scan.)
@peter-toth I agree that the extra project can help if we decide to merge. However, the plan pattern becomes complicated; without the extra project, the merged aggregate still keeps a simple pattern.
Those patterns are already covered in the current code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 and not touched in my aggregate merge PR: https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR292-R318
Actually, due to the shortcut ...
@peter-toth Setting aside the previous discussion, the current implementation is, after all, the simplest pattern, which is also an advantage that cannot be ignored. If adding a project can bring a performance improvement, we can follow up and improve it.
If we want to push this PR forward for some reason, I'm not against it, as long as the above-mentioned correctness issue gets fixed (#42223 (comment)).
Fixed.
I also don't like the idea, as you said. But it seems the code is different between this PR and #37630. Maybe we should advance the two PRs separately until a shared opportunity is discovered, and then refactor both.
IMO the code is very similar and the basic algorithm is identical: both methods just traverse the plan and compare nodes to see if/how they can be merged. BTW, this PR still doesn't use the "condition expression cost is low" heuristic that we discussed. The current "isLikelySelective" doesn't make sense (#42223 (comment)) and has nothing to do with expression cost.
I'm not sure if we can reuse https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 directly. Maybe a better idea is to do a post-hoc update to extract the filters into a Project.
I agree.
Yes. I have finished it.
ping @cloud-fan @peter-toth Could we continue to carry this work a step forward?
Sorry, last week was a bit hectic for me. I've already put together a change to #37630 to make it simpler and contain what we discussed, but I haven't had time to update the PR. I will do it soon.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Some queries contain multiple scalar subqueries (aggregations without a group-by clause) connected with joins. The general form of joined aggregates that can be merged is a join whose two sides are both Aggregates without grouping expressions over the same data source.
For example,
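(An illustrative query of this shape; t, a, b, c1 and c2 are placeholder names, following the examples discussed in the review thread above.)
SELECT *
FROM (SELECT avg(a) AS avg_a FROM t WHERE c1 = 1) s1
CROSS JOIN (SELECT sum(b) AS sum_b FROM t WHERE c2 = 2) s2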
We can optimize this SQL to the form shown below:
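(Again an illustrative sketch of the rewritten form for the query above, following the merged shape shown in the review discussion.)
SELECT
  avg(a) FILTER (WHERE c1 = 1) AS avg_a,
  sum(b) FILTER (WHERE c2 = 2) AS sum_b
FROM t
WHERE c1 = 1 OR c2 = 2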
If we can merge the filters and aggregates, we can scan the data source only once and eliminate the join, thereby avoiding a shuffle.
This PR also supports eliminating nested Joins; please refer to: https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q28.sql
Obviously, this change will improve the performance.
This PR also reuses some functions from MergeScalarSubqueries, and adds some TreePatterns to make it easy to check the cost of predicates.
Why are the changes needed?
Improve the performance for the cases shown above.
Does this PR introduce any user-facing change?
'No'.
New feature.
How was this patch tested?
TPC-DS data size: 2TB.
This improvement is effective for TPC-DS q28, and there is no regression for the other test cases.
According to the microbenchmark, this change performs worse than before if the filter has almost no selectivity.