Skip to content

Commit 0aca1a6

Browse files
aokolnychyi authored and dongjoon-hyun committed
[SPARK-32276][SQL] Remove redundant sorts before repartition nodes
### What changes were proposed in this pull request?

This PR proposes to remove redundant sorts before repartition nodes whenever the data is ordered after the repartitioning.

### Why are the changes needed?

It looks like our `EliminateSorts` rule can be extended further to remove sorts before repartition nodes that don't affect the final output ordering. It seems safe to perform the following rewrites:

- `Sort -> Repartition -> Sort -> Scan` as `Sort -> Repartition -> Scan`
- `Sort -> Repartition -> Project -> Sort -> Scan` as `Sort -> Repartition -> Project -> Scan`

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

More test cases.

Closes #29089 from aokolnychyi/spark-32276.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 026b0b9 commit 0aca1a6

File tree

2 files changed

+206
-6
lines changed

2 files changed

+206
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -966,14 +966,19 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
966966
}
967967

968968
/**
969-
* Removes Sort operation. This can happen:
969+
* Removes Sort operations if they don't affect the final output ordering.
970+
* Note that changes in the final output ordering may affect the file size (SPARK-32318).
971+
* This rule handles the following cases:
970972
* 1) if the sort order is empty or the sort order does not have any reference
971973
* 2) if the child is already sorted
972-
* 3) if there is another Sort operator separated by 0...n Project/Filter operators
973-
* 4) if the Sort operator is within Join separated by 0...n Project/Filter operators only,
974-
* and the Join conditions is deterministic
975-
* 5) if the Sort operator is within GroupBy separated by 0...n Project/Filter operators only,
976-
* and the aggregate function is order irrelevant
974+
* 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or
975+
* RepartitionByExpression (with deterministic expressions) operators
976+
* 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or
977+
* RepartitionByExpression (with deterministic expressions) operators only and the Join condition
978+
* is deterministic
979+
* 5) if the Sort operator is within GroupBy separated by 0...n Project, Filter, Repartition or
980+
* RepartitionByExpression (with deterministic expressions) operators only and the aggregate
981+
* function is order irrelevant
977982
*/
978983
object EliminateSorts extends Rule[LogicalPlan] {
979984
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -999,6 +1004,8 @@ object EliminateSorts extends Rule[LogicalPlan] {
9991004
/**
 * Tells whether `plan` is one of the operators this rule accepts between two sorts:
 * any Repartition, or a Project / Filter / RepartitionByExpression whose expressions are
 * all deterministic. Every other operator yields false.
 */
private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case _: Repartition => true
  case repartitionByExpr: RepartitionByExpression =>
    repartitionByExpr.partitionExpressions.forall(_.deterministic)
  case filter: Filter => filter.condition.deterministic
  case project: Project => project.projectList.forall(_.deterministic)
  case _ => false
}
10041011

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
21+
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
22+
import org.apache.spark.sql.catalyst.dsl.expressions._
23+
import org.apache.spark.sql.catalyst.dsl.plans._
24+
import org.apache.spark.sql.catalyst.plans.PlanTest
25+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
26+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
27+
28+
/**
 * Exercises [[EliminateSorts]] on plans where a sort sits below a repartition node.
 *
 * Each scenario is run through `checkRepartitionCases`, which wraps the plan with
 * `repartition` and checks three shapes: no sort on top, a `sortBy` on top and an
 * `orderBy` on top. Subclasses override `repartition` to reuse the scenarios with a
 * different repartition operator.
 */
class EliminateSortsBeforeRepartitionSuite extends PlanTest {

  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
  val analyzer = new Analyzer(catalog, conf)

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
  val anotherTestRelation = LocalRelation('d.int, 'e.int)

  // Rule executor under test: the sort-elimination batch runs between two helper batches.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("Default", FixedPoint(10), FoldablePropagation, LimitPushDown),
      Batch("Eliminate Sorts", Once, EliminateSorts),
      Batch("Collapse Project", Once, CollapseProject))
  }

  /** Repartition operator placed on top of the plan under test; overridden by subclasses. */
  def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)

  test("sortBy") {
    val projected = testRelation.select('a, 'b)
    checkRepartitionCases(
      plan = projected.sortBy('a.asc, 'b.desc),
      optimizedPlan = projected)
  }

  test("sortBy with projection") {
    def project(base: LogicalPlan): LogicalPlan = base.select('a + 1 as "a", 'b + 2 as "b")
    checkRepartitionCases(
      plan = project(testRelation.sortBy('a.asc, 'b.asc)),
      optimizedPlan = project(testRelation))
  }

  test("sortBy with projection and filter") {
    def projectAndFilter(base: LogicalPlan): LogicalPlan = base.select('a, 'b).where('a === 10)
    checkRepartitionCases(
      plan = projectAndFilter(testRelation.sortBy('a.asc, 'b.asc)),
      optimizedPlan = projectAndFilter(testRelation))
  }

  test("sortBy with limit") {
    // The expected plan equals the input: the sort below the limit is kept.
    val limited = testRelation.sortBy('a.asc, 'b.asc).limit(10)
    checkRepartitionCases(plan = limited, optimizedPlan = limited)
  }

  test("sortBy with non-deterministic projection") {
    // The expected plan equals the input: the sort below the rand() projection is kept.
    def sortedWithRand: LogicalPlan = testRelation.sortBy('a.asc, 'b.asc).select(rand(1), 'a, 'b)
    checkRepartitionCases(plan = sortedWithRand, optimizedPlan = sortedWithRand)
  }

  test("orderBy") {
    val projected = testRelation.select('a, 'b)
    checkRepartitionCases(
      plan = projected.orderBy('a.asc, 'b.asc),
      optimizedPlan = projected)
  }

  test("orderBy with projection") {
    def project(base: LogicalPlan): LogicalPlan = base.select('a + 1 as "a", 'b + 2 as "b")
    checkRepartitionCases(
      plan = project(testRelation.orderBy('a.asc, 'b.asc)),
      optimizedPlan = project(testRelation))
  }

  test("orderBy with projection and filter") {
    def projectAndFilter(base: LogicalPlan): LogicalPlan = base.select('a, 'b).where('a === 10)
    checkRepartitionCases(
      plan = projectAndFilter(testRelation.orderBy('a.asc, 'b.asc)),
      optimizedPlan = projectAndFilter(testRelation))
  }

  test("orderBy with limit") {
    // The expected plan equals the input: the sort below the limit is kept.
    val limited = testRelation.orderBy('a.asc, 'b.asc).limit(10)
    checkRepartitionCases(plan = limited, optimizedPlan = limited)
  }

  test("orderBy with non-deterministic projection") {
    // The expected plan equals the input: the sort below the rand() projection is kept.
    def orderedWithRand: LogicalPlan =
      testRelation.orderBy('a.asc, 'b.asc).select(rand(1), 'a, 'b)
    checkRepartitionCases(plan = orderedWithRand, optimizedPlan = orderedWithRand)
  }

  test("additional coalesce and sortBy") {
    checkRepartitionCases(
      plan = testRelation.sortBy('a.asc, 'b.asc).coalesce(1),
      optimizedPlan = testRelation.coalesce(1))
  }

  test("additional projection, repartition and sortBy") {
    def repartitionAndProject(base: LogicalPlan): LogicalPlan =
      base.repartition(100).select('a + 1 as "a")
    checkRepartitionCases(
      plan = repartitionAndProject(testRelation.sortBy('a.asc, 'b.asc)),
      optimizedPlan = repartitionAndProject(testRelation))
  }

  test("additional filter, distribute and sortBy") {
    def distributeAndFilter(base: LogicalPlan): LogicalPlan =
      base.distribute('a)(2).where('a === 10)
    checkRepartitionCases(
      plan = distributeAndFilter(testRelation.sortBy('a.asc, 'b.asc)),
      optimizedPlan = distributeAndFilter(testRelation))
  }

  test("join") {
    def distributeAndFilter(base: LogicalPlan): LogicalPlan =
      base.distribute('a)(2).where('a === 10)
    val rightSide = anotherTestRelation.select('d)

    val actual = optimize(distributeAndFilter(testRelation.sortBy('a.asc, 'b.asc)).join(rightSide))
    val expected = analyze(distributeAndFilter(testRelation).join(rightSide))
    comparePlans(actual, expected)
  }

  test("aggregate") {
    def distributeAndFilter(base: LogicalPlan): LogicalPlan =
      base.distribute('a)(2).where('a === 10)

    val actual =
      optimize(distributeAndFilter(testRelation.sortBy('a.asc, 'b.asc)).groupBy('a)(sum('b)))
    val expected = analyze(distributeAndFilter(testRelation).groupBy('a)(sum('b)))
    comparePlans(actual, expected)
  }

  /**
   * Wraps `plan` with `repartition` and compares the optimizer output in three situations.
   *
   * @param plan          plan under test, before the repartition node is added
   * @param optimizedPlan what `plan` is expected to become once a sort is placed above the
   *                      repartition node
   */
  protected def checkRepartitionCases(plan: LogicalPlan, optimizedPlan: LogicalPlan): Unit = {
    val repartitioned = repartition(plan)

    // cannot remove sortBy before repartition without sortBy/orderBy
    comparePlans(optimize(repartitioned), analyze(repartitioned))

    // can remove sortBy before repartition with sortBy
    comparePlans(
      optimize(repartitioned.sortBy('a.asc)),
      analyze(repartition(optimizedPlan).sortBy('a.asc)))

    // can remove sortBy before repartition with orderBy
    comparePlans(
      optimize(repartitioned.orderBy('a.asc)),
      analyze(repartition(optimizedPlan).orderBy('a.asc)))
  }

  /** Runs only the analyzer; used to build the expected side of a comparison. */
  private def analyze(plan: LogicalPlan): LogicalPlan = analyzer.execute(plan)

  /** Analyzes the plan and then applies the `Optimize` batches to it. */
  private def optimize(plan: LogicalPlan): LogicalPlan = {
    val analyzed = analyzer.execute(plan)
    Optimize.execute(analyzed)
  }
}
174+
175+
/**
 * Re-runs the scenarios of [[EliminateSortsBeforeRepartitionSuite]] with the repartition node
 * replaced by `distribute('a)(10)`, and adds cases where a lower `distribute` uses a
 * non-deterministic expression.
 */
class EliminateSortsBeforeRepartitionByExprsSuite extends EliminateSortsBeforeRepartitionSuite {
  override def repartition(plan: LogicalPlan): LogicalPlan = plan.distribute('a)(10)

  test("sortBy before repartition with non-deterministic expressions") {
    // No limit between the sort and the distribute: a limit would keep the sort by itself
    // (see "sortBy with limit"), so the test would pass even if the rule ignored the
    // non-deterministic rand(1) expression. Here that expression is the only thing that
    // can keep the sort.
    val plan = testRelation.sortBy('a.asc, 'b.asc)
    val planWithRepartition = plan.distribute(rand(1).asc, 'a.asc)(20)
    checkRepartitionCases(plan = planWithRepartition, optimizedPlan = planWithRepartition)
  }

  test("orderBy before repartition with non-deterministic expressions") {
    // Same reasoning as the sortBy case: the plan carries no limit, so only the
    // non-deterministic rand(1) expression can keep the sort.
    val plan = testRelation.orderBy('a.asc, 'b.asc)
    val planWithRepartition = plan.distribute(rand(1).asc, 'a.asc)(20)
    checkRepartitionCases(plan = planWithRepartition, optimizedPlan = planWithRepartition)
  }
}
190+
191+
/**
 * Re-runs the scenarios of [[EliminateSortsBeforeRepartitionSuite]] with the repartition node
 * replaced by `coalesce(1)`.
 */
class EliminateSortsBeforeCoalesceSuite extends EliminateSortsBeforeRepartitionSuite {

  override def repartition(plan: LogicalPlan): LogicalPlan = {
    plan.coalesce(1)
  }
}

0 commit comments

Comments
 (0)