From 145faf9fe81c4285778e6f817553cd55dc068875 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 2 May 2017 08:36:18 -0700 Subject: [PATCH 1/6] Initial version of constant propagation --- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../sql/catalyst/optimizer/expressions.scala | 53 +++++++++ .../optimizer/ConstantPropagationSuite.scala | 102 ++++++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ae2f6bfa94ae7..d16689a34298a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -92,6 +92,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) CombineUnions, // Constant folding and strength reduction NullPropagation(conf), + ConstantPropagation, FoldablePropagation, OptimizeIn(conf), ConstantFolding, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 8931eb2c8f3b1..f1a7d9b696059 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -54,6 +54,59 @@ object ConstantFolding extends Rule[LogicalPlan] { } } +/** + * Substitutes [[Attribute Attributes]] which can be statically evaluated with their corresponding + * value in conjunctive [[Expression Expressions]] + * eg. + * {{{ + * SELECT * FROM table WHERE i = 5 AND j = i + 3 + * ==> SELECT * FROM table WHERE i = 5 AND j = 8 + * }}} + */ +object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { + + def containsNonConjunctionPredicates(expression: Expression): Boolean = expression match { + case Not(_) => true + case Or(_, _) => true + case _ => + var result = false + expression.children.foreach { + case Not(_) => result = true + case Or(_, _) => result = true + case other => result = result || containsNonConjunctionPredicates(other) + } + result + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsUp { + case and @ (left And right) + if !containsNonConjunctionPredicates(left) && !containsNonConjunctionPredicates(right) => + + val leftEntries = left.collect { + case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) + case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e) + } + val rightEntries = right.collect { + case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) + case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e) + } + val constantsMap = AttributeMap(leftEntries.map(_._1) ++ rightEntries.map(_._1)) + val predicates = (leftEntries.map(_._2) ++ rightEntries.map(_._2)).toSet + + def replaceConstants(expression: Expression) = expression transform { + case a: AttributeReference if constantsMap.contains(a) => + constantsMap.get(a).getOrElse(a) + } + + and transform { + case e @ EqualTo(_, _) if !predicates.contains(e) && + e.references.exists(ref => constantsMap.contains(ref)) => + replaceConstants(e) + } + } + } +} /** * Reorder associative integral-type operators and fold all constants into one. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala new file mode 100644 index 0000000000000..36b4c19455e7c --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class ConstantPropagationSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("AnalysisNodes", Once, + EliminateSubqueryAliases) :: + Batch("ConstantPropagation", Once, + ColumnPruning, + ConstantPropagation, + ConstantFolding, + BooleanSimplification) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + private val columnA = 'a.int + private val columnB = 'b.int + + /** + * Unit tests for constant propagation in expressions. + */ + test("basic test") { + val query = testRelation + .select(columnA) + .where(columnA === Add(columnB, Literal(1)) && columnB === Literal(10)) + + val correctAnswer = + testRelation + .select(columnA) + .where(columnA === Literal(11) && columnB === Literal(10)).analyze + + comparePlans(Optimize.execute(query.analyze), correctAnswer) + } + + test("with combination of AND and OR predicates") { + val query = testRelation + .select(columnA) + .where( + columnA === Add(columnB, Literal(1)) && + columnB === Literal(10) && + (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + .analyze + + val correctAnswer = + testRelation + .select(columnA) + .where( + columnA === Literal(11) && + columnB === Literal(10) && + (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + .analyze + + comparePlans(Optimize.execute(query), correctAnswer) + } + + test("negative test : with NOT in predicate") { + val query = testRelation + .select(columnA) + .where(Not(columnA === Add(columnB, Literal(1))) && columnB === Literal(10)) + .analyze + + comparePlans(Optimize.execute(query), query) + } + + test("negative test : with OR in predicate") { + val query = testRelation + .select(columnA) + .where( + columnA === Add(columnB, Literal(2)) && + (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + .analyze + + comparePlans(Optimize.execute(query), query) + } +} From ef13e87a9b65915710b168b11f90c0e8fc788692 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 18 May 2017 07:48:28 -0700 Subject: [PATCH 2/6] review comment --- .../sql/catalyst/optimizer/expressions.scala | 61 +++++++++-------- .../optimizer/ConstantPropagationSuite.scala | 68 ++++++++++++++++--- 2 files changed, 92 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index f1a7d9b696059..c7c9ab6606a7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -62,47 +62,50 @@ object ConstantFolding extends Rule[LogicalPlan] { * SELECT * FROM table WHERE i = 5 AND j = i + 3 * ==> SELECT * FROM table WHERE i = 5 AND j = 8 * }}} + * + * Approach used: + * - Start from AND operator as the root + * - Get all the children conjunctive predicates which are EqualTo / EqualNullSafe such that they + * don't have a `NOT` or `OR` operator in them + * - Populate a mapping of attribute => constant value by looking at all the equals predicates + * - Using this mapping, replace occurrence of the attributes with the corresponding constant values + * in the AND node. */ object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { - - def containsNonConjunctionPredicates(expression: Expression): Boolean = expression match { - case Not(_) => true - case Or(_, _) => true - case _ => - var result = false - expression.children.foreach { - case Not(_) => result = true - case Or(_, _) => result = true - case other => result = result || containsNonConjunctionPredicates(other) - } - result - } + def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find { + case _: Not | _: Or => true + case _ => false + }.isDefined def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case q: LogicalPlan => q transformExpressionsUp { - case and @ (left And right) - if !containsNonConjunctionPredicates(left) && !containsNonConjunctionPredicates(right) => - - val leftEntries = left.collect { + case f: Filter => f transformExpressionsUp { + case and: And => + val conjunctivePredicates = + splitConjunctivePredicates(and) + .filter(expr => expr.isInstanceOf[EqualTo] || expr.isInstanceOf[EqualNullSafe]) + .filterNot(expr => containsNonConjunctionPredicates(expr)) + + val equalityPredicates = conjunctivePredicates.collect { case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e) + case e @ EqualNullSafe(left: AttributeReference, right: Literal) => ((left, right), e) + case e @ EqualNullSafe(left: Literal, right: AttributeReference) => ((right, left), e) } - val rightEntries = right.collect { - case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) - case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e) - } - val constantsMap = AttributeMap(leftEntries.map(_._1) ++ rightEntries.map(_._1)) - val predicates = (leftEntries.map(_._2) ++ rightEntries.map(_._2)).toSet + + val constantsMap = AttributeMap(equalityPredicates.map(_._1)) + val predicates = equalityPredicates.map(_._2).toSet def replaceConstants(expression: Expression) = expression transform { - case a: AttributeReference if constantsMap.contains(a) => - constantsMap.get(a).getOrElse(a) + case a: AttributeReference => + constantsMap.get(a) match { + case Some(literal) => literal + case None => a + } } and transform { - case e @ EqualTo(_, _) if !predicates.contains(e) && - e.references.exists(ref => constantsMap.contains(ref)) => - replaceConstants(e) + case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants(e) + case e @ EqualNullSafe(_, _) if !predicates.contains(e) => replaceConstants(e) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala index 36b4c19455e7c..f09b83b5e7c65 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala @@ -25,13 +25,16 @@ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor +/** + * Unit tests for constant propagation in expressions. + */ class ConstantPropagationSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: - Batch("ConstantPropagation", Once, + Batch("ConstantPropagation", FixedPoint(2), ColumnPruning, ConstantPropagation, ConstantFolding, @@ -42,10 +45,8 @@ class ConstantPropagationSuite extends PlanTest { private val columnA = 'a.int private val columnB = 'b.int + private val columnC = 'c.int - /** - * Unit tests for constant propagation in expressions. - */ test("basic test") { val query = testRelation .select(columnA) @@ -65,7 +66,7 @@ class ConstantPropagationSuite extends PlanTest { .where( columnA === Add(columnB, Literal(1)) && columnB === Literal(10) && - (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + (columnA === Add(columnC, Literal(3)) || columnB === columnC)) .analyze val correctAnswer = @@ -74,22 +75,55 @@ class ConstantPropagationSuite extends PlanTest { .where( columnA === Literal(11) && columnB === Literal(10) && - (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + (Literal(11) === Add(columnC, Literal(3)) || Literal(10) === columnC)) .analyze comparePlans(Optimize.execute(query), correctAnswer) } - test("negative test : with NOT in predicate") { + test("equality predicates outside a `NOT` can be propagated within a `NOT`") { val query = testRelation .select(columnA) .where(Not(columnA === Add(columnB, Literal(1))) && columnB === Literal(10)) .analyze + val correctAnswer = + testRelation + .select(columnA) + .where(Not(columnA === Literal(11)) && columnB === Literal(10)) + .analyze + + comparePlans(Optimize.execute(query), correctAnswer) + } + + test("equality predicates inside a `NOT` should not be picked for propagation") { + val query = testRelation + .select(columnA) + .where(Not(columnB === Literal(10)) && columnA === Add(columnB, Literal(1))) + .analyze + comparePlans(Optimize.execute(query), query) } - test("negative test : with OR in predicate") { + test("equality predicates outside a `OR` can be propagated within a `OR`") { + val query = testRelation + .select(columnA) + .where( + columnA === Literal(2) && + (columnA === Add(columnB, Literal(3)) || columnB === Literal(9))) + .analyze + + val correctAnswer = testRelation + .select(columnA) + .where( + columnA === Literal(2) && + (Literal(2) === Add(columnB, Literal(3)) || columnB === Literal(9))) + .analyze + + comparePlans(Optimize.execute(query), correctAnswer) + } + + test("equality predicates inside a `OR` should not be picked for propagation") { val query = testRelation .select(columnA) .where( @@ -99,4 +133,22 @@ class ConstantPropagationSuite extends PlanTest { comparePlans(Optimize.execute(query), query) } + + test("equality operator not immediate child of root `AND` should not be used for propagation") { + val query = testRelation + .select(columnA) + .where( + columnA === Literal(0) && + ((columnB === columnA) === (columnB === Literal(0)))) + .analyze + + val correctAnswer = testRelation + .select(columnA) + .where( + columnA === Literal(0) && + ((columnB === Literal(0)) === (columnB === Literal(0)))) + .analyze + + comparePlans(Optimize.execute(query), correctAnswer) + } } From 32040ab729151c2e4b4c367dcadcfb947638ee6f Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sun, 21 May 2017 15:18:30 -0700 Subject: [PATCH 3/6] Fix unit tests - FileSourceStrategySuite.partitioned table - FileSourceStrategySuite.partitioned table - case insensitive - FileSourceStrategySuite.partitioned table - after scan filters --- .../datasources/FileSourceStrategySuite.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index fa3c69612704d..9a2dcafb5e4b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -190,7 +190,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi checkDataFilters(Set.empty) // Only one file should be read. - checkScan(table.where("p1 = 1 AND c1 = 1 AND (p1 + c1) = 1")) { partitions => + checkScan(table.where("p1 = 1 AND c1 = 1 AND (p1 + c1) = 2")) { partitions => assert(partitions.size == 1, "when checking partitions") assert(partitions.head.files.size == 1, "when checking files in partition 1") assert(partitions.head.files.head.partitionValues.getInt(0) == 1, @@ -217,7 +217,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi checkDataFilters(Set.empty) // Only one file should be read. - checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions => + checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 2")) { partitions => assert(partitions.size == 1, "when checking partitions") assert(partitions.head.files.size == 1, "when checking files in partition 1") assert(partitions.head.files.head.partitionValues.getInt(0) == 1, @@ -235,13 +235,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi "p1=1/file1" -> 10, "p1=2/file2" -> 10)) - val df = table.where("p1 = 1 AND (p1 + c1) = 2 AND c1 = 1") + val df1 = table.where("p1 = 1 AND (p1 + c1) = 2 AND c1 = 1") // Filter on data only are advisory so we have to reevaluate. - assert(getPhysicalFilters(df) contains resolve(df, "c1 = 1")) - // Need to evalaute filters that are not pushed down. - assert(getPhysicalFilters(df) contains resolve(df, "(p1 + c1) = 2")) + assert(getPhysicalFilters(df1) contains resolve(df1, "c1 = 1")) // Don't reevaluate partition only filters. - assert(!(getPhysicalFilters(df) contains resolve(df, "p1 = 1"))) + assert(!(getPhysicalFilters(df1) contains resolve(df1, "p1 = 1"))) + + val df2 = table.where("(p1 + c2) = 2 AND c1 = 1") + // Filter on data only are advisory so we have to reevaluate. + assert(getPhysicalFilters(df2) contains resolve(df2, "c1 = 1")) + // Need to evalaute filters that are not pushed down. + assert(getPhysicalFilters(df2) contains resolve(df2, "(p1 + c2) = 2")) } test("bucketed table") { From 83f63bbcd64ee93c13c25c80bac8de8cc603b366 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 25 May 2017 16:26:17 -0700 Subject: [PATCH 4/6] unit test --- .../sql/catalyst/optimizer/ConstantPropagationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala index f09b83b5e7c65..7d50b7b76453a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala @@ -34,7 +34,7 @@ class ConstantPropagationSuite extends PlanTest { val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: - Batch("ConstantPropagation", FixedPoint(2), + Batch("ConstantPropagation", Once, ColumnPruning, ConstantPropagation, ConstantFolding, @@ -75,7 +75,7 @@ class ConstantPropagationSuite extends PlanTest { .where( columnA === Literal(11) && columnB === Literal(10) && - (Literal(11) === Add(columnC, Literal(3)) || Literal(10) === columnC)) + (columnA === Add(columnC, Literal(3)) || Literal(10) === columnC)) .analyze comparePlans(Optimize.execute(query), correctAnswer) From 731f79626b38321845ccca8813174a9cfa2331dd Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 26 May 2017 11:35:45 -0700 Subject: [PATCH 5/6] more test case --- .../spark/sql/catalyst/optimizer/expressions.scala | 2 +- .../optimizer/ConstantPropagationSuite.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index c7c9ab6606a7b..51f749a8bf857 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -72,7 +72,7 @@ object ConstantFolding extends Rule[LogicalPlan] { * in the AND node. */ object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { - def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find { + private def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find { case _: Not | _: Or => true case _ => false }.isDefined diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala index 7d50b7b76453a..9284e78d955fa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala @@ -151,4 +151,17 @@ class ConstantPropagationSuite extends PlanTest { comparePlans(Optimize.execute(query), correctAnswer) } + + test("conflicting equality predicates") { + val query = testRelation + .select(columnA) + .where( + columnA === Literal(1) && columnA === Literal(2) && columnB === Add(columnA, Literal(3))) + + val correctAnswer = testRelation + .select(columnA) + .where(columnA === Literal(1) && columnA === Literal(2) && columnB === Literal(5)) + + comparePlans(Optimize.execute(query.analyze), correctAnswer) + } } From 38543de8b45dc4145182d20e84a5796579548f3f Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sun, 28 May 2017 17:58:14 -0700 Subject: [PATCH 6/6] test case: change num iterations --- .../sql/catalyst/optimizer/ConstantPropagationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala index 9284e78d955fa..81d2f3667e2d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantPropagationSuite.scala @@ -34,7 +34,7 @@ class ConstantPropagationSuite extends PlanTest { val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: - Batch("ConstantPropagation", Once, + Batch("ConstantPropagation", FixedPoint(10), ColumnPruning, ConstantPropagation, ConstantFolding, @@ -75,7 +75,7 @@ class ConstantPropagationSuite extends PlanTest { .where( columnA === Literal(11) && columnB === Literal(10) && - (columnA === Add(columnC, Literal(3)) || Literal(10) === columnC)) + (Literal(11) === Add(columnC, Literal(3)) || Literal(10) === columnC)) .analyze comparePlans(Optimize.execute(query), correctAnswer)