Skip to content

Conversation

@mingmwang
Copy link
Contributor

@mingmwang mingmwang commented May 26, 2023

Which issue does this PR close?

Closes #6428
Closes #5808
Closes #6497

Rationale for this change

The existing de-correlation rules DecorrelatePredicateSubquery only support Projection or Distinct as the top level plan in the Subquery and the Filter is the child of the top plan. And the rule ScalarSubqueryToJoin also assumes
the top level plan of the Subquery is a Projection and the child is a Aggregation.

But actually the shape of the Subquery plans can be very flexible and the Filters which include the correlated expressions(out reference columns) can be nested in very deep plans.

This PR re-implement the correlation expressions pull-up process and support as much case as possible no matter the shape of the Subquery plans. It covers almost all the case that the subquery plan can be covered to Left Semi/Left Anti(for In/Exists Subquery) or Left Join or Cross Join(Scalar Subquery).

After this PR, the remaining unsupported Subquery cases:

  1. Uncorrelated Exists Subquery
  2. Correlated In Subquery contains Limit clause or Order By clause(can not pull up correlated expressions)
  3. Correlated Scalar Subquery contains Limit clause or Order By clause(can not pull up correlated expressions)
  4. There is Union in the Subquery plan and the Union's children contain correlated expressions.
  5. The correlated expressions are not in the Filter clause, but in Join conditions, aggregation expressions or window expressions etc.

The above 2), 3), 4), 5) cases can not be converted to simple joins, need add another new rule and use a difference approach to de-correlate them.

Some TPC-DS queries are impacted also and they can generate runnable physical plans now.

What changes are included in this PR?

The alias logic is also changed, we can not simply alias the Subquery Projections or Aggregations output's Expr to some internal alias column like __scalar_value , the Expr might be used by some top level plan, like Having clause or
used as a Join conditions..

Are these changes tested?

Yes. Added serval UTs.
I will add more UT tomorrow to cover the count aggregation cases(there is bug here).

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules labels May 26, 2023
@mingmwang mingmwang changed the title init checkin Support normal aggregations in In/Exists Subquery May 26, 2023
@mingmwang mingmwang changed the title Support normal aggregations in In/Exists Subquery Support wider range of Subquery May 26, 2023
@mingmwang mingmwang requested review from alamb and jackwener May 26, 2023 15:46
@jackwener
Copy link
Member

Thanks @mingmwang .
I'm going away for the weekend and won't have time to review the code.
I'm going to review it next week.

" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't figure out why occur double alias

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The double alias is not caused by the decorrelate rules. It's caused by the other logical optimization rule:

new_plan LeftSemi Join:  Filter: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_3.CAST(t2_id AS Int64) + Int64(1)
  TableScan: t1
  SubqueryAlias: __correlated_sq_3
    Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1), CAST(t2.t2_id AS Int64) + Int64(1)
      TableScan: t2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't figure out why occur double alias

Fixed the alias problem.

@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label May 30, 2023
@mingmwang
Copy link
Contributor Author

I added additional logic to handle the count() aggregations in subquery, the logic is quite ugly.
I'm not sure whether there are other good ways to handle this.

@jackwener
Could you please help to check how this SQL was rewritten in Apache Doris?

SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1

@jackwener
Copy link
Member

@mingmwang doris don't support project correlated subquery

@alamb
Copy link
Contributor

alamb commented May 30, 2023

I am planning to focus my review efforts on other parts of DataFusion where I have more to add -- I don't plan to review this PR unless someone think that is important.

Thank you @mingmwang and @jackwener for pushing this forward

@mingmwang
Copy link
Contributor Author

mingmwang commented May 31, 2023

@alamb @jackwener
I'm going to mark this PR as draft, I need more time to think and make the count() bug handling logic more generic. Although the current correlation expression pull up and subquery rewrite is more generic, but the count aggregation handling is ugly.

The work should be related to reordering out joins and aggregations.

@mingmwang mingmwang marked this pull request as draft May 31, 2023 02:25
@mingmwang mingmwang changed the title Support wider range of Subquery Support wider range of Subquery, handle the Count bug May 31, 2023
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jun 1, 2023
@mingmwang mingmwang marked this pull request as ready for review June 1, 2023 09:53
@liurenjie1024
Copy link
Contributor

Why we don't implement the unnesting arbitrary subquery paper? I think it's state of art.🤔

@mingmwang
Copy link
Contributor Author

mingmwang commented Jun 1, 2023

Why we don't implement the unnesting arbitrary subquery paper? I think it's state of art.🤔

@liurenjie1024

What this PR and the previous PRs I implemented/refactored still belong to the simple Unnesting method, they covers the Predicate(In/Exists) Subquery and Scalar Subquery cases in which the correlated expressions can be pull up and correlation can be converted to out joins or semi/anti joins.

For other more complex cases, they can be de-correlated using the methods mentioned in the unnesting arbitrary subquery paper. I will try to implement it later this year. Why not implement the unnesting arbitrary subquery paper directly is because this method might introduce additional joins compared to the simple unnesting method. The additional join comes from the inner table join with the distinct set(magic set).

You can play with the Hyper web interface(Hyper implemented this unnesting arbitrary subquery paper) and check the plan.

Hyper:
https://hyper-db.de/interface.html#

@mingmwang
Copy link
Contributor Author

mingmwang commented Jun 1, 2023

@mingmwang doris don't support project correlated subquery

@jackwener
Could you please to check this query in Apache Doris ?
select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int

        "Projection: t1.t1_int [t1_int:UInt32;N]",
        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
        "    Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
        "      Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean;N]",
        "        TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
        "        SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
        "          Projection: COUNT(UInt8(1)), t2.t2_id, __always_true [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
        "            Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_id:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
        "              TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",

I think this might also lead to the count bug if not handled correctly even it is the predicate scalar subquery.

@mingmwang
Copy link
Contributor Author

@alamb
Do you having time to review this PR next week?

I think subquery unnesting is important in TPS-DS and OLAP workload, and support wider ranger of subqueries will make DataFusion more competitive with other products.

@alamb
Copy link
Contributor

alamb commented Jun 2, 2023

Do you having time to review this PR next week?

Yes, I will find time. Thank you for the contribution -- I just don't have enough review bandwidth to keep up with everything!

@alamb
Copy link
Contributor

alamb commented Jun 6, 2023

This is on my list for review today

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mingmwang -- I realistically can't review a 3000+ line PR in fine grain detail, but I did review most of the plan changes in this PR and I didn't see any issues with merging this PR

Likewise, I reviewed the code and I found it easy to follow.

In addition, given that a substantial amount of this PR is tests and that a bunch of the tpc-ds queries now run, I think we should merge it in and continue improvements on main.

"Explain [plan_type:Utf8, plan:Utf8]",
" LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.CAST(t2_int AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.t2.t2_int + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these plans actually have fewer CASTs or is it just an improvement in ALIAS generation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just improves the unnecessary ALIAS generation.

"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// TODO infer nullability in the schema has bug
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this tracked by a ticket? It probably would be good to put the link URL in the code do so we don't lose track of it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add one.

}

#[tokio::test]
async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this kind of query would be great to move to the https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests/test_files/pg_compat suite so we can compare the results directly to postgres

I manually verified a few of the results, but I didn't do so to all of them.

We can do this move as a follow on PR (or never)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

Ok(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also add a new check for results for the tests changed in this file too?

Previously this query didn't run so could not be checked for results

" Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Subquery: [NULL:Null;N]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this query will run yet -- perhaps we can file a ticket and leave a reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this query can not run now. This is just to verify the logic plan generated is expected.

expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check results here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not add result checking in all the new added UTs. Some UTs are used to verify the plans are expected.
I added result checking for those that might have count bugs.

--------------------Projection: nation.n_nationkey
----------------------Filter: nation.n_name = Utf8("GERMANY")
------------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
------Inner Join: Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using an InnerJoin (with ON) seems better than a CrossJoin followed by a filter 👍

LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't Values also return the first expr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think currently we do not support Values clause inside the subquery.

\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey [o_custkey:Int64]\
\n SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old plan looks straight up wrong, as it lost the + 1. This new plan looks much better

Comment on lines +587 to +588
\n Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like this plan is not as good (to do a left join and then a filter rather than doing the filter as part of the join condition) -- perhaps other subsequent passes improve the filtering 🤔

But given it is LEFT JOIN and the __scalar_sq_1.MAX(orders.o_custkey) refers to the non-preserved side, I don't think it can be done after the join

Copy link
Contributor Author

@mingmwang mingmwang Jun 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. you are right. We can leverage other rules to further optimize the plan.
And Optimizing the Scalar Subquery to Left Join is more general, no matter the Scalar Subquery is in Projection exprs or in the Filter exprs.

@alamb
Copy link
Contributor

alamb commented Jun 6, 2023

I took the liberty of merging this branch to main to resolve a conflict

@mingmwang
Copy link
Contributor Author

@alamb
I will resolve the conflicts tomorrow.

@alamb
Copy link
Contributor

alamb commented Jun 12, 2023

I will resolve the conflicts tomorrow.

I believe #6642 may also conflict -- I will try and review that one today and get it merged too

@mingmwang
Copy link
Contributor Author

I will resolve the conflicts tomorrow.

I believe #6642 may also conflict -- I will try and review that one today and get it merged too

Conflicts resolved.

@alamb alamb merged commit fd444b4 into apache:main Jun 13, 2023
@alamb
Copy link
Contributor

alamb commented Jun 13, 2023

Thanks again @mingmwang

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt)

Projects

None yet

4 participants