
Conversation

@dilipbiswal
Contributor

What changes were proposed in this pull request?

Currently the Analyzer, as part of ResolveSubquery, pulls the correlated predicates up to their
originating SubqueryExpression. The subquery plan is then transformed to remove the correlated
predicates after they have been moved up to the outer plan. In this PR, the task of pulling up
correlated predicates is deferred to the Optimizer. This is the initial work that will allow us to
support the forms of correlated subqueries that we don't support today. The design document
from @nsyca can be found at the following link:
DesignDoc

The brief description of code changes (hopefully to aid with code review) can be found at the
following link:
CodeChanges
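For context, here is a minimal sketch of the kind of query involved (hypothetical table and column names, not taken from the PR):

```sql
-- The predicate t2.c1 = t1.c1 inside the subquery references t1 from the
-- outer query block, i.e. it is a correlated predicate. Before this PR the
-- Analyzer pulled it up out of the subquery plan during resolution; after
-- this PR that rewrite is deferred to the Optimizer.
SELECT t1.c1
FROM   t1
WHERE  EXISTS (SELECT 1 FROM t2 WHERE t2.c1 = t1.c1)
```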

How was this patch tested?

The test cases were submitted earlier in the following PRs:
16337 16759 16841 16915 16798 16712 16710 16760 16802

@SparkQA

SparkQA commented Feb 16, 2017

Test build #72992 has finished for PR 16954 at commit 33844bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class In(value: Expression, list: Seq[Expression]) extends Predicate
  • abstract class SubqueryExpression(
  • case class ListQuery(
  • case class Exists(

@dilipbiswal
Contributor Author

cc @hvanhovell @gatorsmile @nsyca

@gatorsmile
Member

cc @hvanhovell I did the internal review. It is ready for you to review it. Thanks!

Contributor Author

In this test, the query is not valid as it is missing a GROUP BY clause. It used to pass before because we implicitly added r.c into the aggregate grouping as part of pulling up correlated predicates in the Analyzer.
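A hypothetical sketch of the pattern being described (invented names; the actual test differs):

```sql
-- r.c appears in the subquery outside the aggregate without an explicit
-- GROUP BY. Previously the Analyzer implicitly added r.c to the grouping
-- while pulling up the correlated predicate l.b = r.c, so the query passed;
-- now it is rejected unless GROUP BY r.c is written explicitly.
SELECT *
FROM   l
WHERE  l.a = (SELECT max(r.d) + r.c FROM r WHERE l.b = r.c)
```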

Contributor Author

This function is mostly copied from the Analyzer. The only difference is that, since validation is already done by the Analyzer, we only process Filter, Project and Aggregate here.

Contributor Author

Plan example is available in Code Changes in section 3.

Contributor

Maybe we should create a special InSubQuery expression. This looks like a lot of work for what we are doing here.

Contributor

The following definition is moved from PredicateSubquery.hasPredicateSubquery, with PredicateSubquery removed from the first case pattern.

Contributor

The following definition is moved from PredicateSubquery.hasNullAwarePredicateWithinNot with a code change to reflect the new representation of IN subquery.

Contributor

The following definition is moved from a nested definition of the same name in ResolveSubquery.pullOutCorrelatedPredicates in Analyzer.scala.

Contributor

@nsyca nsyca Feb 16, 2017

The following three definitions of stripOuterReference(s) are taken from a segment of code embedded in ResolveSubquery.rewriteSubQuery (Analyzer.scala).

Contributor

The following definition is moved unchanged from a nested definition of the same name in ResolveSubquery.pullOutCorrelatedPredicates (Analyzer.scala)

Contributor Author

Now the de-duplication of the subquery plan happens in the Optimizer, when we actually pull up the correlated predicates. Thus the Project case is simplified.

@dilipbiswal
Contributor Author

@hvanhovell Hi Herman, I was wondering if you had some time to look at this PR? Please let me know your thoughts.

Contributor

@hvanhovell hvanhovell left a comment

I took an initial pass (not quite there yet). Most of it is in order. I left a few comments.

I will take another pass this week.

Contributor

Nit: spacing is off here

Contributor Author

@hvanhovell Thanks.. will change.

Contributor

Should we move this into CheckAnalysis?

Contributor

Also use foreachUp instead of transformUp for the tree traversal in this method.

Contributor Author

@dilipbiswal dilipbiswal Feb 28, 2017

@hvanhovell The code here validates the correlated references and also collects them to rewrite the outer plan to record the outer references. Are you suggesting moving the "check" portion of checkAndGetOuter...() to CheckAnalysis? I will change to use foreachUp. Thanks.

Contributor

Ok, then leave as it is. I thought it was only doing validation.

Contributor

This might be somewhat expensive. We could also use something that terminates early.

Contributor

So I am all for decent documentation, but do you think we can find a somewhat smaller example?

Contributor Author

@hvanhovell I will work on a smaller example. I think we don't need the subquery expression in the left plan. That should cut down the plan size.

Contributor

@dilipbiswal You are probably on the right track that one of the subquery expressions is not required. I guess either T1 intersect T1 where exists (T2), or a self-join scenario like T1, T1 Tx where exists (T2 where <Q>.col = T2.col) with <Q> being T1 or Tx, should also do.
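Spelled out as SQL, the self-join shape suggested above might look like this (a hypothetical sketch; names invented):

```sql
-- Two occurrences of t1 in the outer query; the correlated reference in
-- the EXISTS subquery could bind to either t1 or its alias tx, which
-- exercises the de-duplication of attributes between the two sides.
SELECT *
FROM   t1, t1 tx
WHERE  EXISTS (SELECT 1 FROM t2 WHERE tx.col = t2.col)
```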

Contributor

Nit: ArrayBuffer.empty[Seq[Expression]] should also work. Why is it useful to collect sequences of expressions instead of just expressions?

Contributor Author

@hvanhovell Thank you. I will change to use ArrayBuffer[Expression] instead.

Contributor

This only works with aggregates that are already in the Aggregate operator, this seems like a regression. What does Hive do?

Contributor Author

@hvanhovell At the time of executing this rule, the aggregate expressions from the subquery plan are already pushed down to the Aggregate operator through ResolveAggregateFunctions. Here we are just updating the outer references in the subquery plan.

Contributor

Ok, awesome.

Contributor

This seems weird. Is this also the current behavior?

Contributor Author

@dilipbiswal dilipbiswal Feb 28, 2017

@hvanhovell Here is where I saw the code that handles the promotion between date and timestamp types: code

Please let me know if I missed something here.

Contributor

Shouldn't we factor that code out then? Now we have the same logic in two places.

Contributor Author

@hvanhovell Thanks! I had tried to do this before, as this also came up during the internal review. I have made another attempt. Please let me know what you think.

Contributor

Ok please do this in a follow-up

Contributor

Oh wait, you have changed it.

Contributor

IIUC this is all done to get a better error message right?

Contributor Author

@hvanhovell Actually, previously checkInputDataTypes() called from checkAnalysis never had to deal with IN subquery expressions, as they were rewritten to PredicateSubquery. With the change in this PR we now see the original IN subquery expressions, and thus we need to make sure the types are the same between the LHS and RHS of the IN subquery expression.

Contributor

I am pretty sure this is wrong. This will work on the following expression: NOT(a) AND b IN(SELECT q FROM t)

Contributor Author

@hvanhovell Actually I copied this function from the existing PredicateSubquery.hasNullAwarePredicateWithinNot. I also found it a little odd that we are doing an e.find, but I was afraid to change this.

Contributor Author

@hvanhovell I actually had debugged this before and had completely forgotten about it. The case you mention actually works ok because of the way we invoke this function. Please see the caller at - code

Contributor Author

@hvanhovell I have refactored the code a bit and I hope this function is a bit clearer to follow. I have moved the code from its caller in checkAnalysis into this function. Please let me know what you think.

Contributor

Use foreachDown?

Contributor Author

@dilipbiswal dilipbiswal Feb 28, 2017

@hvanhovell Actually Herman, in this place we are doing an intermediary transformation (even though we are not returning the transformed expression). After stripping the outer references from the AggregateExpression, we return the newExpr. Example: for min(outer(b)), when we hit 'min', we strip out the outer references and return min(b) to the next transformation so that we don't encounter outer(b) in the next pass and consider it an outer expression. Maybe there is a better way to achieve this :-)
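A self-contained toy sketch (not the Catalyst implementation; all names invented) of why stripping the outer-reference wrapper inside the same bottom-up pass helps: once min(outer(b)) is rewritten to min(b), enclosing steps no longer see outer(b) and will not classify it as an outer expression.

```scala
// Toy expression tree: OuterRef marks a column that belongs to the outer query.
sealed trait Expr
case class Ref(name: String) extends Expr
case class OuterRef(child: Expr) extends Expr
case class Min(child: Expr) extends Expr

// A minimal bottom-up transform, analogous in spirit to Catalyst's transformUp:
// children are rewritten first, then the rule is applied to the node itself.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Min(c)      => Min(transformUp(c)(rule))
    case OuterRef(c) => OuterRef(transformUp(c)(rule))
    case leaf        => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

// Strip the OuterRef wrappers; the stripped child is what later steps see.
def stripOuterRefs(e: Expr): Expr = transformUp(e) {
  case OuterRef(c) => c
}

// min(outer(b)) becomes min(b)
println(stripOuterRefs(Min(OuterRef(Ref("b")))))  // Min(Ref(b))
```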

@dilipbiswal
Contributor Author

dilipbiswal commented Mar 1, 2017

@hvanhovell Hello Herman, I have addressed all your comments except the one where you suggest an optimization to failOnOuterReferenceInSubTree(). I am trying to see how I can improve that code. In the meantime, I wanted to push the updates I have so far. Please go through them when you have some time. Thanks a lot.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73646 has finished for PR 16954 at commit 886a744.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73712 has finished for PR 16954 at commit 3b4bb90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gatorsmile gatorsmile Mar 2, 2017

Nit: s" -> "

and also move the space from the beginning of the second line to the end of the first line.

Member

left hand side -> left hand side (LHS)
right hand side -> right hand side (RHS)

Member

lhs -> LHS. Please correct all the similar cases in comments.

Member

Please add a comment to explain what this condition means.

Member

Please use case (name, value) => instead of pair

Member

case (p, In(e, Seq(ListQuery(sub, conditions, _))))

Member

case (p, Not(In(e, Seq(ListQuery(sub, conditions, _))))) =>

Member

case Exists(sub, conditions, _)

Member

newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))

Member

val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
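As a side note, the reduceLeftOption(And) idiom used above folds a possibly empty list of conjuncts into a single optional join condition. A self-contained sketch with a toy condition type (not Catalyst's And) shows the behavior:

```scala
// Toy stand-in for a boolean condition; `and` mimics building an And node.
case class Cond(sql: String)
def and(l: Cond, r: Cond): Cond = Cond(s"(${l.sql} AND ${r.sql})")

// No conditions: the join gets no condition at all.
println(Seq.empty[Cond].reduceLeftOption(and))       // None
// One condition is passed through unchanged.
println(Seq(Cond("a = b")).reduceLeftOption(and))    // Some(Cond(a = b))
// Several conditions are folded left-to-right into one conjunction.
println(Seq(Cond("p"), Cond("q"), Cond("r")).reduceLeftOption(and))
// Some(Cond(((p AND q) AND r)))
```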

Member

case In(value, Seq(ListQuery(sub, conditions, _)))

Member

getValueExpression(e) -> getValueExpression(value)

Member

newplan -> newPlan
newcond -> newCond

val (newPlan, newCond) = if (outer.nonEmpty) {

Member

case ScalarSubquery(sub, children, exprId) if children.nonEmpty =>

Member

if you want, you can also use cond to replace children

Member

case Exists(sub, children, exprId) if children.nonEmpty =>

Member

case ListQuery(sub, _, exprId) =>

Contributor

Please check this in the guard of the rule. This currently throws a very hard-to-understand error.

Contributor

Use orElse?

Contributor

I think it is better if you cast on the castedLhs here:

val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ =>
    CreateNamedStruct(cns.nameExprs.zip(castedLhs).flatMap {
      case (name, value) => Seq(name, value)
    })
}

Contributor Author

@hvanhovell Can you please double-check the above code? I am unable to compile it as cns is not defined. In this block, I was looking at the original left-hand-side expression to see if it's a named struct and, if so, I construct a named struct back. castedLhs would always match Seq(lhs), no? Please let me know if I am missing something here.

Contributor

My bad, I never compile these snippets. You have a point there. We could just use CreateStruct (since we really don't care about the name). So that would look something like this (again not compiled):

val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ => CreateStruct(castedLhs)
}

Contributor Author

@hvanhovell Thanks a lot. You are right, we don't care about the names. This looks much better.

Contributor

It is better to display the offending columns. Now the user has to find the offending columns by herself/himself.

Contributor Author

@hvanhovell The new error message looks like the following. Does this look okay to you?

Error in query: cannot resolve '(named_struct('c1', at1.`c1`, 'c2', at1.`c2`) IN (listquery()))' due to data type mismatch: 
The data type of one or more elements in the left hand side of an IN subquery
is not compatible with the data type of the output of the subquery
Mismatched columns:
[(at1.`c1`:decimal(10,0), at2.`c1`:timestamp), (at1.`c2`:timestamp, at2.`c2`:decimal(10,0))]
Left side:
[decimal(10,0), timestamp].
Right side:
[timestamp, decimal(10,0)].

Contributor

Use catalog strings please.

Contributor

This still looks fishy to me. It is also in the original code base, so we don't have to fix this now. Can you open a JIRA to track this?

Contributor Author

@hvanhovell Sure. will do

Contributor

Shouldn't we check if the aggregate contains inner references? You could use collect leaves here: a.collectLeaves.forall(_.isInstanceOf[OuterReference])

Contributor Author

@hvanhovell Sounds good Herman. I will change the guard to look for "all outer references" now. I will do a small follow-up to raise an appropriate exception if there is a mixture of outer and inner references within an AggregateExpression, as I have to write new tests for it. Hope that's ok with you.

Contributor

Yeah just address this in a follow-up

Contributor

Just use flatMap here instead of foreach?

Contributor

getOuterReferences is kind of magical in the sense that it also isolates the aggregate expression. Could you add a single line of comment to highlight this?

Contributor Author

@hvanhovell Will do.

Contributor

Why create two objects, SubqueryExpression and SubExprUtils? Are they that different?

Contributor Author

@hvanhovell Actually @nsyca and I had discussed this a bit. The SubqueryExpression object has methods that operate strictly in the context of SubqueryExpression, whereas the utils object has methods that mostly deal with OuterReferences. So they can operate on the subquery plans referred to from SubqueryExpression, or maybe, in the future, on queries of the form:

select * from t1 left outer join (select * from t2 where t2.c1 = t1.c1) on …

If you think we should merge these two then let me know and I will do it :-).

Contributor

Ok, so SubExprUtils should be moved to the OuterReference companion :)...

Let's leave it for now.

@hvanhovell
Contributor

@dilipbiswal can you update? I left a few minor comments, but it is good to go anyway (pending the update).

@dilipbiswal
Contributor Author

@hvanhovell I am in the process of incorporating the comments. I will send an update soon.

@SparkQA

SparkQA commented Mar 14, 2017

Test build #74489 has started for PR 16954 at commit 19cdbb0.

@dilipbiswal
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 14, 2017

Test build #74495 has finished for PR 16954 at commit 19cdbb0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

LGTM. Merging to master. Thanks for the hard work, and your patience!

@asfgit asfgit closed this in 4ce970d Mar 14, 2017
@dilipbiswal
Contributor Author

@hvanhovell @gatorsmile @nsyca Thank you very much !!

ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 20, 2017
…icates contain aggregate expression that has mixture of outer and local references.

## What changes were proposed in this pull request?
Address a follow up in [comment](apache#16954 (comment))
Currently, subqueries with correlated predicates containing an aggregate expression that has a mixture of outer references and local references generate a codegen error like the following:

```SQL
SELECT t1a
FROM   t1
GROUP  BY 1
HAVING EXISTS (SELECT 1
               FROM  t2
               WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
	at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)

```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated
predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```
## How was this patch tested?
Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <[email protected]>

Closes apache#17636 from dilipbiswal/subquery_followup1.