[SPARK-18874][SQL] First phase: Deferring the correlated predicate pull up to Optimizer phase #16954
Test build #72992 has finished for PR 16954 at commit
cc @hvanhovell I did the internal review. It is ready for you to review it. Thanks!
In this test, the query is not valid because it is missing a GROUP BY. It used to pass because we implicitly added r.c to the aggregate grouping as part of pulling up correlated predicates in the analyzer.
This function is mostly copied from Analyzer. The only difference is that, since validation is done by the analyzer, we only process Filter, Project and Aggregate here.
Plan example is available in Code Changes in section 3.
Maybe we should create a special InSubQuery expression. This looks like a lot of work for what we are doing here.
The following definition is moved from PredicateSubquery.hasPredicateSubquery, with PredicateSubquery removed from the first case pattern.
The following definition is moved from PredicateSubquery.hasNullAwarePredicateWithinNot with a code change to reflect the new representation of IN subquery.
The following definition is moved from a nested definition of the same name in ResolveSubquery.pullOutCorrelatedPredicates in Analyzer.scala.
The following three definitions of stripOuterReference(s) are taken from a segment of code embedded in ResolveSubquery.rewriteSubQuery (Analyzer.scala).
The following definition is moved unchanged from a nested definition of the same name in ResolveSubquery.pullOutCorrelatedPredicates (Analyzer.scala)
De-duplication of the subplan now happens in the optimizer, when we actually pull up the correlated predicates. Thus the Project case is simplified.
@hvanhovell Hi Herman, I was wondering if you had some time to look into this PR? Please let me know your thoughts.
I took an initial pass (not quite there yet). Most of it is in order. I left a few comments.
I will take another pass this week.
Nit: spacing is off here
@hvanhovell Thanks.. will change.
Should we move this into CheckAnalysis?
Also use foreachUp instead of transformUp for the tree traversal in this method.
@hvanhovell The code here validates the correlated references and also collects them, so that the outer plan can be rewritten to record the outer references. Are you suggesting moving the "check" portion of checkAndGetOuter... () to checkAnalysis? I will change to use foreachUp. Thanks.
Ok, then leave as it is. I thought it was only doing validation.
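For readers following the traversal discussion: a side-effecting bottom-up walk is enough when a pass only validates and collects, because no tree is rebuilt. The sketch below is a toy model, not Spark code. `Node`, `allowsOuterRefs` and `checkAndCollect` are hypothetical names; only the name `foreachUp` is borrowed from the thread.

```scala
object ForeachUpSketch {
  // Toy plan node (hypothetical); stands in for a logical plan operator.
  final case class Node(
      name: String,
      outerRefs: Seq[String],
      allowsOuterRefs: Boolean = true,
      children: Seq[Node] = Nil) {
    // Visit the children first, then this node, purely for side effects.
    def foreachUp(f: Node => Unit): Unit = {
      children.foreach(_.foreachUp(f))
      f(this)
    }
  }

  // Validate and collect in a single bottom-up pass without rebuilding the tree.
  def checkAndCollect(plan: Node): List[String] = {
    val found = scala.collection.mutable.ArrayBuffer.empty[String]
    plan.foreachUp { n =>
      require(n.allowsOuterRefs || n.outerRefs.isEmpty,
        s"outer references are not allowed in ${n.name}")
      found ++= n.outerRefs
    }
    found.toList
  }
}
```

Because the walk is bottom-up, references found in child operators are recorded before those of their parents.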
This might be somewhat expensive. We could also use something that terminates early.
So I am all for decent documentation, but do you think we can find a somewhat smaller example?
@hvanhovell I will work on a smaller example. I think we don't need the subquery expression in the left plan. That should cut down the plan size.
@dilipbiswal You are probably already on the right track that one of the subquery expressions is not required. I guess either T1 intersect T1 where exists (T2), or a self-join scenario like T1, T1 Tx where exists (T2 where <Q>.col = T2.col) with <Q> being T1 or Tx, should also do.
Nit: ArrayBuffer.empty[Seq[Expression]] should also work. Why is it useful to collect sequences of expressions instead of just expressions?
@hvanhovell Thank you. I will change to use ArrayBuffer[Expression] instead.
This only works with aggregates that are already in the Aggregate operator, this seems like a regression. What does Hive do?
@hvanhovell At the time of executing this rule, the aggregate expressions from the subquery plan are already pushed down to the Aggregate operator through ResolveAggregateFunctions. Here we are just updating the outer references in the subquery plan.
Ok, awesome.
This seems weird. Is this also the current behavior?
@hvanhovell Here is where I saw the code that handles the promotion between date and timestamp types. code
Please let me know if I missed something here.
Shouldn't we factor that code out then? Now we have the same logic in two places.
@hvanhovell Thanks! I had tried to do this before as well, as this came up during the internal review. I have made another try. Please let me know what you think.
Ok please do this in a follow-up
O wait, you have changed it.
IIUC this is all done to get a better error message right?
@hvanhovell Actually, previously checkInputDataTypes(), called from checkAnalysis, never had to deal with IN subquery expressions, as they were rewritten to PredicateSubquery. With the change in this PR we now see the original IN subquery expressions, and thus we need to make sure the types are the same between the LHS and RHS of the IN subquery expression.
I am pretty sure this is wrong. This will work on the following expression: NOT(a) AND b IN(SELECT q FROM t)
@hvanhovell Actually I copied this function from the existing PredicateSubquery.hasNullAwarePredicateWithinNot. I also found it a little odd that we are doing an e.find, but was afraid to change this.
@hvanhovell I had actually debugged this before and had completely forgotten about it. The case you mention works OK because of the way we invoke this function. Please see the caller at - code
@hvanhovell I have refactored the code a bit and I hope this function is a bit clearer to follow. I have moved the code from its caller in checkAnalysis to this function. Please let me know what you think.
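To make the concern in this thread concrete, here is a toy model of the two ways such a check can be written. It is only a sketch and not the Spark implementation: `Expr`, `looseCheck`, `strictCheck` and `splitConjuncts` are hypothetical names, and the idea that the caller examines each conjunct separately is an assumption made for illustration.

```scala
object NotInSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class InSubquery(value: Expr) extends Expr { val children: Seq[Expr] = Seq(value) }
  final case class Not(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }
  final case class And(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  // True if any node in the expression tree satisfies p; stops at the first hit.
  def exists(e: Expr)(p: Expr => Boolean): Boolean =
    p(e) || e.children.exists(c => exists(c)(p))

  // Loose check: a NOT and an IN subquery both occur somewhere in the expression.
  def looseCheck(e: Expr): Boolean =
    exists(e)(_.isInstanceOf[Not]) && exists(e)(_.isInstanceOf[InSubquery])

  // Strict check: the IN subquery must sit underneath a NOT.
  def strictCheck(e: Expr): Boolean = exists(e) {
    case Not(child) => exists(child)(_.isInstanceOf[InSubquery])
    case _ => false
  }

  // Split a condition on AND into its conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }
}
```

In this model, `NOT(a) AND b IN (...)` trips the loose check when evaluated on the whole condition, but not when each conjunct is checked on its own. The strict check gives the expected answer either way.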
Use foreachDown?
@hvanhovell Actually, Herman, in this place we are doing an intermediate transformation (even though we are not returning the transformed expression). After stripping the outer references from the AggregateExpression, we return the newExpr. Example: min(outer(b))
When we hit 'min', we strip out the outer references and return min(b) to the next transformation, so that we don't encounter outer(b) in the next pass and consider it an outer expression. Maybe there is a better way to achieve this :-)
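The min(outer(b)) example can be modelled in a few lines. This is a toy sketch rather than the code under review: `Outer`, `Min`, `stripOuter` and `outerRefs` are hypothetical stand-ins. It shows an aggregate over outer references being recorded as one unit, with its outer markers stripped, so that the nested outer(b) is not reported a second time.

```scala
object StripOuterSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Outer(child: Expr) extends Expr
  final case class Min(child: Expr) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Remove every outer(...) marker, keeping the wrapped expression.
  def stripOuter(e: Expr): Expr = e match {
    case Outer(c) => stripOuter(c)
    case Min(c) => Min(stripOuter(c))
    case Add(l, r) => Add(stripOuter(l), stripOuter(r))
    case a: Attr => a
  }

  def containsOuter(e: Expr): Boolean = e match {
    case Outer(_) => true
    case Min(c) => containsOuter(c)
    case Add(l, r) => containsOuter(l) || containsOuter(r)
    case _: Attr => false
  }

  // Collect outer references top-down. An aggregate that contains outer
  // references is recorded whole (stripped) and is not descended into.
  def outerRefs(e: Expr): Seq[Expr] = e match {
    case m: Min if containsOuter(m) => Seq(stripOuter(m))
    case Min(c) => outerRefs(c)
    case Outer(c) => Seq(c)
    case Add(l, r) => outerRefs(l) ++ outerRefs(r)
    case _: Attr => Nil
  }
}
```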
@hvanhovell Hello Herman, I have addressed all your comments
Test build #73646 has finished for PR 16954 at commit
Test build #73712 has finished for PR 16954 at commit
Nit: s" -> "
and also move the space from the beginning of the second line to the end of the first line.
left hand side -> left hand side (LHS)
right hand side -> right hand side (RHS)
lhs -> LHS. Please correct all the similar cases in comments.
Please add a comment to explain what this condition means.
Please use case (name, value) => instead of pair
case (p, In(e, Seq(ListQuery(sub, conditions, _))))
case (p, Not(In(e, Seq(ListQuery(sub, conditions, _))))) =>
case Exists(sub, conditions, _)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
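The `reduceLeftOption(And)` idiom in the two suggestions above folds a possibly empty list of predicates into an optional join condition. A minimal stand-alone sketch follows, using toy `Pred` and `And` types rather than Catalyst expressions. It writes `And(_, _)` instead of passing the companion object directly; both forms are intended to behave the same.

```scala
object JoinCondSketch {
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Combine the IN-derived conditions with the correlated conditions.
  // An empty input yields None, which a join can treat as "no condition".
  def combine(inConditions: Seq[Expr], conditions: Seq[Expr]): Option[Expr] =
    (inConditions ++ conditions).reduceLeftOption(And(_, _))
}
```

With three predicates p1, p2, p3 the result is left-nested: And(And(p1, p2), p3).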
case In(value, Seq(ListQuery(sub, conditions, _)))
getValueExpression(e) -> getValueExpression(value)
newplan -> newPlan
newcond -> newCond
val (newPlan, newCond) = if (outer.nonEmpty) {
case ScalarSubquery(sub, children, exprId) if children.nonEmpty =>
If you want, you can also use cond instead of children.
case Exists(sub, children, exprId) if children.nonEmpty =>
case ListQuery(sub, _, exprId) =>
Please check this in the guard of the rule. This currently throws a very hard to understand error.
Use orElse?
I think it is better if you cast on the castedLhs here:
val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ =>
    CreateNamedStruct(cns.nameExprs.zip(castedLhs).flatMap {
      case (name, value) => Seq(name, value)
    })
}
@hvanhovell Can you please double-check the above code? I am unable to compile it as cns is not defined. In this block, I was looking at the original left hand side expression to see if it's a named struct and, if so, I construct a named struct back. castedLhs would always match a Seq(lhs), no? Please let me know if I am missing something here.
My bad, I never compile these snippets. You have a point there. We could just use CreateStruct (since we really don't care about the name). So that would look something like this (again not compiled):
val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ => CreateStruct(castedLhs)
}
@hvanhovell Thanks a lot. You are right, we don't care about the names. This looks much better.
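The shape of the agreed suggestion can be tried out without Spark. In the sketch below `Col` and `Struct` are toy stand-ins (assumptions for illustration) for a column expression and for `CreateStruct`: a single casted value is used as is, and several values are wrapped in an unnamed struct.

```scala
object LhsSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Struct(fields: Seq[Expr]) extends Expr

  // One value: keep it. Several values: wrap them, since field names do not matter here.
  def rebuildLhs(castedLhs: Seq[Expr]): Expr = castedLhs match {
    case Seq(single) => single
    case many => Struct(many)
  }
}
```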
It is better to display the offending columns. Now the user has to find the offending columns by herself/himself.
@hvanhovell The new error message looks like the following. Does this look okay to you?
Error in query: cannot resolve '(named_struct('c1', at1.`c1`, 'c2', at1.`c2`) IN (listquery()))' due to data type mismatch:
The data type of one or more elements in the left hand side of an IN subquery
is not compatible with the data type of the output of the subquery
Mismatched columns:
[(at1.`c1`:decimal(10,0), at2.`c1`:timestamp), (at1.`c2`:timestamp, at2.`c2`:decimal(10,0))]
Left side:
[decimal(10,0), timestamp].
Right side:
[timestamp, decimal(10,0)].
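A "Mismatched columns" list like the one in this message can be produced by pairing the left-hand columns with the subquery output positionally and keeping the pairs whose types differ. The sketch below is a simplified model: `Column` and `mismatched` are hypothetical names, and types are compared as plain strings, which is an assumption and not how Spark compares data types.

```scala
object MismatchSketch {
  final case class Column(name: String, dataType: String)

  // Pair columns by position and describe every pair whose types differ.
  def mismatched(lhs: Seq[Column], rhs: Seq[Column]): Seq[String] =
    lhs.zip(rhs).collect {
      case (l, r) if l.dataType != r.dataType =>
        s"(${l.name}:${l.dataType}, ${r.name}:${r.dataType})"
    }
}
```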
Use catalog strings please.
This still looks fishy to me. It is also in the original code base, so we don't have to fix this now. Can you open a JIRA to track this?
@hvanhovell Sure. will do
Shouldn't we check if the aggregate contains inner references? You could use collect leaves here: a.collectLeaves.forall(_.isInstanceOf[OuterReference])
@hvanhovell Sounds good, Herman. I will change the guard to look for "all outer references" now. I will do a small follow-up to raise an appropriate exception if there is a mixture of outer and inner references within an AggregateExpression, as I have to write new tests for it. Hope that's OK with you.
Yeah just address this in a follow-up
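The guard suggested in this thread, `a.collectLeaves.forall(_.isInstanceOf[OuterReference])`, can be illustrated on a toy expression tree. The types below (`Local`, `OuterRef`, `Add`, `Min`) and the helper `isMixed` are hypothetical. The sketch separates an aggregate over outer references only from one that mixes outer and local references, which is the case left for the follow-up.

```scala
object AggRefSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Local(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class OuterRef(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class Add(l: Expr, r: Expr) extends Expr { val children: Seq[Expr] = Seq(l, r) }
  final case class Min(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }

  // All leaf nodes of the expression tree, left to right.
  def collectLeaves(e: Expr): Seq[Expr] =
    if (e.children.isEmpty) Seq(e) else e.children.flatMap(collectLeaves)

  // Guard: every leaf under the aggregate is an outer reference.
  def onlyOuter(agg: Expr): Boolean =
    collectLeaves(agg).forall(_.isInstanceOf[OuterRef])

  // Both outer and local leaves are present.
  def isMixed(agg: Expr): Boolean = {
    val leaves = collectLeaves(agg)
    leaves.exists(_.isInstanceOf[OuterRef]) && leaves.exists(_.isInstanceOf[Local])
  }
}
```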
Just use flatMap here? Instead of foreach?
getOuterReferences is kind of magical in the sense that it also isolates the aggregate expression. Could you add a single line of comment to highlight this?
@hvanhovell Will do.
Why create two objects, SubqueryExpression and SubExprUtils? Are they that different?
@hvanhovell Actually @nsyca and I had discussed this a bit. The SubqueryExpression object has methods that operate strictly in the context of a SubqueryExpression, whereas the utils object has methods that mostly deal with OuterReferences. So they can operate on the subquery plans referred to from a SubqueryExpression, or maybe in the future on queries of the following form, if we support them:
select * from t1 left outer join (select * from t2 where t2.c1 = t1.c1) on …
If you think we should merge these two, let me know and I will do it :-).
Ok, so the SubExprUtils should be moved to the OuterReference companion :)...
Let's leave it for now.
@dilipbiswal can you update. I left a few minor comments, but it is good to go anyway (pending the update).
@hvanhovell I am in the process of incorporating the comments. I will send an update soon.
7178719 to 19cdbb0
Test build #74489 has started for PR 16954 at commit
retest this please
Test build #74495 has finished for PR 16954 at commit
LGTM. Merging to master. Thanks for the hard work, and your patience!
@hvanhovell @gatorsmile @nsyca Thank you very much !!
…icates contain aggregate expression that has mixture of outer and local references.

## What changes were proposed in this pull request?
Address a follow up in [comment](apache#16954 (comment))
Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following :

```SQL
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1 FROM t2 WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
  at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)
```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```

## How was this patch tested?
Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <[email protected]>

Closes apache#17636 from dilipbiswal/subquery_followup1.
What changes were proposed in this pull request?
Currently the Analyzer, as part of ResolveSubquery, pulls up the correlated predicates to their
originating SubqueryExpression. The subquery plan is then transformed to remove the correlated
predicates after they are moved up to the outer plan. In this PR, the task of pulling up
correlated predicates is deferred to the Optimizer. This is the initial work that will allow us to
support forms of correlated subqueries that we don't support today. The design document
from @nsyca can be found at the following link:
DesignDoc
A brief description of the code changes (hopefully to aid with code review) can be found at the
following link:
CodeChanges
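As a rough illustration of what "pulling up correlated predicates" means, the toy model below removes predicates that refer to the outer query from a subquery plan and returns them separately, so that a later phase could attach them to a join condition. It is a sketch under simplifying assumptions and not the rule implemented in this PR: `Pred`, `usesOuter`, `Relation`, `Filter` and `pullUp` are hypothetical names, and only Filter operators are handled.

```scala
object PullUpSketch {
  // A predicate, flagged by whether it references the outer query.
  final case class Pred(sql: String, usesOuter: Boolean)

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(preds: Seq[Pred], child: Plan) extends Plan

  // Returns the subquery plan without correlated predicates,
  // together with the predicates that were pulled up.
  def pullUp(plan: Plan): (Plan, Seq[Pred]) = plan match {
    case Filter(preds, child) =>
      val (correlated, local) = preds.partition(_.usesOuter)
      val (newChild, fromChild) = pullUp(child)
      // Drop the Filter entirely if nothing local remains.
      val rewritten = if (local.isEmpty) newChild else Filter(local, newChild)
      (rewritten, correlated ++ fromChild)
    case r: Relation => (r, Nil)
  }
}
```

Deferring this step means the analyzed plan still contains the correlated predicate inside the subquery; only the later rewrite separates it out.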
How was this patch tested?
The test case PRs were submitted earlier:
16337 16759 16841 16915 16798 16712 16710 16760 16802