Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1210,16 +1210,41 @@ class Analyzer(
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
val outerReferences = ArrayBuffer.empty[Expression]

// Validate that correlated aggregate expression do not contain a mixture
// of outer and local references.
def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
val outer = a.collect { case OuterReference(e) => e.toAttribute }
val local = a.references -- outer
if (local.nonEmpty) {
val msg =
s"""
|Found an aggregate expression in a correlated predicate that has both
|outer and local references, which is not supported yet.
|Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
|Outer references: ${outer.map(_.sql).mkString(", ")},
|Local references: ${local.map(_.sql).mkString(", ")}.
""".stripMargin.replace("\n", " ").trim()
failAnalysis(msg)
}
case _ =>
}
}

// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
}
}

// Make sure a plan's expressions do not contain outer references
def failOnOuterReference(p: LogicalPlan): Unit = {
if (p.expressions.exists(containsOuter)) {
// Make sure a plan's expressions do not contain :
// 1. Aggregate expressions that have mixture of outer and local references.
// 2. Expressions containing outer references on plan nodes other than Filter.
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
Expand Down Expand Up @@ -1289,9 +1314,9 @@ class Analyzer(
// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
case s: Sort =>
failOnOuterReference(s)
failOnInvalidOuterReference(s)
case r: RepartitionByExpression =>
failOnOuterReference(r)
failOnInvalidOuterReference(r)

// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
Expand All @@ -1305,6 +1330,8 @@ class Analyzer(
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}

failOnInvalidOuterReference(f)
// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
Expand All @@ -1314,23 +1341,23 @@ class Analyzer(
// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p: Project =>
failOnOuterReference(p)
failOnInvalidOuterReference(p)

// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
// only equality correlated predicates.
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
failOnOuterReference(a)
failOnInvalidOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)

// Join can host correlated expressions.
case j @ Join(left, right, joinType, _) =>
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)

// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
Expand All @@ -1341,12 +1368,12 @@ class Analyzer(
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(right)

// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(left)

// Any other join types not explicitly listed above,
Expand All @@ -1362,7 +1389,7 @@ class Analyzer(
// Note:
// Generator with join=false is treated as Category 4.
case g: Generate if g.join =>
failOnOuterReference(g)
failOnInvalidOuterReference(g)

// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any test case to cover this scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I couldn't find a test for this. I will add one for now in SubquerySuite. I will move the negative tests to the sqlquerytestsuite in a follow-up pr.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,72 @@
-- The test file contains negative test cases
-- of invalid queries where error messages are expected.

create temporary view t1 as select * from values
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those just change for case, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Yeah.. since i was on this test case, thought i should fix the case.

CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c);
AS t1(t1a, t1b, t1c);

create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c);
AS t2(t2a, t2b, t2c);

create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c);
AS t3(t3a, t3b, t3c);

-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
;

-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a IN (SELECT min(t2a)
FROM t2
GROUP BY t2c
HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
;

-- TC 01.03
-- Invalid due to mixure of outer and local references under an AggegatedExpression
-- in a correlated predicate
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a));

-- TC 01.04
-- Invalid due to mixure of outer and local references under an AggegatedExpression
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1));

-- TC 01.05
-- Invalid due to outer reference appearing in projection list
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3));

Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 5
-- Number of queries: 8


-- !query 0
create temporary view t1 as select * from values
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c)
AS t1(t1a, t1b, t1c)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c)
AS t2(t2a, t2b, t2c)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c)
AS t3(t3a, t3b, t3c)
-- !query 2 schema
struct<>
-- !query 2 output



-- !query 3
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
-- !query 3 schema
Expand All @@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct


-- !query 4
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a IN (SELECT min(t2a)
FROM t2
GROUP BY t2c
HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);


-- !query 5
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a))
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.;


-- !query 6
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1))
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)), Outer references: t2.`t2a`, Local references: t3.`t3a`.;


-- !query 7
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3))
-- !query 7 schema
struct<>
-- !query 7 output
org.apache.spark.sql.AnalysisException
Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
Aggregate [min(outer(t2a#x)) AS min(outer())#x]
+- SubqueryAlias t3
+- Project [t3a#x, t3b#x, t3c#x]
+- SubqueryAlias t3
+- LocalRelation [t3a#x, t3b#x, t3c#x]
;
23 changes: 18 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -822,12 +822,25 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql(
"""
| select c2
| from t1
| where exists (select *
| from t2 lateral view explode(arr_c2) q as c2
where t1.c1 = t2.c1)""".stripMargin),
| SELECT c2
| FROM t1
| WHERE EXISTS (SELECT *
| FROM t2 LATERAL VIEW explode(arr_c2) q AS c2
WHERE t1.c1 = t2.c1)""".stripMargin),
Row(1) :: Row(0) :: Nil)

val msg1 = intercept[AnalysisException] {
sql(
"""
| SELECT c1
| FROM t2
| WHERE EXISTS (SELECT *
| FROM t1 LATERAL VIEW explode(t2.arr_c2) q AS c2
| WHERE t1.c1 = t2.c1)
""".stripMargin)
}
assert(msg1.getMessage.contains(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING"))
}
}

Expand Down