From 9ad1833cc81b2e4781e5b7acfc0b2c784f555c15 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 17 Mar 2021 11:15:39 +0800 Subject: [PATCH 1/7] Push down the foldable predicate to both sides of Join --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 5 +++-- .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9a12ca1004fa7..983d1255a95da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1457,8 +1457,9 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic) val (leftEvaluateCondition, rest) = pushDownCandidates.partition(_.references.subsetOf(left.outputSet)) - val (rightEvaluateCondition, commonCondition) = - rest.partition(expr => expr.references.subsetOf(right.outputSet)) + val (rightEvaluateCondition, _) = + pushDownCandidates.partition(_.references.subsetOf(right.outputSet)) + val commonCondition = rest.filterNot(e => rightEvaluateCondition.exists(_.semanticEquals(e))) (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index c518fdded2112..17c6e2742bb8a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1384,4 +1384,14 @@ class FilterPushdownSuite extends PlanTest { condition = Some("x.a".attr === "z.a".attr)).analyze comparePlans(optimized, correctAnswer) } + + test("SPARK-28220: Push down the foldable predicate to both sides of Join") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, condition = Some(false)) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = x.where(false).join(y.where(false), condition = None).analyze + comparePlans(optimized, correctAnswer.analyze) + } } From 7997eee3ff7f48b3d5d897338fbd43fecdbb20a6 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 17 Mar 2021 20:34:52 +0800 Subject: [PATCH 2/7] fix --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 983d1255a95da..aec1714599358 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1457,9 +1457,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic) val (leftEvaluateCondition, rest) = pushDownCandidates.partition(_.references.subsetOf(left.outputSet)) - val (rightEvaluateCondition, _) = - pushDownCandidates.partition(_.references.subsetOf(right.outputSet)) - val commonCondition = rest.filterNot(e => rightEvaluateCondition.exists(_.semanticEquals(e))) + val rightEvaluateCondition = pushDownCandidates.filter(_.references.subsetOf(right.outputSet)) + val commonCondition = rest.filterNot(_.references.subsetOf(right.outputSet)) (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } From 5988d2846450f8ec72fefcd2067f12819463cc4b Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 18 Mar 2021 23:20:10 +0800 Subject: [PATCH 3/7] Fix --- .../sql/catalyst/optimizer/PropagateEmptyRelation.scala | 9 ++++++--- .../catalyst/optimizer/PropagateEmptyRelationSuite.scala | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 15d4561b47a23..c3de84dbcb59a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._ * - Join with one or two empty children (including Intersect/Except). * 2. Unary-node Logical Plans * - Project/Filter/Sample/Join/Limit/Repartition with all empty children. + * - Join with false condition. * - Aggregate with all empty children and at least one grouping expression. * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. */ @@ -71,10 +73,11 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit // Joins on empty LocalRelations generated from streaming sources are not eliminated // as stateful streaming joins need to perform other state management operations other than // just processing the input data. - case p @ Join(_, _, joinType, _, _) + case p @ Join(_, _, joinType, conditionOpt, _) if !p.children.exists(_.isStreaming) => - val isLeftEmpty = isEmptyLocalRelation(p.left) - val isRightEmpty = isEmptyLocalRelation(p.right) + val isFalseCondition = conditionOpt.exists(_.semanticEquals(FalseLiteral)) + val isLeftEmpty = isEmptyLocalRelation(p.left) || isFalseCondition + val isRightEmpty = isEmptyLocalRelation(p.right) || isFalseCondition if (isLeftEmpty || isRightEmpty) { joinType match { case _: InnerLike => empty(p) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 5c980abdd8f53..3854a0929f0e4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -155,6 +155,12 @@ class PropagateEmptyRelationSuite extends PlanTest { } } + test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") { + val query = testRelation1.join(testRelation2, Inner, condition = Some(Literal.FalseLiteral)) + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, LocalRelation('a.int, 'b.int).analyze) + } + test("propagate empty relation through UnaryNode") { val query = testRelation1 .where(false) From ee5b4eb2f08548ab3de6c33b9792e5f6439c5c45 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 18 Mar 2021 23:21:42 +0800 Subject: [PATCH 4/7] Fix --- .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 17c6e2742bb8a..f35a1e2a5905d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1388,10 +1388,10 @@ class FilterPushdownSuite extends PlanTest { test("SPARK-28220: Push down the foldable predicate to both sides of Join") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) - val originalQuery = x.join(y, condition = Some(false)) + val originalQuery = x.join(y, condition = Some(true)) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = x.where(false).join(y.where(false), condition = None).analyze + val correctAnswer = x.where(true).join(y.where(true), condition = None).analyze comparePlans(optimized, correctAnswer.analyze) } } From 9e913e9c2789d152c7b47eeada472452ba8f2101 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 19 Mar 2021 00:14:23 +0800 Subject: [PATCH 5/7] pushDownJoinConditions --- .../sql/catalyst/optimizer/Optimizer.scala | 22 ++++++++++++------- .../optimizer/FilterPushdownSuite.scala | 17 +++++++++++--- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aec1714599358..c2217aa074b84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -1468,6 +1469,15 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { case _ => false } + private def pushDownJoinConditions(conditions: Seq[Expression], plan: LogicalPlan) = { + // Push down true condition is useless. + if (conditions.exists(_.semanticEquals(TrueLiteral))) { + plan + } else { + conditions.reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan) + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { @@ -1526,17 +1536,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { joinType match { case _: InnerLike | LeftSemi => // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newLeft = pushDownJoinConditions(leftJoinConditions, left) + val newRight = pushDownJoinConditions(rightJoinConditions, right) val newJoinCond = commonJoinCondition.reduceLeftOption(And) Join(newLeft, newRight, joinType, newJoinCond, hint) case RightOuter => // push down the left side only join filter for left side sub query - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newLeft = pushDownJoinConditions(leftJoinConditions, left) val newRight = right val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) @@ -1544,8 +1551,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { case LeftOuter | LeftAnti | ExistenceJoin(_) => // push down the right side only join filter for right sub query val newLeft = left - val newRight = rightJoinConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newRight = pushDownJoinConditions(rightJoinConditions, right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, joinType, newJoinCond, hint) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index f35a1e2a5905d..c7b69b51a5152 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1385,13 +1385,24 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("SPARK-28220: Push down the foldable predicate to both sides of Join") { + test("SPARK-28220: Push down true join condition for inner join") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val originalQuery = x.join(y, condition = Some(true)) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = x.where(true).join(y.where(true), condition = None).analyze - comparePlans(optimized, correctAnswer.analyze) + val correctAnswer = x.join(y, condition = None).analyze + comparePlans(optimized, correctAnswer) + } + + test("SPARK-28220: Should not push down true join condition for left/right join") { + Seq(LeftOuter, RightOuter).foreach { joinType => + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, joinType = joinType, condition = Some(true)) + + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } } } From 25689da8aef89501e86d9891f54baa89644199bc Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 19 Mar 2021 16:39:25 +0800 Subject: [PATCH 6/7] fix --- .../sql/catalyst/optimizer/Optimizer.scala | 9 +++---- .../optimizer/PropagateEmptyRelation.scala | 19 +++++++++----- .../PropagateEmptyRelationSuite.scala | 25 ++++++++++++++++--- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c2217aa074b84..34e618a23ec9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1470,12 +1470,9 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } private def pushDownJoinConditions(conditions: Seq[Expression], plan: LogicalPlan) = { - // Push down true condition is useless. - if (conditions.exists(_.semanticEquals(TrueLiteral))) { - plan - } else { - conditions.reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan) - } + conditions + .filterNot(_.semanticEquals(TrueLiteral)) // Push down true condition is useless. + .reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan) } def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index c3de84dbcb59a..05b61638cf72c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -75,23 +75,30 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit // just processing the input data. case p @ Join(_, _, joinType, conditionOpt, _) if !p.children.exists(_.isStreaming) => - val isFalseCondition = conditionOpt.exists(_.semanticEquals(FalseLiteral)) - val isLeftEmpty = isEmptyLocalRelation(p.left) || isFalseCondition - val isRightEmpty = isEmptyLocalRelation(p.right) || isFalseCondition - if (isLeftEmpty || isRightEmpty) { + val isLeftEmpty = isEmptyLocalRelation(p.left) + val isRightEmpty = isEmptyLocalRelation(p.right) + val isFalseCondition = conditionOpt match { + case Some(FalseLiteral) => true + case _ => false + } + if (isLeftEmpty || isRightEmpty || isFalseCondition) { joinType match { case _: InnerLike => empty(p) // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule. // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule. case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p) - case LeftSemi if isRightEmpty => empty(p) - case LeftAnti if isRightEmpty => p.left + case LeftSemi if isRightEmpty | isFalseCondition => empty(p) + case LeftAnti if isRightEmpty | isFalseCondition => p.left case FullOuter if isLeftEmpty && isRightEmpty => empty(p) case LeftOuter | FullOuter if isRightEmpty => Project(p.left.output ++ nullValueProjectList(p.right), p.left) case RightOuter if isRightEmpty => empty(p) case RightOuter | FullOuter if isLeftEmpty => Project(nullValueProjectList(p.left) ++ p.right.output, p.right) + case LeftOuter if isFalseCondition => + Project(p.left.output ++ nullValueProjectList(p.right), p.left) + case RightOuter if isFalseCondition => + Project(nullValueProjectList(p.left) ++ p.right.output, p.right) case _ => p } } else { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 3854a0929f0e4..b5dcb8aa67646 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -147,7 +148,7 @@ class PropagateEmptyRelationSuite extends PlanTest { testcases.foreach { case (left, right, jt, answer) => val query = testRelation1 .where(left) - .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr)) + .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr === 'b.attr)) val optimized = Optimize.execute(query.analyze) val correctAnswer = answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) @@ -156,9 +157,25 @@ class PropagateEmptyRelationSuite extends PlanTest { } test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") { - val query = testRelation1.join(testRelation2, Inner, condition = Some(Literal.FalseLiteral)) - val optimized = Optimize.execute(query.analyze) - comparePlans(optimized, LocalRelation('a.int, 'b.int).analyze) + val testcases = Seq( + (Inner, Some(LocalRelation('a.int, 'b.int))), + (Cross, Some(LocalRelation('a.int, 'b.int))), + (LeftOuter, + Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)), + (RightOuter, + Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)), + (FullOuter, None), + (LeftAnti, Some(testRelation1)), + (LeftSemi, Some(LocalRelation('a.int))) + ) + + testcases.foreach { case (jt, answer) => + val query = testRelation1.join(testRelation2, joinType = jt, condition = Some(FalseLiteral)) + val optimized = Optimize.execute(query.analyze) + val correctAnswer = + answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + comparePlans(optimized, correctAnswer) + } } test("propagate empty relation through UnaryNode") { From debb094e7cafe6b35fdf3e1687950befe2f6055f Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 19 Mar 2021 22:26:16 +0800 Subject: [PATCH 7/7] fix --- .../sql/catalyst/optimizer/Optimizer.scala | 23 ++++++++----------- .../optimizer/FilterPushdownSuite.scala | 21 ----------------- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 34e618a23ec9e..9a12ca1004fa7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -1458,8 +1457,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic) val (leftEvaluateCondition, rest) = pushDownCandidates.partition(_.references.subsetOf(left.outputSet)) - val rightEvaluateCondition = pushDownCandidates.filter(_.references.subsetOf(right.outputSet)) - val commonCondition = rest.filterNot(_.references.subsetOf(right.outputSet)) + val (rightEvaluateCondition, commonCondition) = + rest.partition(expr => expr.references.subsetOf(right.outputSet)) (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } @@ -1469,12 +1468,6 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { case _ => false } - private def pushDownJoinConditions(conditions: Seq[Expression], plan: LogicalPlan) = { - conditions - .filterNot(_.semanticEquals(TrueLiteral)) // Push down true condition is useless. - .reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan) - } - def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { @@ -1533,14 +1526,17 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { joinType match { case _: InnerLike | LeftSemi => // push down the single side only join filter for both sides sub queries - val newLeft = pushDownJoinConditions(leftJoinConditions, left) - val newRight = pushDownJoinConditions(rightJoinConditions, right) + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = commonJoinCondition.reduceLeftOption(And) Join(newLeft, newRight, joinType, newJoinCond, hint) case RightOuter => // push down the left side only join filter for left side sub query - val newLeft = pushDownJoinConditions(leftJoinConditions, left) + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = right val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) @@ -1548,7 +1544,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { case LeftOuter | LeftAnti | ExistenceJoin(_) => // push down the right side only join filter for right sub query val newLeft = left - val newRight = pushDownJoinConditions(rightJoinConditions, right) + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, joinType, newJoinCond, hint) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index c7b69b51a5152..c518fdded2112 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1384,25 +1384,4 @@ class FilterPushdownSuite extends PlanTest { condition = Some("x.a".attr === "z.a".attr)).analyze comparePlans(optimized, correctAnswer) } - - test("SPARK-28220: Push down true join condition for inner join") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, condition = Some(true)) - - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = x.join(y, condition = None).analyze - comparePlans(optimized, correctAnswer) - } - - test("SPARK-28220: Should not push down true join condition for left/right join") { - Seq(LeftOuter, RightOuter).foreach { joinType => - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, joinType = joinType, condition = Some(true)) - - val optimized = Optimize.execute(originalQuery.analyze) - comparePlans(optimized, originalQuery.analyze) - } - } }