[SPARK-20758][SQL] Add Constant propagation optimization #17993
Changes from all commits
145faf9
ef13e87
32040ab
83f63bb
731f796
38543de
@@ -54,6 +54,62 @@ object ConstantFolding extends Rule[LogicalPlan] {
  }
}

/**
 * Substitutes [[Attribute Attributes]] which can be statically evaluated with their corresponding
 * value in conjunctive [[Expression Expressions]]
 * eg.
 * {{{
 *   SELECT * FROM table WHERE i = 5 AND j = i + 3
 *   ==>  SELECT * FROM table WHERE i = 5 AND j = 8
 * }}}
 *
 * Approach used:
 * - Start from AND operator as the root
 * - Get all the children conjunctive predicates which are EqualTo / EqualNullSafe such that they
 *   don't have a `NOT` or `OR` operator in them
 * - Populate a mapping of attribute => constant value by looking at all the equals predicates
 * - Using this mapping, replace occurrence of the attributes with the corresponding constant values
 *   in the AND node.
 */
object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper {
  private def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find {
    case _: Not | _: Or => true
    case _ => false
  }.isDefined

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: Filter => f transformExpressionsUp {
      case and: And =>
        val conjunctivePredicates =
          splitConjunctivePredicates(and)
            .filter(expr => expr.isInstanceOf[EqualTo] || expr.isInstanceOf[EqualNullSafe])
            .filterNot(expr => containsNonConjunctionPredicates(expr))

        val equalityPredicates = conjunctivePredicates.collect {
          case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e)
          case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e)
          case e @ EqualNullSafe(left: AttributeReference, right: Literal) => ((left, right), e)
          case e @ EqualNullSafe(left: Literal, right: AttributeReference) => ((right, left), e)
        }

        val constantsMap = AttributeMap(equalityPredicates.map(_._1))
        val predicates = equalityPredicates.map(_._2).toSet

        def replaceConstants(expression: Expression) = expression transform {
          case a: AttributeReference =>
            constantsMap.get(a) match {
              case Some(literal) => literal
              case None => a
            }
        }

        and transform {
          case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants(e)
Review comment: Should we check for identity instead of equality? I think you are doing the latter. What will happen in the following example:
Reply: Here is the behavior with this PR. Seems reasonable because
          case e @ EqualNullSafe(_, _) if !predicates.contains(e) => replaceConstants(e)
        }
    }
  }
}
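
The review question about `predicates.contains(e)` comes down to how membership is tested. The sketch below is only an illustration with a hypothetical stand-in case class (it is not a Catalyst type, and the reviewer's own example is not reproduced on this page). It shows that a `Set` lookup uses structural equality, so a second occurrence of the same predicate is treated like the source predicate, whereas a reference check would tell the two apart.

```scala
object EqualityVsIdentity {
  // Hypothetical stand-in for an expression node; assumed for illustration only.
  case class EqualTo(left: String, right: Int)

  val source = EqualTo("a", 1)    // predicate the constant was taken from
  val duplicate = EqualTo("a", 1) // a second, structurally equal occurrence

  // Set.contains compares with ==, which is structural for case classes.
  val sources: Set[EqualTo] = Set(source)
  val skippedByEquality: Boolean = sources.contains(duplicate)

  // A reference (identity) check distinguishes the two occurrences.
  val sameInstance: Boolean = source eq duplicate
}
```

With equality-based membership, both occurrences are left untouched by the rewrite; with identity, only the exact source node would be.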

/**
 * Reorder associative integral-type operators and fold all constants into one.
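
The approach listed in the scaladoc of `ConstantPropagation` can be sketched without Spark on a toy expression tree. Everything below (`Expr`, `Attr`, `Lit`, `ToyConstantPropagation`, and so on) is a hypothetical stand-in written for this illustration, not Catalyst code. It is also a simplification: it only looks at the root conjunction, ignores `EqualNullSafe`, and leaves the arithmetic to a separate folding step.

```scala
// Toy expression tree; these case classes are illustrative stand-ins, not Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case class Not(child: Expr) extends Expr

object ToyConstantPropagation {
  // Flatten nested ANDs into their conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // Replace every attribute that has a known constant.
  def substitute(e: Expr, constants: Map[Attr, Lit]): Expr = e match {
    case a: Attr =>
      constants.get(a) match {
        case Some(literal) => literal
        case None => a
      }
    case l: Lit => l
    case Add(l, r) => Add(substitute(l, constants), substitute(r, constants))
    case EqualTo(l, r) => EqualTo(substitute(l, constants), substitute(r, constants))
    case And(l, r) => And(substitute(l, constants), substitute(r, constants))
    case Or(l, r) => Or(substitute(l, constants), substitute(r, constants))
    case Not(c) => Not(substitute(c, constants))
  }

  def propagate(root: Expr): Expr = {
    // Top-level conjuncts of the form attr = literal (either order) supply the constants.
    val sources = splitConjuncts(root).collect {
      case e @ EqualTo(a: Attr, l: Lit) => (e, (a, l))
      case e @ EqualTo(l: Lit, a: Attr) => (e, (a, l))
    }
    val constants: Map[Attr, Lit] = sources.map(_._2).toMap
    val sourcePredicates: Set[EqualTo] = sources.map(_._1).toSet

    // Rewrite every equality except the ones the constants were taken from.
    def rewrite(e: Expr): Expr = e match {
      case And(l, r) => And(rewrite(l), rewrite(r))
      case Or(l, r) => Or(rewrite(l), rewrite(r))
      case Not(c) => Not(rewrite(c))
      case equality: EqualTo if !sourcePredicates.contains(equality) =>
        substitute(equality, constants)
      case other => other
    }
    rewrite(root)
  }
}
```

Applied to the toy equivalent of `i = 5 AND j = i + 3`, `propagate` returns `i = 5 AND j = 5 + 3`. An equality wrapped in `Not` at the top level is not collected as a source, so a condition such as `NOT (b = 10) AND a = b + 1` comes back unchanged.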
@@ -0,0 +1,167 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

/**
 * Unit tests for constant propagation in expressions.
 */
class ConstantPropagationSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("AnalysisNodes", Once,
        EliminateSubqueryAliases) ::
      Batch("ConstantPropagation", FixedPoint(10),
        ColumnPruning,
        ConstantPropagation,
        ConstantFolding,
        BooleanSimplification) :: Nil
  }

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

  private val columnA = 'a.int
  private val columnB = 'b.int
  private val columnC = 'c.int

  test("basic test") {
    val query = testRelation
      .select(columnA)
      .where(columnA === Add(columnB, Literal(1)) && columnB === Literal(10))

    val correctAnswer =
      testRelation
        .select(columnA)
        .where(columnA === Literal(11) && columnB === Literal(10)).analyze

    comparePlans(Optimize.execute(query.analyze), correctAnswer)
  }

  test("with combination of AND and OR predicates") {
    val query = testRelation
      .select(columnA)
      .where(
        columnA === Add(columnB, Literal(1)) &&
          columnB === Literal(10) &&
          (columnA === Add(columnC, Literal(3)) || columnB === columnC))
      .analyze

    val correctAnswer =
      testRelation
        .select(columnA)
        .where(
          columnA === Literal(11) &&
            columnB === Literal(10) &&
            (Literal(11) === Add(columnC, Literal(3)) || Literal(10) === columnC))
        .analyze

    comparePlans(Optimize.execute(query), correctAnswer)
  }

  test("equality predicates outside a `NOT` can be propagated within a `NOT`") {
    val query = testRelation
      .select(columnA)
      .where(Not(columnA === Add(columnB, Literal(1))) && columnB === Literal(10))
      .analyze

    val correctAnswer =
      testRelation
        .select(columnA)
        .where(Not(columnA === Literal(11)) && columnB === Literal(10))
        .analyze

    comparePlans(Optimize.execute(query), correctAnswer)
  }

  test("equality predicates inside a `NOT` should not be picked for propagation") {
    val query = testRelation
      .select(columnA)
      .where(Not(columnB === Literal(10)) && columnA === Add(columnB, Literal(1)))
      .analyze

    comparePlans(Optimize.execute(query), query)
  }

  test("equality predicates outside a `OR` can be propagated within a `OR`") {
    val query = testRelation
      .select(columnA)
      .where(
        columnA === Literal(2) &&
          (columnA === Add(columnB, Literal(3)) || columnB === Literal(9)))
      .analyze

    val correctAnswer = testRelation
      .select(columnA)
      .where(
        columnA === Literal(2) &&
          (Literal(2) === Add(columnB, Literal(3)) || columnB === Literal(9)))
      .analyze

    comparePlans(Optimize.execute(query), correctAnswer)
  }

  test("equality predicates inside a `OR` should not be picked for propagation") {
    val query = testRelation
      .select(columnA)
      .where(
        columnA === Add(columnB, Literal(2)) &&
          (columnA === Add(columnB, Literal(3)) || columnB === Literal(9)))
      .analyze

    comparePlans(Optimize.execute(query), query)
  }

  test("equality operator not immediate child of root `AND` should not be used for propagation") {
    val query = testRelation
      .select(columnA)
      .where(
        columnA === Literal(0) &&
          ((columnB === columnA) === (columnB === Literal(0))))
      .analyze

    val correctAnswer = testRelation
      .select(columnA)
      .where(
        columnA === Literal(0) &&
          ((columnB === Literal(0)) === (columnB === Literal(0))))
      .analyze

    comparePlans(Optimize.execute(query), correctAnswer)
  }

  test("conflicting equality predicates") {
    val query = testRelation
      .select(columnA)
      .where(
        columnA === Literal(1) && columnA === Literal(2) && columnB === Add(columnA, Literal(3)))

    val correctAnswer = testRelation
      .select(columnA)
      .where(columnA === Literal(1) && columnA === Literal(2) && columnB === Literal(5))

    comparePlans(Optimize.execute(query.analyze), correctAnswer)
  }
}
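
The expected plan in the `conflicting equality predicates` test contains `columnB === Literal(5)`, that is 2 + 3, which suggests that the later binding `a = 2` is the one that gets propagated. A minimal sketch of why this could happen, assuming the constants map behaves like a standard Scala `Map` built from a sequence of pairs (that `AttributeMap` behaves the same way is an assumption here, the diff does not show it). The names below are hypothetical stand-ins.

```scala
object LastBindingWins {
  // Hypothetical stand-ins: attribute names as strings, literals as ints.
  val equalities: Seq[(String, Int)] = Seq("a" -> 1, "a" -> 2)

  // Building a Map from pairs keeps the last value seen for a duplicate key.
  val constants: Map[String, Int] = equalities.toMap

  // b = a + 3 after substituting the surviving binding for a.
  val b: Int = constants("a") + 3
}
```

Under that assumption `a` resolves to 2 and `b` to 5, matching the literal in the test's expected plan. The conflicting predicates `a = 1` and `a = 2` themselves are kept as they are.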
I was initially doing this for the entire logical plan but have now switched to doing it only for the Filter operator.
Reason: Doing this for the entire logical plan interferes with JOIN predicates, e.g.
.. the result is a cartesian product and Spark fails (asking to set a config). In the case of OUTER JOINs, changing the join predicates might cause a regression.
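
The author's original example is not preserved on this page. As a purely hypothetical illustration of the concern, consider a join condition `l.id = r.id AND l.id = 1`. The sketch below uses made-up stand-in types (not Catalyst classes) to show that once the constant is substituted, no equality relates a column of one side to a column of the other, so nothing is left to act as an equi-join key.

```scala
object JoinKeyIllustration {
  // Hypothetical stand-ins for columns, constants and equality predicates.
  sealed trait Term
  case class Col(table: String, name: String) extends Term
  case class Const(value: Int) extends Term
  case class Eq(left: Term, right: Term)

  // An equality can serve as an equi-join key only if it relates a column
  // of the left table to a column of the right table.
  def isJoinKey(e: Eq, leftTable: String, rightTable: String): Boolean = e match {
    case Eq(Col(t1, _), Col(t2, _)) => Set(t1, t2) == Set(leftTable, rightTable)
    case _ => false
  }

  // Substitute known constants into every equality except the ones that define them.
  def propagate(conjuncts: Seq[Eq]): Seq[Eq] = {
    val constants: Map[Term, Term] = conjuncts.collect {
      case Eq(c: Col, k: Const) => (c: Term) -> (k: Term)
      case Eq(k: Const, c: Col) => (c: Term) -> (k: Term)
    }.toMap
    def sub(t: Term): Term = constants.getOrElse(t, t)
    conjuncts.map {
      case e @ Eq(_: Col, _: Const) => e
      case e @ Eq(_: Const, _: Col) => e
      case Eq(l, r) => Eq(sub(l), sub(r))
    }
  }
}
```

In this toy model the condition becomes `1 = r.id AND l.id = 1`: each conjunct now mentions only one side, which is consistent with the cartesian product described in the comment.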
Maybe I am being myopic here but the result should be the same, right? The only way this regresses is when we plan a `CartesianProduct` instead of a `BroadcastNestedLoopJoin`... I am fine with not optimizing this for now, it would be nice if these constraints are at least generated here.
Yes, the result should be the same. I don't have a theoretical proof that doing this over joins is safe, so I want to be cautious here ... any bad rule might lead to correctness bugs, which is super bad for end users.
Sorry, I am not able to follow you here and want to make sure I am not ignoring your comment. Are you suggesting any changes over the existing version?
We currently infer `is not null` constraints up and down the plan. This could be easily extended to other constraints. Your PR has some overlap with this. However, let's focus on getting this merged first, and then we might take a stab at extending this.
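
As a rough illustration of what inferring `is not null` constraints from predicates means, the sketch below derives non-null facts from equality predicates. It is a toy model with hypothetical types and names, not Spark's constraint machinery, and it only covers plain equality.

```scala
object NotNullInference {
  // Hypothetical stand-ins for filter predicates.
  sealed trait Pred
  case class EqConst(attr: String, value: Int) extends Pred   // attr = value
  case class EqAttr(left: String, right: String) extends Pred // left = right
  case class IsNotNull(attr: String) extends Pred

  // A filter keeps a row only when its predicate is true, and SQL `=` is never
  // true on a NULL operand, so every attribute it references must be non-null.
  def inferNotNull(preds: Seq[Pred]): Set[Pred] = preds.flatMap {
    case EqConst(a, _) => Seq(IsNotNull(a))
    case EqAttr(l, r) => Seq(IsNotNull(l), IsNotNull(r))
    case n: IsNotNull => Seq(n)
  }.toSet[Pred]
}
```

For the predicates `a = 5` and `b = c`, the sketch yields non-null constraints on `a`, `b` and `c`. Extending the same idea to constant-valued constraints is the overlap the comment points at.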
also cc @sameeragarwal