@beliefer
Contributor

@beliefer beliefer commented Jul 29, 2023

What changes were proposed in this pull request?

Some queries contain multiple scalar subqueries (aggregations without a GROUP BY clause) connected by joins. The general form of joined aggregates that can be merged is as follows:

<aggregation function> ::=
  SUM | AVG | MAX | ...

<aggregation subquery> ::=
  SELECT
    <aggregation function>(...)[ , <aggregation function>(...)[ , ...]]
  FROM [tab | query]

<joined aggregation > ::=
  SELECT *
  FROM (
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
      ...
    <aggregation subquery> [INNER | CROSS | LEFT | RIGHT | FULL OUTER] JOIN
    <aggregation subquery>
  )

For example,

SELECT *
FROM (SELECT
  avg(power) avg_power,
  count(power) count_power,
  count(DISTINCT power) count_distinct_power
FROM data
WHERE country = "USA"
  AND (id BETWEEN 1 AND 3
  OR city = "Berkeley"
  OR name = "Xiao")) B1,
  (SELECT
    avg(power) avg_power,
    count(power) count_power,
    count(DISTINCT power) count_distinct_power
  FROM data
  WHERE country = "China"
    AND (id BETWEEN 4 AND 5
    OR city = "Hangzhou"
    OR name = "Wenchen")) B2

We can optimize this SQL to the form shown below:

SELECT
  avg(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) avg_power,
  count(power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")) count_power,
  count(DISTINCT power) FILTER (WHERE country = "USA" AND (id BETWEEN 1 AND 3 OR city = "Berkeley" OR name = "Xiao")),
  avg(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
  count(power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen")),
  count(DISTINCT power) FILTER (WHERE country = "China" AND (id BETWEEN 4 AND 5 OR city = "Hangzhou" OR name = "Wenchen"))
FROM data
WHERE
(country = "USA"
  AND (id BETWEEN 1 AND 3
  OR city = "Berkeley"
  OR name = "Xiao")) OR
(country = "China"
    AND (id BETWEEN 4 AND 5
    OR city = "Hangzhou"
    OR name = "Wenchen"))

If we can merge the filters and aggregates, we can scan the data source only once and eliminate the join, thereby avoiding a shuffle.
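The equivalence between the joined form and the merged form can be checked on a toy table. The following is only a sketch: it uses Python's built-in sqlite3 module (SQLite 3.30 or newer is assumed for the FILTER clause) instead of Spark, the rows are made up for illustration, and count(DISTINCT ...) is left out to keep it short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data (id INTEGER, country TEXT, city TEXT, name TEXT, power REAL)"
)
# Made-up rows, for illustration only.
conn.executemany(
    "INSERT INTO data VALUES (?, ?, ?, ?, ?)",
    [
        (1, "USA", "Berkeley", "Xiao", 10.0),
        (2, "USA", "Seattle", "Ann", 20.0),
        (4, "China", "Hangzhou", "Wenchen", 30.0),
        (5, "China", "Beijing", "Li", 50.0),
        (7, "China", "Shanghai", "Bo", 70.0),  # matched by neither filter
        (8, "France", "Paris", "Luc", 90.0),   # matched by neither filter
    ],
)

USA = "country = 'USA' AND (id BETWEEN 1 AND 3 OR city = 'Berkeley' OR name = 'Xiao')"
CHINA = "country = 'China' AND (id BETWEEN 4 AND 5 OR city = 'Hangzhou' OR name = 'Wenchen')"

# Original shape: two aggregate subqueries connected by a join.
joined = conn.execute(f"""
    SELECT * FROM
      (SELECT avg(power), count(power) FROM data WHERE {USA}) b1,
      (SELECT avg(power), count(power) FROM data WHERE {CHINA}) b2
""").fetchone()

# Merged shape: one scan, per-aggregate FILTER clauses, OR-ed WHERE clause.
merged = conn.execute(f"""
    SELECT
      avg(power) FILTER (WHERE {USA}),
      count(power) FILTER (WHERE {USA}),
      avg(power) FILTER (WHERE {CHINA}),
      count(power) FILTER (WHERE {CHINA})
    FROM data
    WHERE ({USA}) OR ({CHINA})
""").fetchone()

print(joined, merged)
```

Both queries return the same single row here; the merged one reads the table once.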

This PR also supports eliminating nested joins; please refer to: https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q28.sql

This change improves performance when the filters are selective; see the benchmark results below.

This PR also reuses some functions from MergeScalarSubqueries.

This PR also adds some TreePatterns to make it easy to check the cost of a predicate.

Why are the changes needed?

To improve performance for the case shown above.

Does this PR introduce any user-facing change?

'No'.
New feature.

How was this patch tested?

  1. new test cases
  2. new micro benchmark.
Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
filter is not defined, CombineJoinedAggregates: false            730            819          69         28.7          34.8       1.0X
filter is not defined, CombineJoinedAggregates: true             618            632          14         33.9          29.5       1.2X
step is 1000000, CombineJoinedAggregates: false                  572            590          20         36.7          27.3       1.3X
step is 1000000, CombineJoinedAggregates: true                   769            794          21         27.3          36.6       1.0X
step is 100000, CombineJoinedAggregates: false                   350            370          26         59.9          16.7       2.1X
step is 100000, CombineJoinedAggregates: true                    231            241          10         90.7          11.0       3.2X
step is 10000, CombineJoinedAggregates: false                    314            340          26         66.8          15.0       2.3X
step is 10000, CombineJoinedAggregates: true                     171            182           9        122.5           8.2       4.3X
step is 1000, CombineJoinedAggregates: false                     303            337          32         69.3          14.4       2.4X
step is 1000, CombineJoinedAggregates: true                      162            171           9        129.4           7.7       4.5X
step is 100, CombineJoinedAggregates: false                      300            316          27         70.0          14.3       2.4X
step is 100, CombineJoinedAggregates: true                       160            169           9        131.3           7.6       4.6X
step is 10, CombineJoinedAggregates: false                       297            325          33         70.6          14.2       2.5X
step is 10, CombineJoinedAggregates: true                        170            203          36        123.5           8.1       4.3X
step is 1, CombineJoinedAggregates: false                        328            352          17         64.0          15.6       2.2X
step is 1, CombineJoinedAggregates: true                         140            148           7        149.3           6.7       5.2X
Benchmark CombineJoinedAggregates:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
Tree node number < 1, CombineJoinedAggregates: true              398            503         109         52.7          19.0       1.0X
Tree node number < 9, CombineJoinedAggregates: true              394            432          31         53.2          18.8       1.0X
Tree node number < 19, CombineJoinedAggregates: true             399            427          47         52.6          19.0       1.0X
Tree node number < 29, CombineJoinedAggregates: true             434            479         100         48.3          20.7       0.9X
Tree node number < 39, CombineJoinedAggregates: true             480            499          24         43.7          22.9       0.8X
  3. manual test on TPC-DS
    TPC-DS data size: 2TB.
    This improvement applies to TPC-DS q28 and causes no regression in the other test cases.
TPC-DS Query   Before(Seconds)   After(Seconds)   Speedup(Percent)
q28            109.665           43.938           249.59%

According to the micro benchmark, the merged plan performs worse than the original one when the filter has almost no selectivity.

@beliefer
Contributor Author

ping @cloud-fan @viirya cc @MaxGekk @gengliangwang

Contributor

nit: CombineJoinedAggregates

Contributor

What does "must not exists filter" mean? Do you mean "contains"?

Contributor Author

Yeah, the comment is confusing. I mean that aggregate expressions with a filter are not supported.
I will update it.

Contributor

I think transform up is better. For a multi-join query, we can eliminate the leaf joins first, and transform up to eliminate all joins if possible.

Contributor

we don't need to handle Join here. If the rule runs bottom up, leaf joins become Aggregate and then we can match simple join again.

@peter-toth
Contributor

peter-toth commented Jul 31, 2023

Hi @beliefer,
This is a nice optimization and an awesome performance improvement for Q28, but I'm not sure the implementation is done the right way:

  • You probably know that MergeScalarSubqueries does a very similar plan merging in case of scalar subqueries so I feel this PR shouldn't reimplement the logic of MergeScalarSubqueries.tryMergePlans() but reuse the already available code and add the necessary improvements to it if needed. The plan merging code can also be extracted to a common place to share between the rules.
  • I guess the main reason why you reimplemented the merge logic is to be able to propagate different filter conditions from Filter nodes up into Aggregate nodes, as that is required for Q28 but is currently missing from MergeScalarSubqueries.tryMergePlans(). IMO that improvement should be added to the existing code because Q9 could also benefit from it. Please note that I've already tried to add that in SPARK-40193 / [SPARK-40193][SQL] Merge subquery plans with different filters #37630.
    Unfortunately I think this feature can be tricky, as there might be cases where merging queries introduces a performance degradation. One such bad case is when we merge SELECT sum(a) FROM t WHERE p = 1 and SELECT sum(b) FROM t WHERE p = 2 into SELECT sum(a) FILTER (WHERE p = 1), sum(b) FILTER (WHERE p = 2) FROM t WHERE p = 1 OR p = 2 and p is a partitioning column: the merged query has to process more, but it doesn't scan less data. To avoid that, I used a trick in [SPARK-40193][SQL] Merge subquery plans with different filters #37630 to peek into the physical plan and check that only pushed data filters differ while partitioning and bucketing filters match. That trick made the implementation a bit complex, but we don't need to stick to it; we could also disable filter propagation during plan merging by default, as this PR does.
    Anyway, my point is that probably the feature should be added to a common place (MergeScalarSubqueries.tryMergePlans()).
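The partitioning-column case described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the table is a plain Python dict keyed by partition value, `scan` is a hypothetical stand-in for a partition-pruned scan, and none of it reflects Spark's actual scan code.

```python
# Hypothetical table: partition value p -> rows (a, b).
partitions = {
    1: [(1, 10), (2, 20)],
    2: [(3, 30), (4, 40)],
    3: [(5, 50)],
}

def scan(partition_values):
    """Read only the listed partitions (models partition pruning)."""
    rows = []
    for p in partition_values:
        rows.extend((p, a, b) for a, b in partitions[p])
    return rows

# Original plans: each aggregate scans only its own partition, and the
# partition filter costs nothing per row because it is applied by pruning.
q1_rows = scan([1])
q2_rows = scan([2])
separate_scanned = len(q1_rows) + len(q2_rows)
sum_a = sum(a for _, a, _ in q1_rows)
sum_b = sum(b for _, _, b in q2_rows)

# Merged plan: WHERE p = 1 OR p = 2 reads the same two partitions...
merged_rows = scan([1, 2])
merged_scanned = len(merged_rows)
# ...but every row now reaches both aggregates, and each aggregate has to
# re-check its own condition on p in its FILTER clause.
filter_evals = 2 * len(merged_rows)
merged_sum_a = sum(a for p, a, _ in merged_rows if p == 1)
merged_sum_b = sum(b for p, _, b in merged_rows if p == 2)

print(separate_scanned, merged_scanned, filter_evals)
```

In this model the merged plan scans exactly as many rows as the two separate plans, so nothing is saved on I/O, while per-row filter evaluations are added.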

So I would suggest:

Contributor

Seems like you are trying to merge aggregates in sequential order only. What if the first can't be merged with the rest, but the rest can be merged together?

Contributor Author

This rule can merge any contiguous run of mergeable aggregates, in sequential order.
If the first can't be merged with the rest, the rest can still be merged together.

Contributor

@peter-toth peter-toth Jul 31, 2023

Got it now thanks! Maybe we could try merging an aggregate with all the previous merged and non-merged ones.

Contributor

@peter-toth peter-toth Jul 31, 2023

But I'm not sure how you can replace the Join with only the last merged aggregate. Never mind.

Contributor

I'm not sure this is correct as you simply ignore the propagated filters to here, but I can imagine adjacent Filter nodes in a plan...
Please check #37630 how it handles this case, but as I mentioned IMO the plan merge code shouldn't be duplicated to here.

Contributor Author

Currently, this PR supports only some of the most basic forms, so it does not need to propagate filters here for now.

Contributor

@peter-toth peter-toth Aug 1, 2023

I mean this case branch seems to behave incorrectly when you have adjacent filters in both plans.
E.g. if you have:

SELECT avg(a)
FROM (
  SELECT * FROM t WHERE c1 = 1
) t
WHERE c2 = 1

and

SELECT sum(b)
FROM (
  SELECT * FROM t WHERE c1 = 2
) t
WHERE c2 = 2

and PushDownPredicates is disabled then you are in that situation. When you are merging the parent Filter nodes you shouldn't ignore the propagated filters from child Filter nodes.
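The adjacent-filter situation can be reproduced on a small table. The sketch below uses Python's sqlite3 module (SQLite 3.30 or newer assumed for FILTER) rather than Spark, with made-up rows. It compares the two original queries against a merge that keeps only the parent filters and against a merge that conjoins child and parent filters.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b INTEGER, c1 INTEGER, c2 INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?, ?)",
    [
        (10, 1, 1, 1),  # passes c1 = 1 AND c2 = 1
        (20, 2, 2, 1),  # passes c2 = 1 only
        (30, 3, 2, 2),  # passes c1 = 2 AND c2 = 2
        (40, 4, 1, 2),  # passes c2 = 2 only
    ],
)

def scalar(sql):
    return conn.execute(sql).fetchone()[0]

# The two original queries, each with a child filter and a parent filter.
expected = (
    scalar("SELECT avg(a) FROM (SELECT * FROM t WHERE c1 = 1) AS s WHERE c2 = 1"),
    scalar("SELECT sum(b) FROM (SELECT * FROM t WHERE c1 = 2) AS s WHERE c2 = 2"),
)

# Incorrect merge: only the parent filters are kept, child filters are ignored.
wrong = conn.execute("""
    SELECT avg(a) FILTER (WHERE c2 = 1), sum(b) FILTER (WHERE c2 = 2)
    FROM t WHERE c2 = 1 OR c2 = 2
""").fetchone()

# Correct merge: each aggregate filter is the conjunction of child and parent.
correct = conn.execute("""
    SELECT avg(a) FILTER (WHERE c1 = 1 AND c2 = 1),
           sum(b) FILTER (WHERE c1 = 2 AND c2 = 2)
    FROM t WHERE (c1 = 1 AND c2 = 1) OR (c1 = 2 AND c2 = 2)
""").fetchone()

print(expected, wrong, correct)
```

Dropping the child filters changes both aggregate values, which is the incorrect-plan risk raised in this thread.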

Actually #37630 handles that case and many others and that's why I think splitting this PR into 2 parts would be better:

  • This rule would be still useful if you used the existing MergeScalarSubqueries.tryMergePlans() to merge plans. It couldn't deal with Q28 immediately, but could deal with simpler aggregates without differing filters.
  • [SPARK-40193][SQL] Merge subquery plans with different filters #37630 could be a good base for implementing the filter merging feature. We could simplify that PR and remove the tricky physical plan checking part and introduce a simple boolean flag to enable/disable the filter merging feature like this PR does. In this case both Q9 and Q28 could benefit from the new feature...

Contributor Author

@peter-toth Thank you for pointing this out. As you said, this situation only matters when PushDownPredicates is disabled. AFAIK, PushDownPredicates is always enabled and we can't disable it. This PR places CombineJoinedAggregates after PushDownPredicates in the Optimizer rule list, so we can avoid handling this case.

Contributor Author

I added test case test("join two side are Aggregates with subquery")

Contributor

I think the user can disable PushDownPredicates with the spark.sql.optimizer.excludedRules and spark.sql.adaptive.optimizer.excludedRules configs. And there might be other cases where we end up in that situation; that was just an example...

Contributor Author

Yeah, if users arbitrarily exclude optimizer rules, then I believe most rules will execute improperly.

Contributor

No, optimizer rules might not make plans any better if prerequisites aren't met, but a rule shouldn't create an incorrect plan under any circumstances.

Contributor Author

I get what you mean now. Let's fix it.

@beliefer
Contributor Author

beliefer commented Jul 31, 2023

@peter-toth Thank you for the review.
For your first point, we could surely reuse some functions if some of the code is similar to SPARK-40193 / #37630. If #37630 is merged first, I will reuse them. Otherwise, if this PR is merged first, I will create a follow-up PR for the reuse.

For your second point, I'm OK with passing Filter nodes up into Aggregate nodes if tryMergePlans can be reused. The performance degradation occurs when the Filter is not selective enough. Please refer to the benchmark in the description. That is why I added the config, with a default value of false.

Your suggestion is welcome, but the question is when to reuse them. If #37630 is merged first, I will reuse them directly. Otherwise, I will create a follow-up PR to fix it.

/**
* The helper class used to merge scalar subqueries.
*/
trait MergeScalarSubqueriesHelper {

Contributor Author

@peter-toth I created this trait to share the common functions.

@beliefer
Contributor Author

beliefer commented Aug 1, 2023

@cloud-fan The CI failure is unrelated.

@cloud-fan
Contributor

Instead of showing an example query, can you define the general form of joined aggregates that can be merged?

ne.transform {
case ae @ AggregateExpression(_, _, _, filterOpt, _) =>
val newFilter = filterOpt.map { filter =>
And(filter, propagatedFilter)

Contributor

The old query evaluates the Filter first; we should keep that order, so here it should be And(propagatedFilter, filter).
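Why the operand order can matter is easiest to see with a short-circuiting conjunction. The sketch below is plain Python, not Catalyst: `and_`, `propagated_filter` and `aggregate_filter` are hypothetical names, and the division is just a made-up predicate that is only safe on rows the propagated filter lets through.

```python
def and_(left, right):
    """Short-circuit conjunction: `right` runs only if `left` is true."""
    return lambda row: bool(left(row)) and bool(right(row))

# Hypothetical predicates: the propagated filter (from the Filter node)
# guards the aggregate's own FILTER predicate against division by zero.
propagated_filter = lambda row: row["d"] != 0
aggregate_filter = lambda row: row["n"] / row["d"] > 1

rows = [{"n": 4, "d": 2}, {"n": 1, "d": 0}]

# Propagated filter first: the guarded predicate never sees d = 0.
kept = [r for r in rows if and_(propagated_filter, aggregate_filter)(r)]

def reversed_order_fails():
    """Aggregate filter first: it is evaluated on the row the guard rejects."""
    try:
        [r for r in rows if and_(aggregate_filter, propagated_filter)(r)]
    except ZeroDivisionError:
        return True
    return False

print(kept, reversed_order_fails())
```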

Contributor Author

@beliefer beliefer Aug 2, 2023

SGTM.

Seq(Inner, Cross, LeftOuter, RightOuter, FullOuter).contains(joinType)

// Merge the multiple Aggregates.
private def mergePlan(

Contributor

the return type is a bit complicated, let's add comment to explain


(Aggregate(Seq.empty, mergedAggregateExprs, newChild), AttributeMap.empty, Seq.empty)
}
case (lp: Project, rp: Project) =>

Contributor

do you have tests that hit this branch? Ideally Aggregate and Project will be merged.

Contributor Author

@beliefer beliefer Aug 2, 2023

Yes, this case is already covered. Please see test("join side is not Aggregate")

* "benchmarks/CombineJoinedAggregatesBenchmark-results.txt".
* }}}
*/
object CombineJoinedAggregatesBenchmark extends SqlBasedBenchmark {

Contributor

We don't really need to add microbenchmark for optimizer features. The plan change is sufficient to show the benefits. You can keep the benchmark locally to show the perf numbers.

Contributor Author

OK

*/
trait MergeScalarSubqueriesHelper {

// If 2 plans are identical return the attribute mapping from the new to the cached version.

Contributor

what does cached version mean? I think this method will use the left query as the main query and return the attribute map to rewrite the right query.

Contributor Author

Oh, the comment is not correct. Let me update it.

* every [[Join]] are [[Aggregate]]s.
*
* Note: this rule doesn't following cases:
* 1. One of the to be merged two [[Aggregate]]s with child [[Filter]] and the other one is not.

Contributor

Thinking more about it, I think what really matters is the predicate selectivity. No filter means an always true predicate.

To be safe, I think we should reuse some code of DPP that determines selective predicates, and only merge aggregates if they both have selective predicates.

Contributor Author

Good idea.

Contributor

@peter-toth peter-toth Aug 2, 2023

> Thinking more about it, I think what really matters is the predicate selectivity. No filter means an always true predicate.
>
> To be safe, I think we should reuse some code of DPP that determines selective predicates, and only merge aggregates if they both have selective predicates.

Is that so? I think the performance gain of merging queries comes from the fact that we don't need to scan the same data 2 times but we can calculate the 2 aggregates in 1 run.
E.g. If we have 2 queries:

SELECT sum(a) FROM t WHERE <condition 1>

and

SELECT sum(b) FROM t WHERE <condition 2>

and we want to decide when merging the queries into

SELECT
  sum(a) FILTER (WHERE <condition 1>),
  sum(b) FILTER (WHERE <condition 2>)
FROM t
WHERE <condition 1> OR <condition 2>

makes sense, then I would say:

  • if the conditions are true literals (or they don't exist at all)
  • or the conditions match

then it definitely does make sense to merge them.
But in the first case the conditions are not selective at all and in the second case condition selectivity doesn't matter. (Actually these are the 2 cases that are already covered by MergeScalarSubqueries.tryMergePlans(): https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L275-L286.)

Now, if the conditions differ then it isn't trivial to calculate if scans overlap, but I think if both conditions are highly selective then we have less chance for scans to overlap, don't we?

BTW, in my #37630 I used a different heuristic to disable merging of aggregates with different filter conditions: if the conditions contain any partitioning or bucketing columns, the aggregates are not merged.

Contributor

Actually, if we disable this feature by default then do we need to define any heuristics? Can't we just let the user enable the feature if needed?

Contributor Author

> make sense then I would say:
>
>   • if the conditions are true literals (or they doesn't exist at all)
>   • or the conditions match

I agree. This PR supports these two cases:

  1. The predicates on both sides are true literals, or they don't exist at all.
  2. The predicates on both sides are selective.

Contributor

Merging can always "inflate" data when the aggregate function columns differ from the filter condition columns, because those condition columns need to travel up to the aggregate in the merged query, whereas in the original queries they were pruned after the filter. In this case, if we alias the conditions to small boolean columns, the shuffle data size decreases and the merge is less "stressful".

The worst case you are referring to is when aggregate function columns are the superset of filter condition columns, but even in this case only small boolean columns are added and the gain is that we don't need to evaluate the filters twice.

Contributor Author

For example, if condition 1 is "flag is true" and condition 2 is "flag is false", putting these conditions into the project list adds two boolean columns.

Contributor

@peter-toth peter-toth Aug 4, 2023

If a condition is f(c) and c is not used in the aggregate function (e.g. it is sum(a)), then the project also removes c besides adding a new boolean column (we can write sum(a) FILTER (WHERE <boolean column>) instead of sum(a) FILTER (WHERE f(c))).
If f(c1, c2, ..., c10) is the condition and none of these columns are in the aggregate function, then the project removes 10 columns and adds only 1 boolean.
What you are referring to is the worst case, but IMO the extra project helps on average.
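The extra project can be written out in SQL form. This is a sketch on made-up data using Python's sqlite3 module (SQLite 3.30 or newer assumed for FILTER), not Spark; the column names f1 and f2 and the two conditions are invented for illustration. The inner SELECT plays the role of the Project: it drops c1, c2 and c3 and keeps only two boolean columns next to a and b.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b INTEGER, c1 INTEGER, c2 INTEGER, c3 INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?, ?, ?)",
    [
        (1, 10, 1, 1, 1),  # matches the second condition only
        (2, 20, 2, 2, 2),  # matches the first condition only
        (3, 30, 3, 0, 3),  # matches both conditions
        (4, 40, 2, 2, 1),  # matches neither condition
    ],
)

# Merged query with the conditions repeated in WHERE and in each FILTER.
direct = conn.execute("""
    SELECT sum(a) FILTER (WHERE c1 + c2 + c3 > 5),
           sum(b) FILTER (WHERE c1 * c2 < 3)
    FROM t
    WHERE (c1 + c2 + c3 > 5) OR (c1 * c2 < 3)
""").fetchone()

# Same query with an extra projection: each condition is evaluated once,
# aliased to a boolean column, and c1..c3 are no longer carried upward.
projected = conn.execute("""
    SELECT sum(a) FILTER (WHERE f1), sum(b) FILTER (WHERE f2)
    FROM (
      SELECT a, b, (c1 + c2 + c3 > 5) AS f1, (c1 * c2 < 3) AS f2
      FROM t
    ) AS p
    WHERE f1 OR f2
""").fetchone()

print(direct, projected)
```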

Contributor Author

In most cases, the f(c) are different. e.g. f1(c), f2(c) and so on.

Contributor

@peter-toth peter-toth Aug 5, 2023

That's OK. What matters is not how the conditions (the f(c)s) relate to each other, but how the set of filter columns (c1 ... c10) relates to the set of aggregate function columns (a).

@beliefer
Contributor Author

beliefer commented Aug 2, 2023

> BTW, in my #37630 I used a different heuristics to disable merging of aggregates with different filter conditions. If the conditions contain any partitioning or bucketing columns then aggregates are not merged.

@peter-toth Could you tell me more? I can't find the handling of partitioning or bucketing columns.

@peter-toth
Contributor

peter-toth commented Aug 2, 2023

> > BTW, in my #37630 I used a different heuristics to disable merging of aggregates with different filter conditions. If the conditions contain any partitioning or bucketing columns then aggregates are not merged.
>
> @peter-toth Could you tell me more? I can't found the treat for partitioning or bucketing columns.

That heuristic made the whole PR complex. You can follow the logic of the ScanCheck object and the branch case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) => (https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR277-R290) in tryMergePlans(). That case actually peeks into the physical plan to check whether only pushed-down data filters differ (partitioning and bucketing filters must match).

BTW, I'm not saying that it is the right heuristic for deciding whether we should merge aggregates with different filters; it is just the one I was able to come up with...
Anyway, expecting highly selective predicates seems a bit counterintuitive to me.
And as I mentioned, I'm also fine with disabling the feature by default with a config and letting users enable it for the queries that benefit from it. If we decide to go that way, it would simplify my #37630 PR a lot as well.

@peter-toth
Contributor

peter-toth commented Aug 2, 2023

@beliefer
Contributor Author

beliefer commented Aug 2, 2023

         case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) =>
           val (newScanToCompare, cachedScanToCompare) =
             if (conf.getConf(SQLConf.PLAN_MERGE_IGNORE_PUSHED_PUSHED_DATA_FILTERS)) {
               (newScan.copy(dataFilters = Seq.empty), cachedScan.copy(dataFilters = Seq.empty))
             } else {
               (newScan, cachedScan)
             }
           if (newScanToCompare.canonicalized == cachedScanToCompare.canonicalized) {
             // Physical plan is mergeable, but we still need to finish the logical merge to
             // propagate the filters
             tryMergePlans(newPlan, cachedPlan, DONE)
           } else {
             None
           }

I think the above code is not needed. Generally, we concatenate the predicates with OR, and the original filters can still be pushed down to file sources.

@peter-toth
Contributor

>          case (CHECKING, FileSourceScanPlan(_, newScan), FileSourceScanPlan(_, cachedScan)) =>
>            val (newScanToCompare, cachedScanToCompare) =
>              if (conf.getConf(SQLConf.PLAN_MERGE_IGNORE_PUSHED_PUSHED_DATA_FILTERS)) {
>                (newScan.copy(dataFilters = Seq.empty), cachedScan.copy(dataFilters = Seq.empty))
>              } else {
>                (newScan, cachedScan)
>              }
>            if (newScanToCompare.canonicalized == cachedScanToCompare.canonicalized) {
>              // Physical plan is mergeable, but we still need to finish the logical merge to
>              // propagate the filters
>              tryMergePlans(newPlan, cachedPlan, DONE)
>            } else {
>              None
>            }
>
> I think the above code is not needed. Generally, we concat the predicates with OR, the origin filters still could be pushed down to file sources.

Please comment on the other PR regarding the code of that PR.
But my point is that it doesn't matter what the filter looks like (whether it is an OR condition or not). I enabled merging if only FileSourceScanExec.dataFilters differ between the 2 scans. If FileSourceScanExec.partitionFilters or FileSourceScanExec.optionalBucketSet differ then merging is disabled, because partitioning and bucketing filters can be more selective in terms of which files to scan...

@beliefer
Contributor Author

beliefer commented Aug 3, 2023

> But my point is that it doesn't matter how the filter looks like (is it an OR condition or not). I enabled merging if only
> FileSourceScanExec.dataFilters differ between the 2 scans. If FileSourceScanExec.partitionFilters or
> FileSourceScanExec.optionalBucketSet differ then merging is disabled because partitioning and bucketing filters can be
> more selective in terms what files to scan...

In theory, whether they are data filters or partition filters, the data can overlap when the filters are connected with OR.
Before merging the filters (e.g. p = 1, p = 2), assuming each partition has one file, we need to read two partition files.
After merging the two filters, we still need to read two partition files.
I think the overhead of scanning the partition files is the same. The difference is that the filters need to do more work, e.g. p = 1 also has to process the data coming from p = 2.
So, personally, I think the computation overhead is similar no matter which kind of filter it is.

The main factor in filter merging is the amount of overlapping data. For example, F1 selects 100 rows and F2 selects 50 rows. The best situation is when the 50 rows are completely contained in the 100 rows: the merged scan returns 100 rows, so F1 on Aggregate1 still processes 100 rows, while F2 on Aggregate2 processes an additional 50 rows, 100 rows in total.
The worst case is that the two do not overlap at all. Then F1 on Aggregate1 needs to process an additional 50 rows, 150 in total, and F2 on Aggregate2 processes an additional 100 rows, also 150 in total.
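The row counts in this example follow from simple set arithmetic. A minimal sketch, where the function name and the result keys are made up for illustration:

```python
def rows_per_aggregate(f1_rows, f2_rows, overlap):
    """Rows each aggregate sees before and after merging two filtered scans.

    `overlap` is the number of rows matched by both filters.
    """
    merged = f1_rows + f2_rows - overlap  # rows matching F1 OR F2
    return {
        "before": (f1_rows, f2_rows),
        "after": (merged, merged),
        "extra": (merged - f1_rows, merged - f2_rows),
    }

# Best case from the comment: F2's 50 rows are all inside F1's 100 rows.
best = rows_per_aggregate(100, 50, overlap=50)
# Worst case from the comment: the two filters share no rows.
worst = rows_per_aggregate(100, 50, overlap=0)

print(best)
print(worst)
```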

@peter-toth
Contributor

> > But my point is that it doesn't matter how the filter looks like (is it an OR condition or not). I enabled merging if only
> > FileSourceScanExec.dataFilters differ between the 2 scans. If FileSourceScanExec.partitionFilters or
> > FileSourceScanExec.optionalBucketSet differ then merging is disabled because partitioning and bucketing filters can be
> > more selective in terms what files to scan...
>
> In theory, whether it is data filters or partition filters, there is a possibility of data overlap when connected filters with or. Before merge the filters (e.g. p = 1, p = 2), assume each partition have one file, so we need to read two partition files. After merge the two filters, we still need to read two partition files. I think the overhead of scan partition files is the same. the different is the filter need to calculates more. e.g. p = 1 also need to treat the data come from p = 2. So, personally, I think the overhead of calculate is similar, no matter which filter is.
>
> The main reason for filter merging is the amount of overlapping data. For example, F1 obtains 100 rows of data, and F2 obtains 50 rows of data. If the 100 rows and 50 rows completely overlap, this is the best situation. F1 on Aggregate1 still processes 100 rows of data, while F2 on Aggregate2 processes an additional 50 rows, resulting in a total of 100 rows of data. The worst case scenario is that the two do not overlap at all. So F1 on Aggregate1 needs to process an additional 50 rows, a total of 150 rows; F2 on Aggregate2 processes an additional 100 rows, totaling 150 rows.

Sorry @beliefer, I didn't explain all the reasoning behind my heuristics in #37630. I've updated #42223 (comment), please see the details there.

@cloud-fan
Contributor

For merging func1(...) ... WHERE cond1 and func2(...) ... WHERE cond2, we get

func1(...) FILTER cond1, func2(...) FILTER cond2 ... WHERE cond1 OR cond2

Assume there is no overlapped scan (so almost no benefit) and the table has N rows. Previously, both cond1 and cond2 were evaluated at most N times (the scan gets pruned). If they are partition filters, they were evaluated 0 times. Now, they are evaluated at most 2N times. In practice this is likely more than twice as many evaluations as before, because we prune less data from the scan. The worst case is partition filters: before, they were evaluated 0 times; now they are evaluated (numRows matching cond1 OR cond2) times, plus some extra evaluation in the aggregate filter. I don't think putting the predicates in a Project helps, as the problem comes from scan pruning.
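
The evaluation counts can be illustrated with a toy in-memory model. This is a rough sketch under stated assumptions, not how Spark executes the plans: `CountingPredicate`, `run_separately` and `run_merged` are hypothetical names, the "scan" is a plain Python list with no pruning or filter pushdown, and the aggregate simply counts matching rows.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, int]


class CountingPredicate:
    """Wraps a predicate and counts how many times it is evaluated."""

    def __init__(self, fn: Callable[[Row], bool]) -> None:
        self.fn = fn
        self.calls = 0

    def __call__(self, row: Row) -> bool:
        self.calls += 1
        return self.fn(row)


def run_separately(rows: List[Row], cond1, cond2) -> Tuple[int, int]:
    # Two independent plans: each one scans the table and applies its own WHERE.
    return sum(1 for r in rows if cond1(r)), sum(1 for r in rows if cond2(r))


def run_merged(rows: List[Row], cond1, cond2) -> Tuple[int, int]:
    # Filter node: WHERE cond1 OR cond2 (Python's `or` short-circuits).
    kept = [r for r in rows if cond1(r) or cond2(r)]
    # Aggregate node: each function re-checks its own FILTER predicate.
    return sum(1 for r in kept if cond1(r)), sum(1 for r in kept if cond2(r))


def make_predicates() -> Tuple[CountingPredicate, CountingPredicate]:
    # Two non-overlapping predicates, i.e. no overlapped scan.
    return (CountingPredicate(lambda r: r["id"] < 3),
            CountingPredicate(lambda r: r["id"] >= 8))


rows = [{"id": i} for i in range(10)]  # N = 10

s1, s2 = make_predicates()
separate_result = run_separately(rows, s1, s2)
m1, m2 = make_predicates()
merged_result = run_merged(rows, m1, m2)

print("separate:", separate_result, s1.calls, s2.calls)
print("merged:  ", merged_result, m1.calls, m2.calls)
```

With N = 10, the separate plans evaluate each predicate 10 times, while the merged plan evaluates cond1 15 times and cond2 12 times, both within the 2N bound. The model cannot show the partition-filter case, where the separate plans would evaluate the predicates 0 times because pruning happens before any row is read.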

Given that it's hard to estimate the overlapped scan size, the only idea I can think of is to estimate the cost of the predicate. We only do the merge if the predicate is cheap to evaluate: it contains only simple comparisons and the expression tree is small.

We can define some patterns when the overlapped scan size must be large, e.g. one side has no filter, or the filters in two aggregates are the same. For these cases, we can always apply the merge.
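
A minimal sketch of such a heuristic, assuming a toy expression tree rather than Catalyst's `Expression` classes. Everything here is hypothetical: the `Expr` type, the `SIMPLE_OPS` whitelist, and the `max_size=16` threshold are illustrative choices, not values proposed in this PR.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Expr:
    op: str                            # e.g. "col", "lit", "=", "AND", "udf"
    value: str = ""                    # column name, literal, or function name
    children: Tuple["Expr", ...] = ()


# Assumed whitelist of operators that are cheap to evaluate.
SIMPLE_OPS = {"col", "lit", "=", "<", "<=", ">", ">=", "AND", "OR", "NOT"}


def tree_size(e: Expr) -> int:
    return 1 + sum(tree_size(c) for c in e.children)


def only_simple(e: Expr) -> bool:
    return e.op in SIMPLE_OPS and all(only_simple(c) for c in e.children)


def is_cheap(e: Expr, max_size: int = 16) -> bool:
    # Cheap = only simple operators and a small expression tree.
    return only_simple(e) and tree_size(e) <= max_size


def should_merge(f1: Optional[Expr], f2: Optional[Expr]) -> bool:
    # Patterns where the overlapped scan must be large: one side has no
    # filter, or both filters are the same. Always merge in these cases.
    if f1 is None or f2 is None or f1 == f2:
        return True
    # Otherwise merge only when both predicates are cheap to evaluate.
    return is_cheap(f1) and is_cheap(f2)


def col(name: str) -> Expr:
    return Expr("col", name)


def lit(v: object) -> Expr:
    return Expr("lit", repr(v))


def binary(op: str, left: Expr, right: Expr) -> Expr:
    return Expr(op, "", (left, right))


usa = binary("=", col("country"), lit("USA"))
china = binary("=", col("country"), lit("China"))
expensive = binary("AND", usa, Expr("udf", "expensive_fn", (col("name"),)))

print(should_merge(usa, china))      # both cheap
print(should_merge(usa, expensive))  # one side contains a UDF
print(should_merge(None, expensive)) # one side has no filter
```

A real rule would have to decide which Catalyst expressions count as simple; the whitelist above only marks where that decision would live.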

@peter-toth
Contributor

peter-toth commented Aug 4, 2023

I don't think putting the predicates in a Project helps, as the problem comes from scan pruning.

Sorry, my first comment about the extra project (#42223 (comment)) was confusing. I got that argument, and I agreed with you (#42223 (comment)).

But if we decide to merge the queries (based on any heuristics), then the extra project can help avoid evaluating the filters 2 times in the merged query and can actually decrease the data to be shuffled for the aggregation: #42223 (comment). IMO the extra project between the filter and scan won't prevent the merged condition from being pushed down to the scan, so I don't see any drawbacks to it.

Given that it's hard to estimate the overlapped scan size, the only idea I can think of is to estimate the cost of the predicate. We only do the merge if the predicate is cheap to evaluate: it contains only simple comparisons and the expression tree is small.

That makes sense to me and I don't have any better idea. However, I still think we can and should check whether any filters are pushed down to the scans in the original queries. If there are no pushed-down filters in any of the queries, or the pushed-down filters fully match, then we are safe to merge as the scans fully overlap. If there is a non-matching pushed-down filter, then we can use the suggested "expression is cheap" heuristic. (By "pushed-down filter" I mean all partition, bucket and data filters in the scan.)
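
The decision order described here could look roughly like the following. This is a sketch under assumptions: `ScanFilters` and `merge_decision` are hypothetical names, filters are modelled as sets of strings instead of Catalyst expressions, and the cheapness check is passed in as a precomputed boolean.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class ScanFilters:
    """Pushed-down filters of one scan, modelled as sets of strings."""
    partition: FrozenSet[str] = frozenset()
    bucket: FrozenSet[str] = frozenset()
    data: FrozenSet[str] = frozenset()

    def pushed_down(self) -> FrozenSet[str]:
        return self.partition | self.bucket | self.data


def merge_decision(left: ScanFilters, right: ScanFilters,
                   predicates_are_cheap: bool) -> str:
    # 1. No pushed-down filters on either side: the scans fully overlap.
    if not left.pushed_down() and not right.pushed_down():
        return "merge (no pushed-down filters)"
    # 2. Pushed-down filters fully match: the scans fully overlap as well.
    if left == right:
        return "merge (pushed-down filters match)"
    # 3. Non-matching pushed-down filters: fall back to the cost heuristic.
    if predicates_are_cheap:
        return "merge (predicates are cheap)"
    return "do not merge"


no_filters = ScanFilters()
usa = ScanFilters(data=frozenset({"country = 'USA'"}))
p1 = ScanFilters(partition=frozenset({"p = 1"}))

print(merge_decision(no_filters, no_filters, predicates_are_cheap=False))
print(merge_decision(usa, usa, predicates_are_cheap=False))
print(merge_decision(usa, p1, predicates_are_cheap=True))
print(merge_decision(usa, p1, predicates_are_cheap=False))
```

The first two branches need no heuristic at all, which is the appeal of checking the scans before estimating predicate cost.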

@cloud-fan
Contributor

cloud-fan commented Aug 4, 2023

@peter-toth I agree that the extra project can help if we decided to merge. However, the plan pattern becomes complicated. Without the extra project, the merged aggregate is still Aggregate -> Filter -> Scan. We can just define a rule for merging two aggregates, and it can incrementally merge all joined aggregates. With the extra project, we need to define how to merge Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Project -> Scan, or Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Scan.
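
To illustrate the incremental, pairwise merge without the extra project, here is a string-based sketch. It is not the Catalyst rule: `Agg`, `normalize`, `merge_two` and `render` are hypothetical helpers, every subquery is assumed to read the same table `data`, and predicates are kept as raw SQL text.

```python
from dataclasses import dataclass
from functools import reduce
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Agg:
    # Each entry is (aggregate function call, FILTER predicate or None).
    funcs: Tuple[Tuple[str, Optional[str]], ...]
    # None means the subquery has no WHERE clause.
    where: Optional[str]


def normalize(func_calls: List[str], where: Optional[str]) -> Agg:
    # Copy the subquery's WHERE predicate onto each aggregate function.
    return Agg(tuple((f, where) for f in func_calls), where)


def merge_two(a: Agg, b: Agg) -> Agg:
    if a.where is None or b.where is None:
        where = None  # an unfiltered side needs every row of the scan
    else:
        where = f"({a.where}) OR ({b.where})"
    return Agg(a.funcs + b.funcs, where)


def render(agg: Agg) -> str:
    cols = ", ".join(
        f if p is None else f"{f} FILTER (WHERE {p})" for f, p in agg.funcs
    )
    sql = f"SELECT {cols} FROM data"
    return sql if agg.where is None else f"{sql} WHERE {agg.where}"


a1 = normalize(["avg(power)"], "country = 'USA'")
a2 = normalize(["count(power)"], "country = 'China'")
a3 = normalize(["max(power)"], None)

two = merge_two(a1, a2)
# The same pairwise rule folds over any number of joined aggregates.
three = reduce(merge_two, [a1, a2, a3])
print(render(two))
print(render(three))
```

Because the merged result has the same shape as its inputs (functions with FILTER clauses over one WHERE), a single two-way rule is enough to merge a whole chain of joined aggregates.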

Also note that WHERE cond1 OR cond2 can be better than FROM (SELECT cond1 AS b_col1, cond2 AS b_col2) WHERE b_col1 OR b_col2, because OR short-circuits and cond2 is not always evaluated for each input row. Common subexpression elimination might be a better approach here, if we can make it cross the operator boundary (or can only whole-stage codegen do it?).
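
The short-circuit effect can be seen in a toy comparison. This is only a sketch: `Counted` is a hypothetical wrapper, the rows are plain integers, and Python's `or` stands in for the OR in the Filter node.

```python
from typing import Callable


class Counted:
    """Wraps a predicate and counts how many times it is evaluated."""

    def __init__(self, fn: Callable[[int], bool]) -> None:
        self.fn = fn
        self.calls = 0

    def __call__(self, x: int) -> bool:
        self.calls += 1
        return self.fn(x)


rows = list(range(10))

# Variant A: WHERE cond1 OR cond2, evaluated directly in the filter.
a_cond1 = Counted(lambda x: x < 7)
a_cond2 = Counted(lambda x: x % 2 == 0)
kept_a = [x for x in rows if a_cond1(x) or a_cond2(x)]

# Variant B: a project computes both predicates as columns first,
# then the filter combines the precomputed booleans.
b_cond1 = Counted(lambda x: x < 7)
b_cond2 = Counted(lambda x: x % 2 == 0)
projected = [(x, b_cond1(x), b_cond2(x)) for x in rows]
kept_b = [x for x, b_col1, b_col2 in projected if b_col1 or b_col2]

print(kept_a == kept_b)
print("cond2 evaluations, filter only:", a_cond2.calls)
print("cond2 evaluations, project first:", b_cond2.calls)
```

Both variants keep the same rows, but variant A evaluates cond2 only for the 3 rows where cond1 is false, while variant B evaluates it for all 10. The count covers the filter step only; any aggregate FILTER clause above it would evaluate its predicate again in variant A.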

@peter-toth
Contributor

peter-toth commented Aug 4, 2023

@peter-toth I agree that the extra project can help if we decided to merge. However, the plan pattern becomes complicated. Without the extra project, the merged aggregate is still Aggregate -> Filter -> Scan. We can just define a rule for merging two aggregates, and it can incrementally merge all joined aggregates. With the extra project, we need to define how to merge Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Project -> Scan, or Aggregate -> Filter -> Project -> Scan + Aggregate -> Filter -> Scan.

Those patterns are already covered in the current code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 and not touched in my aggregate merge PR: https://github.com/apache/spark/pull/37630/files#diff-3d3aa853c51c01216a7f7307219544a48e8c14fabe1817850e58739208bc406aR292-R318

Also note that WHERE cond1 OR cond2 can be better than FROM (SELECT cond1 AS b_col1, cond2 AS b_col2) WHERE b_col1 OR b_col2, because OR short-circuits and cond2 is not always evaluated for each input row. Common subexpression elimination might be a better approach here, if we can make it cross the operator boundary (or can only whole-stage codegen do it?).

I'm not sure WHERE cond1 OR cond2 is better because up in the aggregate cond2 is always evaluated for all rows.
Sorry, got it now.

Actually, due to short-circuiting, cond2 might not be evaluated in the filter node when cond1 is true, but up in the aggregate it will be evaluated anyway, won't it?

@beliefer
Contributor Author

beliefer commented Aug 4, 2023

@peter-toth Setting aside the previous discussion, after all, the current implementation is the simplest pattern, which is also an advantage that cannot be ignored. If adding a project can bring performance improvement, we can follow up and improve.

@peter-toth
Contributor

peter-toth commented Aug 4, 2023

@peter-toth Setting aside the previous discussion, after all, the current implementation is the simplest pattern, which is also an advantage that cannot be ignored. If adding a project can bring performance improvement, we can follow up and improve.

If we want to push this PR for some reason I'm not against it if the above mentioned correctness issue gets fixed (#42223 (comment)).
But honestly, I don't like the idea that we duplicate the code of MergeScalarSubqueries.tryMergePlans() with some modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same page about the solution discussed in #42223 (comment), then we should update MergeScalarSubqueries.tryMergePlans() (in #37630) and use the updated common merging logic in this PR.

@beliefer
Contributor Author

beliefer commented Aug 5, 2023

If we want to push this PR for some reason I'm not against it if the above mentioned correctness issue gets fixed (#42223 (comment)).

Fixed.

But honestly, I don't like the idea that we duplicate the code of MergeScalarSubqueries.tryMergePlans() with some
modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same
page about the solution discussed in #42223 (comment), then we
should update MergeScalarSubqueries.tryMergePlans() (in #37630) and use the
updated common merging logic in this PR.

I don't like that idea either. But it seems the code differs between this PR and #37630. Maybe we should advance these two PRs separately until an opportunity for sharing is found, and then refactor both.

@peter-toth
Contributor

peter-toth commented Aug 5, 2023

But honestly, I don't like the idea that we duplicate the code of MergeScalarSubqueries.tryMergePlans() with some
modifications into this new rule just because we don't want to fix aggregate merging there. IMO if we are all on the same
page about the solution discussed in #42223 (comment), then we
should update MergeScalarSubqueries.tryMergePlans() (in #37630) and use the
updated common merging logic in this PR.

I don't like that idea either. But it seems the code differs between this PR and #37630. Maybe we should advance these two PRs separately until an opportunity for sharing is found, and then refactor both.

IMO the code is very similar and the basic algorithm is identical. Both methods just traverse the plan and compare nodes to see if and how they can be merged.
I'm off for a week, but once I'm back I can update #37630 with the discussed changes and maybe remove the physical scan equality check part. Although that part is useful, as in some cases we don't need to use any heuristics to allow merging (#42223 (comment)), it makes my PR complex. Maybe we can add it back later...

BTW this PR still doesn't use the "condition expression cost is low" heuristics that we discussed. The current "isLikelySelective" doesn't make sense (#42223 (comment)) and has nothing to do with expression cost.

@cloud-fan
Contributor

cloud-fan commented Aug 6, 2023

I'm not sure if we can reuse https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L235-L255 directly. If the Project is not generated by this optimization, it might contain other expensive expressions and we should stop merging.

Maybe a better idea is to do a post-hoc update to extract filters to a Project.

@beliefer
Contributor Author

beliefer commented Aug 7, 2023

Although that part is useful as in some cases we don't need to use any heuristics to allow merging
(#42223 (comment)), it makes my PR complex. Maybe we can add it back later...

I agree.

BTW this PR still doesn't use the "condition expression cost is low" heuristics that we discussed. The current
"isLikelySelective" doesn't make sense (#42223 (comment)) and has
nothing to do with expression cost.

Yes. I have finished it.

@beliefer
Contributor Author

ping @cloud-fan @peter-toth Could we move this work a step forward?

@peter-toth
Contributor

Sorry, last week was a bit hectic for me. I've already put together a change to #37630 to make it simpler and include the discussed changes, but I haven't had time to update the PR. I will do it soon.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 16, 2024
@beliefer beliefer removed the Stale label Jan 16, 2024
@github-actions github-actions bot added the Stale label Apr 26, 2024
@github-actions github-actions bot added the Stale label Aug 7, 2024
@github-actions github-actions bot closed this Aug 8, 2024