From 840ff97e0a7b5310840f96aa5e42de2da813db8c Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 8 Sep 2020 18:36:29 +0900
Subject: [PATCH 1/6] Changed EnsureRequirements to let it remove redundant
 shuffle exchanges.

---
 .../exchange/EnsureRequirements.scala         |  3 +-
 .../spark/sql/execution/PlannerSuite.scala    | 33 +++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b176598ed8c2c..b197d0f618694 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -52,7 +52,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
+        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), newChild)
     }
 
     // Get the indexes of children which have specified distribution requirements and need to have
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index d428b7ebc0e91..88ed3cd64e102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -994,6 +994,39 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("Remove redundant shuffle exchange") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
+        val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
+        val orderedPlan = ordered.queryExecution.executedPlan
+        val exchangesInOrdered =
+          orderedPlan.collect { case s: ShuffleExchangeExec => s }
+        assert(exchangesInOrdered.size == 1)
+
+        val partitioning = exchangesInOrdered.head.outputPartitioning
+        assert(partitioning.numPartitions == 200)
+        assert(partitioning.isInstanceOf[RangePartitioning])
+
+        val left = Seq(1, 2, 3).toDF.repartition(10, $"value")
+        val right = Seq(1, 2, 3).toDF
+        val joined = left.join(right, left("value") + 1 === right("value"))
+        val joinedPlan = joined.queryExecution.executedPlan
+        val exchangesInJoined =
+          joinedPlan.collect { case s: ShuffleExchangeExec => s }
+
+        assert(exchangesInJoined.size == 2)
+
+        val leftPartitioning = exchangesInJoined(0).outputPartitioning
+        assert(leftPartitioning.numPartitions == 200)
+        assert(leftPartitioning.isInstanceOf[HashPartitioning])
+
+        val rightPartitioning = exchangesInJoined(1).outputPartitioning
+        assert(rightPartitioning.numPartitions == 200)
+        assert(rightPartitioning.isInstanceOf[HashPartitioning])
+      }
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements

From 1effe7547c34246bd99d7cda5c100ff75875934a Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 8 Sep 2020 18:38:22 +0900
Subject: [PATCH 2/6] Removed a blank line.
---
 .../test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 88ed3cd64e102..574b04da921ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -1014,7 +1014,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
         val joinedPlan = joined.queryExecution.executedPlan
         val exchangesInJoined =
           joinedPlan.collect { case s: ShuffleExchangeExec => s }
-
         assert(exchangesInJoined.size == 2)
 
         val leftPartitioning = exchangesInJoined(0).outputPartitioning

From 0da9e8270e3209cefc430788c481005804857727 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 8 Sep 2020 22:04:03 +0900
Subject: [PATCH 3/6] Added comments.

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b197d0f618694..4da2983c1d32b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -52,6 +52,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
+        // Just as optimizer.CollapseRepartition removes adjacent repartition operations,
+        // adjacent repartitions performed by shuffle can also be removed.
         val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
         ShuffleExchangeExec(distribution.createPartitioning(numPartitions), newChild)
     }

From 5680d48c3359437daf6b6bc46962cf16d180e97c Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 11 Sep 2020 13:51:39 +0900
Subject: [PATCH 4/6] Added JIRA number prefix to the test.

---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 2613c28dd889a..488bfe70c03a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -1002,7 +1002,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     assert(numPartitions == 0)
   }
 
-  test("Remove redundant shuffle exchange") {
+  test("SPARK-32820: Remove redundant shuffle exchange") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
         val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")

From 23c8b708dd5fcd21bb111d7ef43b78165f5d9166 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Thu, 1 Oct 2020 11:57:55 +0900
Subject: [PATCH 5/6] Added PruneShuffle rule to prune unnecessary shuffles.
---
 .../spark/sql/execution/QueryExecution.scala  |  3 +-
 .../exchange/EnsureRequirements.scala         |  5 +-
 .../sql/execution/exchange/PruneShuffle.scala | 43 +++++++++++++++
 .../spark/sql/execution/PlannerSuite.scala    | 52 +++++++++++--------
 4 files changed, 77 insertions(+), 26 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/PruneShuffle.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index dca2c5b16e8d5..0eb063a876bd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, PruneShuffle, ReuseExchange}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
@@ -344,6 +344,7 @@ object QueryExecution {
       PlanSubqueries(sparkSession),
       RemoveRedundantProjects(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      PruneShuffle(),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 4da2983c1d32b..b176598ed8c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -52,10 +52,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        // Just as optimizer.CollapseRepartition removes adjacent repartition operations,
-        // adjacent repartitions performed by shuffle can also be removed.
-        val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), newChild)
+        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
     }
 
     // Get the indexes of children which have specified distribution requirements and need to have
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/PruneShuffle.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/PruneShuffle.scala
new file mode 100644
index 0000000000000..06bfcd155a14e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/PruneShuffle.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * Removes unnecessary shuffles. A shuffle can be introduced by [[Rule]]s for
+ * [[SparkPlan]]s, such as [[EnsureRequirements]]. If the immediate child of a
+ * shuffle is another shuffle, that child shuffle is redundant and can be removed.
+ */
+case class PruneShuffle() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = plan.transform {
+    case op @ ShuffleExchangeExec(_, child: ShuffleExchangeExec, _) =>
+      op.withNewChildren(Seq(pruneShuffle(child)))
+    case other => other
+  }
+
+  private def pruneShuffle(plan: SparkPlan): SparkPlan = {
+    plan match {
+      case shuffle: ShuffleExchangeExec =>
+        pruneShuffle(shuffle.child)
+      case other => other
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 488bfe70c03a9..7350a6980168e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledJoin, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1006,30 +1006,40 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
         val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
-        val orderedPlan = ordered.queryExecution.executedPlan
-        val exchangesInOrdered =
-          orderedPlan.collect { case s: ShuffleExchangeExec => s }
-        assert(exchangesInOrdered.size == 1)
-
-        val partitioning = exchangesInOrdered.head.outputPartitioning
+        val orderedPlan = ordered.queryExecution.executedPlan.collectFirst {
+          case sort: SortExec => sort
+        }.get
+        val exchangeInOrdered = orderedPlan.collectFirst {
+          case shuffle: ShuffleExchangeExec => shuffle
+        }.get
+
+        val partitioning = exchangeInOrdered.outputPartitioning
         assert(partitioning.numPartitions == 200)
-        assert(partitioning.isInstanceOf[RangePartitioning])
-
-        val left = Seq(1, 2, 3).toDF.repartition(10, $"value")
-        val right = Seq(1, 2, 3).toDF
-        val joined = left.join(right, left("value") + 1 === right("value"))
-        val joinedPlan = joined.queryExecution.executedPlan
-        val exchangesInJoined =
-          joinedPlan.collect { case s: ShuffleExchangeExec => s }
-        assert(exchangesInJoined.size == 2)
-
-        val leftPartitioning = exchangesInJoined(0).outputPartitioning
+        assert(partitioning.satisfies(orderedPlan.requiredChildDistribution.head))
+
+        val left = Seq(1, 2, 3).toDF.repartition(10)
+        val right = Seq(1, 2, 3).toDF.repartition(30, $"value")
+        val joined = left.join(right, left("value") + 1 === right("value") + 2)
+        val joinedPlan = joined.queryExecution.executedPlan.collectFirst {
+          case shuffledJoin: ShuffledJoin => shuffledJoin
+        }.get
+        val leftExchangesInJoined = joinedPlan.children(0).collectFirst {
+          case shuffle: ShuffleExchangeExec => shuffle
+        }.get
+        val rightExchangeInJoined = joinedPlan.children(1).collectFirst {
+          case shuffle: ShuffleExchangeExec => shuffle
+        }.get
+
+        assert(!leftExchangesInJoined.children.head.isInstanceOf[ShuffleExchangeExec])
+        assert(!rightExchangeInJoined.children.head.isInstanceOf[ShuffleExchangeExec])
+
+        val leftPartitioning = leftExchangesInJoined.outputPartitioning
         assert(leftPartitioning.numPartitions == 200)
-        assert(leftPartitioning.isInstanceOf[HashPartitioning])
+        assert(leftPartitioning.satisfies(joinedPlan.requiredChildDistribution(0)))
 
-        val rightPartitioning = exchangesInJoined(1).outputPartitioning
+        val rightPartitioning = rightExchangeInJoined.outputPartitioning
         assert(rightPartitioning.numPartitions == 200)
-        assert(rightPartitioning.isInstanceOf[HashPartitioning])
+        assert(rightPartitioning.satisfies(joinedPlan.requiredChildDistribution(1)))
       }
     }
   }

From e699cb61389c282d120bdc8a64d694010d123906 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Thu, 1 Oct 2020 12:00:16 +0900
Subject: [PATCH 6/6] Removed unnecessary assertions.

---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 7350a6980168e..9361e35a8ff1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -1030,9 +1030,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
           case shuffle: ShuffleExchangeExec => shuffle
         }.get
 
-        assert(!leftExchangesInJoined.children.head.isInstanceOf[ShuffleExchangeExec])
-        assert(!rightExchangeInJoined.children.head.isInstanceOf[ShuffleExchangeExec])
-
         val leftPartitioning = leftExchangesInJoined.outputPartitioning
         assert(leftPartitioning.numPartitions == 200)
         assert(leftPartitioning.satisfies(joinedPlan.requiredChildDistribution(0)))
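
Usage sketch (reviewer note): the snippet below shows the end-to-end effect of the series on the first scenario exercised by the PlannerSuite test. It is illustrative only and assumes a build with all six patches applied, spark.sql.adaptive.enabled=false so the QueryExecution.preparations batch touched in PATCH 5/6 is the one that runs, the default spark.sql.shuffle.partitions of 200, and spark.implicits._ in scope for the $ syntax (as in spark-shell).

  import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

  // repartitionByRange(10, $"id".desc) inserts a range shuffle with 10
  // partitions; orderBy($"id") then requires its own RangePartitioning over
  // 200 partitions, so EnsureRequirements stacks a second shuffle directly on
  // top of the first. PruneShuffle drops the inner shuffle, whose output
  // would be reshuffled anyway.
  val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
  val shuffles = ordered.queryExecution.executedPlan.collect {
    case s: ShuffleExchangeExec => s
  }
  // Mirrors the PATCH 1/6 test: one exchange left, at the full 200 partitions.
  assert(shuffles.size == 1)
  assert(shuffles.head.outputPartitioning.numPartitions == 200)

Without the series, the same plan keeps both stacked exchanges. Note that PruneShuffle runs right after EnsureRequirements, so it only ever sees shuffles stacked during plan preparation; adjacent repartitions at the logical-plan level are already handled by optimizer.CollapseRepartition, as the PATCH 3/6 comment points out.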