[SPARK-30768][SQL] Constraints inferred from inequality attributes #27518
Conversation
Test build #118120 has finished for PR 27518 at commit

retest this please

...talyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala

Test build #118131 has finished for PR 27518 at commit

retest this please

Test build #118163 has finished for PR 27518 at commit
Is the title correct? Isn't what you do here constant propagation among constraints? BTW, I have a PR open to enhance
@peter-toth I'd like to handle another case in this PR:

```scala
spark.sql("CREATE TABLE `dw_user_state_history` (`id` DECIMAL(18,0), `change_time` date, `to_state` DECIMAL(9,0)) USING parquet")
spark.sql("CREATE TABLE `dw_user_cntry_hist` (`user_id` DECIMAL(18,0), `start_dt` DATE, `end_dt` DATE) USING parquet")
spark.sql(
  """
    |SELECT
    |  count(*)
    |FROM
    |  dw_user_state_history ush
    |  INNER JOIN dw_user_cntry_hist uch ON (uch.user_id = ush.id AND CAST(ush.change_time AS date) >= uch.start_dt AND CAST(ush.change_time AS date) < uch.end_dt)
    |WHERE
    |  change_time between '2019-07-01 00:00:00' AND '2019-07-02 00:00:00'
    |""".stripMargin).explain()
```

The expected physical plan:
@wangyum, I'm a bit confused now. As far as I see based on your changes, in this PR you substitute
But I don't see how it will help with the example query here: #27518 (comment)
@peter-toth 999126c helps the example query.
Test build #118309 has finished for PR 27518 at commit
Thanks @wangyum. I see now that you basically reverted your first commit.
@peter-toth Please go ahead. I reverted the first commit because this test will fail:

```scala
test("Constraints shouldn't be inferred from cast equality constraint(filter lower data type)") {
  val testRelation1 = LocalRelation('a.int)
  val testRelation2 = LocalRelation('b.long)
  val originalLeft = testRelation1.where('a === 1).subquery('left)
  val originalRight = testRelation2.subquery('right)
  val left = testRelation1.where(IsNotNull('a) && 'a === 1).subquery('left)
  val right = testRelation2.where(IsNotNull('b)).subquery('right)
  Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
      Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition =>
    testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
  }
}
```
```
# Conflicts:
#	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
```
Test build #118434 has finished for PR 27518 at commit

Test build #118505 has finished for PR 27518 at commit

Test build #118526 has finished for PR 27518 at commit
Metrics of Analyzer/Optimizer Rules. Before this PR:

```
02:32:26.475 WARN org.apache.spark.sql.TPCDSQuerySuite:
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 224786
Total time: 45.803692546 seconds
Rule                                                                            Effective Time / Total Time   Effective Runs / Total Runs
org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries            6898974687 / 8840587642       47 / 772
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                           671684614 / 2740648387        327 / 2364
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                 1637851386 / 1780942834       51 / 2159
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions       624455301 / 1744459696        49 / 2159
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                         1136163267 / 1355664883       361 / 2159
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences               958843179 / 1132184862        813 / 2159
org.apache.spark.sql.catalyst.optimizer.PruneFilters                            21041535 / 875480663          5 / 1978
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                   7949392 / 757989983           4 / 1592
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts           301831588 / 657645402         78 / 2159
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                      374351767 / 641828516         758 / 1979
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences        14541719 / 632536654          10 / 2159
org.apache.spark.sql.catalyst.optimizer.NullPropagation                         39937489 / 587603524          42 / 1592
org.apache.spark.sql.catalyst.optimizer.ReorderJoin                             244728546 / 572349474         177 / 1592
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                         158687531 / 564225840         194 / 1592
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator              0 / 547165121                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                    0 / 532921745                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                0 / 527530671                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                     48706059 / 526075410          116 / 2364
org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps                 0 / 511807420                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts                           45722796 / 510124455          83 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions       0 / 493219669                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                              12300150 / 491448204          27 / 1592
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability               20208595 / 484413473          12 / 674
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions            0 / 480755936                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.LikeSimplification                      799298 / 463609379            1 / 1592
org.apache.spark.sql.catalyst.optimizer.CollapseProject                         92682031 / 457556748          215 / 1978
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate         0 / 456658852                 0 / 1592
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion  227217097 / 442233854         56 / 2159
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints             406969136 / 435459925         278 / 386
```

After this PR:

```
02:28:49.937 WARN org.apache.spark.sql.TPCDSQuerySuite:
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 224786
Total time: 47.011460872 seconds
Rule                                                                            Effective Time / Total Time   Effective Runs / Total Runs
org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries            7073196527 / 8950854926       47 / 772
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                           724531405 / 2931435267        327 / 2364
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                 1789988207 / 1942196179       51 / 2159
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions       717122415 / 1838219499        49 / 2159
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                         1276704718 / 1524939842       361 / 2159
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences               1003010234 / 1192920997       813 / 2159
org.apache.spark.sql.catalyst.optimizer.PruneFilters                            21471500 / 952660373          5 / 1978
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                   9761473 / 773205469           4 / 1592
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts           329430374 / 734067405         78 / 2159
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences        12112700 / 709719203          10 / 2159
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                      399755753 / 663934266         758 / 1979
org.apache.spark.sql.catalyst.optimizer.ReorderJoin                             266383835 / 593070778         177 / 1592
org.apache.spark.sql.catalyst.optimizer.NullPropagation                         35781183 / 578417823          42 / 1592
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                     48387593 / 540807035          116 / 2364
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                0 / 535708634                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                         114115625 / 533552246         194 / 1592
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                              12632788 / 529487507          27 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions       0 / 513745956                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                    0 / 508586160                 0 / 1592
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints             459543325 / 490828096         278 / 386
```
Test build #118570 has finished for PR 27518 at commit

retest this please

Test build #118895 has finished for PR 27518 at commit

Test build #118980 has finished for PR 27518 at commit

Test build #119516 has finished for PR 27518 at commit

Test build #120106 has finished for PR 27518 at commit

Test build #123638 has finished for PR 27518 at commit

Test build #123919 has finished for PR 27518 at commit

cc @maryannxue and @cloud-fan FYI
Both this and #29650 introduce a while loop to infer constraints from a chain of existing constraints. If either of these gets accepted, the other must be changed to unify these loops.
```scala
case _: GreaterThanOrEqual => true
case _: LessThan => true
case _: LessThanOrEqual => true
case _: EqualTo => true
```
EqualTo should not be needed here, as inferEqualityConstraints should cover all cases, including it.
inferEqualityConstraints cannot handle all cases, such as a constraint with a cast.
For example: cast(a as double) > cast(b as double) and cast(b as double) = 1
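To make the distinction concrete, here is a standalone Scala sketch (a toy expression model, not Catalyst code; all class names are illustrative): equality substitution keyed on bare attributes never fires when the equality's side is a cast, while matching on whole sub-expressions still infers the new comparison.

```scala
// Toy expression model; the names are illustrative, not Catalyst's classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Cast(child: Expr) extends Expr
case class Lit(v: Int) extends Expr
case class Gt(l: Expr, r: Expr)
case class Eq(l: Expr, r: Expr)

object CastExample extends App {
  val comparison = Gt(Cast(Attr("a")), Cast(Attr("b"))) // cast(a as double) > cast(b as double)
  val equality   = Eq(Cast(Attr("b")), Lit(1))          // cast(b as double) = 1

  // Equality inference keyed on bare attributes finds no Attr on the equality's
  // left side (it is Cast(Attr("b"))), so it substitutes nothing. Matching the
  // comparison's right side against the equality's left side as whole
  // sub-expressions still infers cast(a as double) > 1.
  val inferred = (comparison, equality) match {
    case (Gt(l, r), Eq(l2, lit)) if r == l2 => Some(Gt(l, lit))
    case _                                  => None
  }
  println(inferred) // Some(Gt(Cast(Attr(a)),Lit(1)))
}
```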
```scala
val lessThans = binaryComparisons.map {
  case EqualTo(l, r) if l.foldable => EqualTo(r, l)
  case GreaterThan(l, r) => LessThan(r, l)
  case GreaterThanOrEqual(l, r) => LessThanOrEqual(r, l)
  case other => other
}
```
Doesn't this duplicate the greaterThans block? Here you have a < b < c, and in the other block you have c > b > a.
No. For example, from a > b and 5 > a we cannot infer anything directly, but we can infer that b < 5 after rewriting a > b and 5 > a as b < a and a < 5.
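A standalone sketch of this point (toy model, not the PR's code; the single inference rule below is an illustrative assumption): once every comparison is normalized into the less-than direction, one left-to-right transitive rule covers both original orientations.

```scala
// Toy expression model; names are illustrative, not Catalyst's classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(v: Int) extends Expr

sealed trait Cmp
case class Gt(l: Expr, r: Expr) extends Cmp // l > r
case class Lt(l: Expr, r: Expr) extends Cmp // l < r

object NormalizeExample extends App {
  // a > b and 5 > a: as written, no rule of the shape (x < y, y < lit) applies.
  val original: Set[Cmp] = Set(Gt(Attr("a"), Attr("b")), Gt(Lit(5), Attr("a")))

  // Normalize: flip every > into < by swapping its sides.
  val lessThans: Set[Lt] = original.map {
    case Gt(l, r) => Lt(r, l)
    case lt: Lt   => lt
  }

  // Now b < a and a < 5, so a single transitive rule infers b < 5.
  val inferred = for {
    Lt(x, y)           <- lessThans
    Lt(y2, z @ Lit(_)) <- lessThans
    if y == y2
  } yield Lt(x, z)

  println(inferred) // Set(Lt(Attr(b),Lit(5)))
}
```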
Is it because of the foldable check? Without it, it should be inferable.
```scala
var inferredConstraints = Set.empty[Expression]
greaterThans.foreach {
  case op @ BinaryComparison(source: Attribute, destination: Expression)
      if destination.foldable =>
```
I think the foldability check is not needed here. The new constraints do not have to involve only constants; they could also involve any attribute.
It is to avoid generating too many constraints. For example, given a > b > c > 1, the expected inferred constraints are a > 1 and b > 1; a > c is useless.
If a and c are on the same side of a join, then it can be pushed down.
How can a > c be pushed down if neither a nor c is foldable?
I'm sorry, I used the wrong word. I meant pushed through the join into one of its sides.
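For illustration, a standalone sketch of that trade-off (toy model, not Catalyst code): restricting the inferred right-hand side to literals keeps the closure small on a chain like a < b < c < 1, at the price of dropping attribute-to-attribute consequences such as a < c.

```scala
// Toy expression model; names are illustrative, not Catalyst's classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(v: Int) extends Expr
case class Lt(l: Expr, r: Expr) // l < r, after normalization

object FoldableTradeoff extends App {
  // The chain a < b < c < 1 from the discussion above.
  val chain = Set(Lt(Attr("a"), Attr("b")), Lt(Attr("b"), Attr("c")), Lt(Attr("c"), Lit(1)))

  // Transitive closure; optionally keep only consequences whose right side is
  // a literal (the foldable restriction).
  def close(cs: Set[Lt], literalOnly: Boolean): Set[Lt] = {
    var all = cs
    var grown = true
    while (grown) {
      val next = for {
        Lt(x, y)  <- all
        Lt(y2, z) <- all
        if y == y2 && x != z
        if !literalOnly || z.isInstanceOf[Lit]
      } yield Lt(x, z)
      grown = (next -- all).nonEmpty
      all ++= next
    }
    all -- cs
  }

  println(close(chain, literalOnly = true))  // a < 1 and b < 1 only
  println(close(chain, literalOnly = false)) // additionally a < c
}
```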
```scala
do {
  lastInequalityInferred = inferInequalityConstraints(constraints ++ inferred)
  inferred ++= lastInequalityInferred
} while (lastInequalityInferred.nonEmpty)
```
Could you hit an infinite loop with non-deterministic filters? They are never semantically equal to any other expression (including themselves). I hit that problem in #29650, where I was also working on constraint inference, but from EqualNullSafe.
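A standalone illustration of that concern (toy code, not Catalyst; the iteration cap exists only so the demo terminates): when an element is never equal to anything, the way a non-deterministic expression is never semantically equal to another expression, set-based deduplication can never empty the newly inferred set, so a repeat-until-nothing-new loop has no fixed point.

```scala
// An element that is never equal to anything, standing in for a
// non-deterministic expression under semantic equality.
final class NeverEqual {
  override def equals(other: Any): Boolean = false
  override def hashCode: Int = System.identityHashCode(this)
}

object FixedPointPitfall extends App {
  var known        = Set.empty[AnyRef]
  var lastInferred = Set[AnyRef](new NeverEqual)
  var rounds       = 0
  // Without the `rounds < 10` cap this loop would never terminate:
  // deduplication via `--` relies on equality, which never holds here.
  while (lastInferred.nonEmpty && rounds < 10) {
    known ++= lastInferred
    val candidates = Set[AnyRef](new NeverEqual) // "re-inferred" every round
    lastInferred = candidates -- known           // removes nothing
    rounds += 1
  }
  println(s"rounds = $rounds, known.size = ${known.size}") // rounds = 10, size = 10
}
```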
Thank you all. Merged it into our internal Spark version.
What changes were proposed in this pull request?
In our production environment, there are many queries similar to this pattern:
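The original snippet was not preserved in this thread; as a hedged reconstruction, a query of roughly this shape shows the pattern (only the table and column names come from the inferred constraints listed below, everything else is illustrative):

```scala
// Hypothetical reconstruction, not the actual production query.
spark.sql(
  """
    |SELECT *
    |FROM spark_30768_1 t1
    |  JOIN spark_30768_2 t2
    |    ON t2.start_dt <= t1.src_cre_dt AND t2.end_dt > t1.src_cre_dt
    |WHERE t1.src_cre_dt BETWEEN '2020-02-01' AND '2020-02-07'
    |""".stripMargin).explain()
```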
In this case, we can infer more constraints to improve query performance. E.g.
`spark_30768_2.start_dt <= '2020-02-07'` inferred from `spark_30768_2.start_dt <= spark_30768_1.src_cre_dt <= '2020-02-07'`, and `spark_30768_2.end_dt > '2020-02-01'` inferred from `spark_30768_2.end_dt > spark_30768_1.src_cre_dt >= '2020-02-01'`. This PR adds support for inferring these constraints from inequality attributes.
Why are the changes needed?
Improve query performance. Teradata supports this optimization:
https://docs.teradata.com/reader/Ws7YT1jvRK2vEr1LpVURug/V~FCwD9BL7gY4ac3WwHInw?section=xcg1472241575102__application_of_transitive_closure_section
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test.
Benchmark code and benchmark result:
Before this PR:
After this PR:
We also tested this feature in our production environment; it can significantly improve the query performance of at least 6 SQLs (out of a total of 200 SQLs):
For example, SQL 372: it prevents 18,251,219,447 rows (18,413,424,580 - 162,205,133) from participating in the shuffle:
Before this PR:
After this PR: