-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Support wider range of Subquery, handle the Count bug #6457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks @mingmwang . |
datafusion/core/tests/sql/joins.rs
Outdated
| " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
| " SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
| " Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
| " Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't figure out why occur double alias
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The double alias is not caused by the decorrelate rules. It's caused by the other logical optimization rule:
new_plan LeftSemi Join: Filter: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_3.CAST(t2_id AS Int64) + Int64(1)
TableScan: t1
SubqueryAlias: __correlated_sq_3
Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1), CAST(t2.t2_id AS Int64) + Int64(1)
TableScan: t2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't figure out why occur double alias
Fixed the alias problem.
|
I added additional logic to handle the @jackwener
|
|
@mingmwang doris don't support |
|
I am planning to focus my review efforts on other parts of DataFusion where I have more to add -- I don't plan to review this PR unless someone think that is important. Thank you @mingmwang and @jackwener for pushing this forward |
|
@alamb @jackwener The work should be related to reordering |
|
Why we don't implement the unnesting arbitrary subquery paper? I think it's state of art.🤔 |
What this PR and the previous PRs I implemented/refactored still belong to the simple Unnesting method, they covers the Predicate(In/Exists) Subquery and Scalar Subquery cases in which the correlated expressions can be pull up and correlation can be converted to out joins or semi/anti joins. For other more complex cases, they can be de-correlated using the methods mentioned in the unnesting arbitrary subquery paper. I will try to implement it later this year. Why not implement the unnesting arbitrary subquery paper directly is because this method might introduce additional joins compared to the simple unnesting method. The additional join comes from the inner table join with the distinct set(magic set). You can play with the |
@jackwener I think this might also lead to the |
|
@alamb I think subquery unnesting is important in TPS-DS and OLAP workload, and support wider ranger of subqueries will make DataFusion more competitive with other products. |
Yes, I will find time. Thank you for the contribution -- I just don't have enough review bandwidth to keep up with everything! |
|
This is on my list for review today |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mingmwang -- I realistically can't review a 3000+ line PR in fine grain detail, but I did review most of the plan changes in this PR and I didn't see any issues with merging this PR
Likewise, I reviewed the code and I found it easy to follow.
In addition, given that a substantial amount of this PR is tests and that a bunch of the tpc-ds queries now run, I think we should merge it in and continue improvements on main.
datafusion/core/tests/sql/joins.rs
Outdated
| "Explain [plan_type:Utf8, plan:Utf8]", | ||
| " LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.CAST(t2_int AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
| " LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
| " LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.t2.t2_int + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do these plans actually have fewer CASTs or is it just an improvement in ALIAS generation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just improves the unnecessary ALIAS generation.
| "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
| ); | ||
|
|
||
| // TODO infer nullability in the schema has bug |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this tracked by a ticket? It probably would be good to put the link URL in the code do so we don't lose track of it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will add one.
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this kind of query would be great to move to the https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests/test_files/pg_compat suite so we can compare the results directly to postgres
I manually verified a few of the results, but I didn't do so to all of them.
We can do this move as a follow on PR (or never)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
| "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
| ); | ||
|
|
||
| Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we also add a new check for results for the tests changed in this file too?
Previously this query didn't run so could not be checked for results
| " Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | ||
| " Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | ||
| " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | ||
| " Subquery: [NULL:Null;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this query will run yet -- perhaps we can file a ticket and leave a reference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this query can not run now. This is just to verify the logic plan generated is expected.
| expected, actual, | ||
| "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
| ); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could check results here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not add result checking in all the new added UTs. Some UTs are used to verify the plans are expected.
I added result checking for those that might have count bugs.
| --------------------Projection: nation.n_nationkey | ||
| ----------------------Filter: nation.n_name = Utf8("GERMANY") | ||
| ------------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")] | ||
| ------Inner Join: Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using an InnerJoin (with ON) seems better than a CrossJoin followed by a filter 👍
| LogicalPlan::EmptyRelation(_) | ||
| | LogicalPlan::Prepare(_) | ||
| | LogicalPlan::Statement(_) | ||
| | LogicalPlan::Values(_) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why doesn't Values also return the first expr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think currently we do not support Values clause inside the subquery.
| \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ | ||
| \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ | ||
| \n Projection: orders.o_custkey [o_custkey:Int64]\ | ||
| \n SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old plan looks straight up wrong, as it lost the + 1. This new plan looks much better
| \n Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\ | ||
| \n Left Join: [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems like this plan is not as good (to do a left join and then a filter rather than doing the filter as part of the join condition) -- perhaps other subsequent passes improve the filtering 🤔
But given it is LEFT JOIN and the __scalar_sq_1.MAX(orders.o_custkey) refers to the non-preserved side, I don't think it can be done after the join
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. you are right. We can leverage other rules to further optimize the plan.
And Optimizing the Scalar Subquery to Left Join is more general, no matter the Scalar Subquery is in Projection exprs or in the Filter exprs.
|
I took the liberty of merging this branch to |
|
@alamb |
I believe #6642 may also conflict -- I will try and review that one today and get it merged too |
Conflicts resolved. |
|
Thanks again @mingmwang |
Which issue does this PR close?
Closes #6428
Closes #5808
Closes #6497
Rationale for this change
The existing de-correlation rules
DecorrelatePredicateSubqueryonly supportProjectionorDistinctas the top level plan in the Subquery and theFilteris the child of the top plan. And the ruleScalarSubqueryToJoinalso assumesthe top level plan of the Subquery is a
Projectionand the child is aAggregation.But actually the shape of the Subquery plans can be very flexible and the
Filters which include the correlated expressions(out reference columns) can be nested in very deep plans.This PR re-implement the correlation expressions pull-up process and support as much case as possible no matter the shape of the Subquery plans. It covers almost all the case that the subquery plan can be covered to
Left Semi/Left Anti(for In/Exists Subquery) orLeft JoinorCross Join(Scalar Subquery).After this PR, the remaining unsupported Subquery cases:
Exists SubqueryIn SubquerycontainsLimitclause orOrder Byclause(can not pull up correlated expressions)Scalar SubquerycontainsLimitclause orOrder Byclause(can not pull up correlated expressions)Unionin the Subquery plan and theUnion's children contain correlated expressions.The above 2), 3), 4), 5) cases can not be converted to simple joins, need add another new rule and use a difference approach to de-correlate them.
Some TPC-DS queries are impacted also and they can generate runnable physical plans now.
What changes are included in this PR?
The alias logic is also changed, we can not simply alias the Subquery
Projections orAggregations output'sExprto some internal alias column like__scalar_value, the Expr might be used by some top level plan, likeHavingclause orused as a Join conditions..
Are these changes tested?
Yes. Added serval UTs.
I will add more UT tomorrow to cover the
countaggregation cases(there is bug here).Are there any user-facing changes?