[SPARK-33152][SQL] Improve the performance of constraint propagation for Project and Aggregate #30894
cc @maropu, @HyukjinKwon
cc @gengliangwang too
}

/**
We keep the child constraints and equality between original and aliased attributes,
Since allConstraints is initially assigned as child.constraints, why do we need to keep the child constraints?
Because we might have filtered some of them out.
Could you show an example for why we should keep child.constraints?
For example, this test would fail:
test("SPARK-33152: infer from child constraint") {
  val plan = LocalRelation('a.int, 'b.int)
    .where('a === 'b)
    .select('a, ('b + 1) as 'b2)
    .analyze
  verifyConstraints(plan.constraints, ExpressionSet(Seq(
    IsNotNull(resolveColumn(plan, "a")),
    resolveColumn(plan, "a") + 1 <=> resolveColumn(plan, "b2")
  )))
}
Inferring a + 1 <=> b2 requires the child constraints (here, a = b from the filter).
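The reasoning in this reply can be sketched with a toy model that does not depend on Spark. Everything below (`Expr`, `Attr`, `Eq`, `infer`, `visible`) is a hypothetical stand-in, not the Catalyst API: plain equality replaces `<=>`, `IsNotNull` is left out, and `infer` does a single round of attribute substitution, which is only an approximation of what `inferAdditionalConstraints` does.

```scala
object ChildConstraintInference {
  // Minimal expression tree; hypothetical, not Catalyst's Expression.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr
  case class Eq(left: Expr, right: Expr) extends Expr

  // Replace every occurrence of attribute `from` with attribute `to`.
  def replace(e: Expr, from: Attr, to: Attr): Expr = e match {
    case `from`    => to
    case Add(l, r) => Add(replace(l, from, to), replace(r, from, to))
    case Eq(l, r)  => Eq(replace(l, from, to), replace(r, from, to))
    case other     => other
  }

  // Names of all attributes referenced by an expression.
  def references(e: Expr): Set[String] = e match {
    case Attr(n)   => Set(n)
    case Lit(_)    => Set.empty
    case Add(l, r) => references(l) ++ references(r)
    case Eq(l, r)  => references(l) ++ references(r)
  }

  // One round of inference: an attribute equality x = y lets us rewrite
  // the other constraints with x replaced by y, and with y replaced by x.
  def infer(constraints: Set[Expr]): Set[Expr] = {
    val inferred = constraints.collect { case eq @ Eq(x: Attr, y: Attr) =>
      val others = constraints - eq
      others.map(replace(_, x, y)) ++ others.map(replace(_, y, x))
    }.flatten
    constraints ++ inferred
  }

  val a = Attr("a")
  val b = Attr("b")
  val b2 = Attr("b2")

  val childConstraint: Expr = Eq(a, b)               // from .where('a === 'b)
  val aliasConstraint: Expr = Eq(Add(b, Lit(1)), b2) // from ('b + 1) as 'b2
  val output = Set("a", "b2")                        // columns the Project emits

  // Keep only constraints that refer exclusively to output columns.
  def visible(cs: Set[Expr]): Set[Expr] =
    cs.filter(c => references(c).subsetOf(output))

  val withChild: Set[Expr] = visible(infer(Set(childConstraint, aliasConstraint)))
  val withoutChild: Set[Expr] = visible(infer(Set(aliasConstraint)))
}
```

In this model `withChild` contains `a + 1 = b2`, while `withoutChild` is empty: once `a = b` is dropped before inference, nothing connects `a` to `b2`.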
so [[ConstraintHelper.inferAdditionalConstraints]] would have the full information available.
*/
projectList.foreach {
case alias @ Alias(expr, _) =>
Maybe we only need to handle a @ Alias(l: Literal, _) here?
You might be right that only the literal aliases are currently used, but the previous code (lines 180 & 187) kept all aliases, and anyone who later wants to improve inferAdditionalConstraints might need them.
Removing non-literal aliases would be only a marginal performance improvement, so I would rather keep the existing behavior.

// For each expression collect its aliases
val aliasMap = projectList.collect {
case alias @ Alias(expr, _) if !expr.foldable => (expr.canonicalized, alias)
We need to filter out the non-deterministic expressions here.
Yes, you are correct. I'll add this as a safety feature.
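A minimal sketch of the filter discussed in this thread, written against a hypothetical stand-in model rather than Catalyst: `Expr` here carries only a canonical string and the two flags the guard inspects, and `aliasMap` returns alias names instead of `Alias` nodes. The point it illustrates is that foldable and non-deterministic expressions never enter the map, so their aliases are not used for substitution.

```scala
object AliasMapSketch {
  // Hypothetical stand-ins; the real code works on Catalyst expressions.
  final case class Expr(
      canonical: String,
      foldable: Boolean = false,
      deterministic: Boolean = true)
  final case class Alias(child: Expr, name: String)

  // For each canonical expression, collect the names of its aliases,
  // skipping foldable and non-deterministic expressions.
  def aliasMap(projectList: Seq[Alias]): Map[String, Seq[String]] =
    projectList
      .collect {
        case Alias(e, name) if !e.foldable && e.deterministic => e.canonical -> name
      }
      .groupBy(_._1)
      .map { case (expr, pairs) => expr -> pairs.map(_._2) }
}
```

Two aliases of `rand()` would be evaluated independently, so treating them as interchangeable in a constraint would be unsound; that is the assumed motivation for the extra guard.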
Checked the 2 GitHub Actions errors:
Looking forward to seeing this PR merged.
@cloud-fan, there have been some reviews already, but perhaps you could also take a look.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Hi @tanelk @HyukjinKwon @maropu, are there any updates on this PR? We have also hit this issue in many Spark jobs.
What changes were proposed in this pull request?
To improve performance, changed UnaryNode.getAllValidConstraints to discard constraints that will not change the end result.
Why are the changes needed?
There have been at least two attempts at speeding up the constraint system: #30185 and #26257. Both of them seem to have stalled.
Optimizing Project and Aggregate nodes can have exponential memory and time complexity in relation to the number of aliases they have. The simplest example is a Project that has incoming columns a1, a2, ..., an and a child constraint a1 + a2 + ... + an > 0. If it aliases its columns a1 as b1, a2 as b2, ..., an as bn, then UnaryNode.getAllValidConstraints returns 2 to the power of n constraints (plus some isNotNull constraints), because each column is replaced by its alias in half of the constraints. All except one of these constraints get filtered out later on: only the one where every column is replaced by its alias is kept. Eagerly filtering these out improves performance and avoids a possible OOM.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New and existing UTs.
Manually verified the performance gain with an example provided in #30185: the optimization time was reduced from 25s to 0.2s.
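The exponential blow-up described under "Why are the changes needed?" can be reproduced with a small standalone model. This is a sketch under stated assumptions, not the Spark code: a constraint is reduced to the ordered list of column names it references, and `naiveVariants`, `validFor` and `demo` are hypothetical helpers that mimic "substitute each alias, keep both versions, filter at the end".

```scala
object ConstraintBlowup {
  // A constraint is modelled only by the columns it references, e.g.
  // Vector("a1", "a2") stands for a1 + a2 > 0.
  type Constraint = Vector[String]

  // Naive propagation: for every alias, keep each existing constraint and
  // add a copy in which the aliased column is replaced by its alias.
  def naiveVariants(
      constraint: Constraint,
      aliases: Seq[(String, String)]): Set[Constraint] =
    aliases.foldLeft(Set(constraint)) { case (acc, (from, to)) =>
      acc ++ acc.map(_.map(c => if (c == from) to else c))
    }

  // Only constraints that refer exclusively to output columns are valid.
  def validFor(constraints: Set[Constraint], output: Set[String]): Set[Constraint] =
    constraints.filter(_.forall(output.contains))

  // Returns (number of generated constraints, number kept after filtering).
  def demo(n: Int): (Int, Int) = {
    val columns = (1 to n).map(i => s"a$i").toVector
    val aliases = (1 to n).map(i => s"a$i" -> s"b$i")
    val all = naiveVariants(columns, aliases)
    val kept = validFor(all, aliases.map(_._2).toSet)
    (all.size, kept.size)
  }
}
```

For n = 10 the model generates 1024 variants and keeps exactly one, which is the shape of the problem the eager filtering targets.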