diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c1b885a72ad3e..a8b632d4e74b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -282,14 +282,20 @@ object SQLConf { val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = buildConf("spark.sql.adaptive.minNumPostShufflePartitions") - .internal() - .doc("The advisory minimal number of post-shuffle partitions provided to " + - "ExchangeCoordinator. This setting is used in our test to make sure we " + - "have enough parallelism to expose issues that will not be exposed with a " + - "single partition. When the value is a non-positive value, this setting will " + - "not be provided to ExchangeCoordinator.") + .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") + .intConf + .checkValue(_ > 0, "The minimum shuffle partition number " + + "must be a positive integer.") + .createWithDefault(1) + + val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS = + buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") + .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " + + "The by default equals to spark.sql.shuffle.partitions") .intConf - .createWithDefault(-1) + .checkValue(_ > 0, "The maximum shuffle partition number " + + "must be a positive integer.") + .createOptional val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") @@ -1728,8 +1734,10 @@ class SQLConf extends Serializable with Logging { def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) - def minNumPostShufflePartitions: Int = - getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) + def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) + + def maxNumPostShufflePartitions: Int = + getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions) def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 72499aa936a56..37e1e54d8766b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils @@ -99,10 +100,13 @@ class QueryExecution( /** A sequence of rules that will be applied in order to the physical plan before execution. */ protected def preparations: Seq[Rule[SparkPlan]] = Seq( PlanSubqueries(sparkSession), + ReuseSubquery(sparkSession.sessionState.conf), EnsureRequirements(sparkSession.sessionState.conf), + // `AdaptiveSparkPlan` is a leaf node. If inserted, all the following rules will be no-op as + // the original plan is hidden behind `AdaptiveSparkPlan`. 
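+    // Codegen is still applied per query stage (see `queryStageOptimizerRules` in
+    // `QueryStageManager`), and exchange reuse is handled there through its stage cache.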
+ InsertAdaptiveSparkPlan(sparkSession), CollapseCodegenStages(sparkSession.sessionState.conf), - ReuseExchange(sparkSession.sessionState.conf), - ReuseSubquery(sparkSession.sessionState.conf)) + ReuseExchange(sparkSession.sessionState.conf)) def simpleString: String = withRedaction { val concat = new StringConcat() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index f554ff0aa775f..f59eba6a51fc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -52,6 +53,8 @@ private[execution] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { case ReusedExchangeExec(_, child) => child :: Nil + case a: AdaptiveSparkPlanExec => a.finalPlan :: Nil + case stage: QueryStageExec => stage.plan :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala new file mode 100644 index 0000000000000..ae6290782fe31 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import java.util.concurrent.CountDownLatch + +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution} +import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate + +/** + * A root node to execute the query plan adaptively. It splits the query plan into independent + * stages and executes them in order according to their dependencies. The query stage + * materializes its output at the end. When one stage completes, the data statistics of its + * materialized output will be used to optimize the subsequent stages. 
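+ * A minimal usage sketch (illustrative only; it assumes the adaptive execution flag
+ * `spark.sql.adaptive.enabled` backing `ADAPTIVE_EXECUTION_ENABLED` and standard Dataset APIs,
+ * none of which are introduced by this node):
+ * {{{
+ *   spark.conf.set("spark.sql.adaptive.enabled", "true")
+ *   // groupBy triggers a shuffle; the shuffle stage is materialized first and its map output
+ *   // statistics are used to re-optimize the remaining stages (e.g. coalesce partitions).
+ *   spark.range(0, 1000).groupBy("id").count().collect()
+ * }}}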
+ * This is called mid-query re-optimization in database literature. + */ +case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession) + extends LeafExecNode{ + + override def output: Seq[Attribute] = initialPlan.output + + @volatile private var currentPlan: SparkPlan = initialPlan + @volatile private var error: Throwable = null + + // We will release the lock when all the query stages are completed, or we fail to + // optimize/execute query stages. Getting `finalPlan` will be blocked until the lock is release. + // This is better than wait()/notify(), as we can easily check if the computation has completed, + // by calling `readyLock.getCount()`. + private val readyLock = new CountDownLatch(1) + + private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback { + override def onPlanUpdate(updatedPlan: SparkPlan): Unit = { + updateCurrentPlan(updatedPlan, executionId) + } + + override def onFinalPlan(finalPlan: SparkPlan): Unit = { + updateCurrentPlan(finalPlan, executionId) + readyLock.countDown() + } + + override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = { + error = new SparkException( + s""" + |Fail to materialize query stage ${stage.id}: + |${stage.plan.treeString} + """.stripMargin, e) + readyLock.countDown() + } + + override def onError(e: Throwable): Unit = { + error = e + readyLock.countDown() + } + } + + private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = { + currentPlan = newPlan + executionId.foreach { id => + session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( + id, + SQLExecution.getQueryExecution(id).toString, + SparkPlanInfo.fromSparkPlan(currentPlan))) + } + } + + def finalPlan: SparkPlan = { + if (readyLock.getCount > 0) { + val sc = session.sparkContext + val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong) + val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId)) + stageManager.start() + readyLock.await() + stageManager.stop() + } + + if (error != null) throw error + currentPlan + } + + override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect() + override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n) + override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator() + override def doExecute(): RDD[InternalRow] = finalPlan.execute() + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int): Unit = { + currentPlan.generateTreeString( + depth, lastChildren, append, verbose, "", false, maxFields) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala new file mode 100644 index 0000000000000..5ab23962f4c6e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.ExecutedCommandExec + +/** + * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which executes the query plan + * adaptively with runtime data statistics. Note that this rule must be run after + * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are + * already inserted. + */ +case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = plan match { + case _: ExecutedCommandExec => plan + case _ if session.sessionState.conf.adaptiveExecutionEnabled => + AdaptiveSparkPlanExec(plan, session.cloneSession()) + case _ => plan + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala new file mode 100644 index 0000000000000..e3a7165fa37a0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import scala.concurrent.Future + +import org.apache.spark.MapOutputStatistics +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.exchange._ + +/** + * A query stage is an independent subgraph of the query plan. Query stage materializes its output + * before proceeding with further operators of the query plan. The data statistics of the + * materialized output can be used to optimize subsequent query stages. + * + * There are 2 kinds of query stages: + * 1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches + * another job to execute the further operators. + * 2. Broadcast query stage. This stage materializes its output to an array in driver JVM. 
Spark + * broadcasts the array before executing the further operators. + */ +abstract class QueryStageExec extends LeafExecNode { + + /** + * An id of this query stage which is unique in the entire query plan. + */ + def id: Int + + /** + * The sub-tree of the query plan that belongs to this query stage. + */ + def plan: SparkPlan + + /** + * Returns a new query stage with a new plan, which is optimized based on accurate runtime data + * statistics. + */ + def withNewPlan(newPlan: SparkPlan): QueryStageExec + + /** + * Materialize this query stage, to prepare for the execution, like submitting map stages, + * broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this + * stage is ready. + */ + def materialize(): Future[Any] + + override def output: Seq[Attribute] = plan.output + override def outputPartitioning: Partitioning = plan.outputPartitioning + override def outputOrdering: Seq[SortOrder] = plan.outputOrdering + override def executeCollect(): Array[InternalRow] = plan.executeCollect() + override def executeTake(n: Int): Array[InternalRow] = plan.executeTake(n) + override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator() + override def doExecute(): RDD[InternalRow] = plan.execute() + override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast() + override def doCanonicalize(): SparkPlan = plan.canonicalized + + // TODO: maybe we should not hide query stage entirely from explain result. + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int): Unit = { + plan.generateTreeString( + depth, lastChildren, append, verbose, "", false, maxFields) + } +} + +/** + * A shuffle query stage whose child is a [[ShuffleExchangeExec]]. + */ +case class ShuffleQueryStageExec(id: Int, plan: ShuffleExchangeExec) extends QueryStageExec { + + override def withNewPlan(newPlan: SparkPlan): QueryStageExec = { + copy(plan = newPlan.asInstanceOf[ShuffleExchangeExec]) + } + + @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = { + if (plan.inputRDD.getNumPartitions == 0) { + // `submitMapStage` does not accept RDD with 0 partition. Here we return null and the caller + // side should take care of it. + Future.successful(null) + } else { + sparkContext.submitMapStage(plan.shuffleDependency) + } + } + + override def materialize(): Future[Any] = { + mapOutputStatisticsFuture + } +} + +/** + * A broadcast query stage whose child is a [[BroadcastExchangeExec]]. + */ +case class BroadcastQueryStageExec(id: Int, plan: BroadcastExchangeExec) extends QueryStageExec { + + override def withNewPlan(newPlan: SparkPlan): QueryStageExec = { + copy(plan = newPlan.asInstanceOf[BroadcastExchangeExec]) + } + + override def materialize(): Future[Any] = { + plan.relationFuture + } +} + +/** + * A wrapper of query stage to indicate that it's reused. Note that itself is not a query stage. + */ +case class ReusedQueryStageExec(child: SparkPlan, output: Seq[Attribute]) + extends UnaryExecNode { + + // Ignore this wrapper for canonicalizing. 
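+  // Delegating to the child's canonical form lets `sameResult` comparisons, and therefore
+  // stage reuse, see through this wrapper to the underlying query stage.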
+ override def doCanonicalize(): SparkPlan = child.canonicalized + + override def doExecute(): RDD[InternalRow] = { + child.execute() + } + + override def doExecuteBroadcast[T](): Broadcast[T] = { + child.executeBroadcast() + } + + // `ReusedQueryStageExec` can have distinct set of output attribute ids from its child, we need + // to update the attribute ids in `outputPartitioning` and `outputOrdering`. + private lazy val updateAttr: Expression => Expression = { + val originalAttrToNewAttr = AttributeMap(child.output.zip(output)) + e => e.transform { + case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr) + } + } + + override def outputPartitioning: Partitioning = child.outputPartitioning match { + case e: Expression => updateAttr(e).asInstanceOf[Partitioning] + case other => other + } + + override def outputOrdering: Seq[SortOrder] = { + child.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder]) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala new file mode 100644 index 0000000000000..0c4f093869a81 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{CollapseCodegenStages, SparkPlan} +import org.apache.spark.sql.execution.adaptive.rule._ +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{EventLoop, ThreadUtils} + +/** + * This class inserts [[QueryStageExec]] into the query plan in a bottom-up fashion, and + * materializes the query stages asynchronously as soon as they are created. + * + * When one query stage finishes materialization, a list of adaptive optimizer rules will be + * executed, trying to optimize the query plan with the data statistics collected from the the + * materialized data. Then we traverse the query plan again and try to insert more query stages. + * + * To create query stages, we traverse the query tree bottom up. When we hit an exchange node, + * and all the child query stages of this exchange node are materialized, we create a new + * query stage for this exchange node. + * + * Right before the stage creation, a list of query stage optimizer rules will be executed. 
These + * optimizer rules are different from the adaptive optimizer rules. Query stage optimizer rules only + * focus on a plan sub-tree of a specific query stage, and they will be executed only after all the + * child stages are materialized. + */ +class QueryStageManager( + initialPlan: SparkPlan, + session: SparkSession, + callback: QueryStageManagerCallback) + extends EventLoop[QueryStageManagerEvent]("QueryStageCreator") { + + private def conf = session.sessionState.conf + + private val readyStages = mutable.HashSet.empty[Int] + + private var currentStageId = 0 + + private val stageCache = + mutable.HashMap.empty[StructType, mutable.Buffer[(Exchange, QueryStageExec)]] + + private var currentPlan = initialPlan + + private val localProperties = session.sparkContext.getLocalProperties + + private implicit def executionContext: ExecutionContextExecutorService = { + QueryStageManager.executionContext + } + + // A list of optimizer rules that will be applied when a query stage finishes materialization. + // These rules need to travers the entire query plan, and find chances to optimize the query plan + // with the data statistics collected from materialized query stage's output. + private val adaptiveOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + RemoveRedundantShuffles) + + // A list of optimizer rules that will be applied right before a query stage is created. + // These rules need to traverse the plan sub-tree of the query stage to be created, and find + // chances to optimize this query stage given the all its child query stages. + private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + AssertChildStagesMaterialized, + ReduceNumShufflePartitions(conf), + CollapseCodegenStages(conf)) + + private def optimizeEntirePlan(plan: SparkPlan): SparkPlan = { + adaptiveOptimizerRules.foldLeft(plan) { + case (current, rule) => rule(current) + } + } + + private def optimizeQueryStage(plan: SparkPlan): SparkPlan = { + queryStageOptimizerRules.foldLeft(plan) { + case (current, rule) => rule(current) + } + } + + override protected def onReceive(event: QueryStageManagerEvent): Unit = event match { + case Start => + // set active session and local properties for the event loop thread. + SparkSession.setActiveSession(session) + session.sparkContext.setLocalProperties(localProperties) + currentPlan = createQueryStages(initialPlan) + + case MaterializeStage(stage) => + stage.materialize().onComplete { res => + if (res.isSuccess) { + post(StageReady(stage)) + } else { + callback.onStageMaterializationFailed(stage, res.failed.get) + stop() + } + } + + case StageReady(stage) => + readyStages += stage.id + currentPlan = optimizeEntirePlan(currentPlan) + currentPlan = createQueryStages(currentPlan) + } + + override protected def onStart(): Unit = { + post(Start) + } + + /** + * Traverse the query plan bottom-up, and creates query stages as many as possible. + */ + private def createQueryStages(plan: SparkPlan): SparkPlan = { + val result = createQueryStages0(plan) + if (result.allChildStagesReady) { + val finalPlan = optimizeQueryStage(result.newPlan) + callback.onFinalPlan(finalPlan) + finalPlan + } else { + callback.onPlanUpdate(result.newPlan) + result.newPlan + } + } + + /** + * This method is called recursively to traverse the plan tree bottom-up. This method returns two + * information: 1) the new plan after we insert query stages. 2) whether or not the child query + * stages of the new plan are all ready. 
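+   * Both pieces are returned together as a `CreateStageResult(newPlan, allChildStagesReady)`,
+   * defined at the bottom of this file.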
+ * + * if the current plan is an exchange node, and all its child query stages are ready, we create + * a new query stage. + */ + private def createQueryStages0(plan: SparkPlan): CreateStageResult = plan match { + case e: Exchange => + val similarStages = stageCache.getOrElseUpdate(e.schema, mutable.Buffer.empty) + similarStages.find(_._1.sameResult(e)) match { + case Some((_, existingStage)) if conf.exchangeReuseEnabled => + CreateStageResult( + newPlan = ReusedQueryStageExec(existingStage, e.output), + allChildStagesReady = readyStages.contains(existingStage.id)) + + case _ => + val result = createQueryStages0(e.child) + val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange] + // Create a query stage only when all the child query stages are ready. + if (result.allChildStagesReady) { + val queryStage = createQueryStage(newPlan) + similarStages.append(e -> queryStage) + // We've created a new stage, which is obviously not ready yet. + CreateStageResult(newPlan = queryStage, allChildStagesReady = false) + } else { + CreateStageResult(newPlan = newPlan, allChildStagesReady = false) + } + } + + case q: QueryStageExec => + CreateStageResult(newPlan = q, allChildStagesReady = readyStages.contains(q.id)) + + case _ => + if (plan.children.isEmpty) { + CreateStageResult(newPlan = plan, allChildStagesReady = true) + } else { + val results = plan.children.map(createQueryStages0) + CreateStageResult( + newPlan = plan.withNewChildren(results.map(_.newPlan)), + allChildStagesReady = results.forall(_.allChildStagesReady)) + } + } + + private def createQueryStage(e: Exchange): QueryStageExec = { + val optimizedPlan = optimizeQueryStage(e.child) + val queryStage = e match { + case s: ShuffleExchangeExec => + ShuffleQueryStageExec(currentStageId, s.copy(child = optimizedPlan)) + case b: BroadcastExchangeExec => + BroadcastQueryStageExec(currentStageId, b.copy(child = optimizedPlan)) + } + currentStageId += 1 + post(MaterializeStage(queryStage)) + queryStage + } + + override protected def onError(e: Throwable): Unit = callback.onError(e) +} + +case class CreateStageResult(newPlan: SparkPlan, allChildStagesReady: Boolean) + +object QueryStageManager { + private val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16)) +} + +trait QueryStageManagerCallback { + def onPlanUpdate(updatedPlan: SparkPlan): Unit + def onFinalPlan(finalPlan: SparkPlan): Unit + def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit + def onError(e: Throwable): Unit +} + +sealed trait QueryStageManagerEvent + +object Start extends QueryStageManagerEvent + +case class MaterializeStage(stage: QueryStageExec) extends QueryStageManagerEvent + +case class StageReady(stage: QueryStageExec) extends QueryStageManagerEvent diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala new file mode 100644 index 0000000000000..f522c12dd663c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive.rule + +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.QueryStageExec + +// A sanity check rule to make sure we are running query stage optimizer rules on a sub-tree of +// query plan with all child query stages materialized. +object AssertChildStagesMaterialized extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transform { + case q: QueryStageExec if !q.materialize().isCompleted => + throw new IllegalArgumentException( + s"The input query stages should all be materialized, but the below one is not.\n ${q.plan}") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala new file mode 100644 index 0000000000000..c61380bd10d6e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive.rule + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration.Duration + +import org.apache.spark.MapOutputStatistics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ThreadUtils + +/** + * A rule to adjust the post shuffle partitions based on the map output statistics. + * + * The strategy used to determine the number of post-shuffle partitions is described as follows. 
+ * To determine the number of post-shuffle partitions, we have a target input size for a + * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do + * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single + * post-shuffle partition until adding another pre-shuffle partition would cause the size of a + * post-shuffle partition to be greater than the target size. + * + * For example, we have two stages with the following pre-shuffle partition size statistics: + * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB] + * stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB] + * assuming the target input size is 128 MiB, we will have four post-shuffle partitions, + * which are: + * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB) + * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB) + * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB) + * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB) + */ +case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = { + val shuffleMetrics: Seq[MapOutputStatistics] = plan.collect { + case stage: ShuffleQueryStageExec => + val metricsFuture = stage.mapOutputStatisticsFuture + assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready") + ThreadUtils.awaitResult(metricsFuture, Duration.Zero) + } + + if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) { + // If not all leaf nodes are query stages, it's not safe to reduce the number of + // shuffle partitions, because we may break the assumption that all children of a spark plan + // have same number of output partitions. + plan + } else { + // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions, + // we should skip it when calculating the `partitionStartIndices`. + val validMetrics = shuffleMetrics.filter(_ != null) + if (validMetrics.nonEmpty) { + val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray) + // This transformation adds new nodes, so we must use `transformUp` here. + plan.transformUp { + // even for shuffle exchange whose input RDD has 0 partition, we should still update its + // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same + // number of output partitions. + case stage: ShuffleQueryStageExec => + CoalescedShuffleReaderExec(stage, partitionStartIndices) + } + } else { + plan + } + } + } + + /** + * Estimates partition start indices for post-shuffle partitions based on + * mapOutputStatistics provided by all pre-shuffle stages. + */ + // visible for testing. + private[sql] def estimatePartitionStartIndices( + mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { + val minNumPostShufflePartitions = conf.minNumPostShufflePartitions + val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize + // If minNumPostShufflePartitions is defined, it is possible that we need to use a + // value less than advisoryTargetPostShuffleInputSize as the target input size of + // a post shuffle task. + val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum + // The max at here is to make sure that when we have an empty table, we + // only have a single post-shuffle partition. + // There is no particular reason that we pick 16. We just need a number to + // prevent maxPostShuffleInputSize from being set to 0. 
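+    // Worked example (illustrative): for an empty table, totalPostShuffleInputSize is 0, so the
+    // floor keeps maxPostShuffleInputSize at 16 bytes and the target becomes
+    // min(16, advisoryTargetPostShuffleInputSize); since every per-partition size is 0, the loop
+    // below emits only the initial start index, i.e. a single post-shuffle partition.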
+ val maxPostShuffleInputSize = math.max( + math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16) + val targetPostShuffleInputSize = + math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) + + logInfo( + s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " + + s"targetPostShuffleInputSize $targetPostShuffleInputSize.") + + // Make sure we do get the same number of pre-shuffle partitions for those stages. + val distinctNumPreShufflePartitions = + mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct + // The reason that we are expecting a single value of the number of pre-shuffle partitions + // is that when we add Exchanges, we set the number of pre-shuffle partitions + // (i.e. map output partitions) using a static setting, which is the value of + // spark.sql.shuffle.partitions. Even if two input RDDs are having different + // number of partitions, they will have the same number of pre-shuffle partitions + // (i.e. map output partitions). + assert( + distinctNumPreShufflePartitions.length == 1, + "There should be only one distinct value of the number pre-shuffle partitions " + + "among registered Exchange operator.") + val numPreShufflePartitions = distinctNumPreShufflePartitions.head + + val partitionStartIndices = ArrayBuffer[Int]() + // The first element of partitionStartIndices is always 0. + partitionStartIndices += 0 + + var postShuffleInputSize = 0L + + var i = 0 + while (i < numPreShufflePartitions) { + // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages. + // Then, we add the total size to postShuffleInputSize. + var nextShuffleInputSize = 0L + var j = 0 + while (j < mapOutputStatistics.length) { + nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i) + j += 1 + } + + // If including the nextShuffleInputSize would exceed the target partition size, then start a + // new partition. + if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) { + partitionStartIndices += i + // reset postShuffleInputSize. + postShuffleInputSize = nextShuffleInputSize + } else { + postShuffleInputSize += nextShuffleInputSize + } + + i += 1 + } + + partitionStartIndices.toArray + } +} + +case class CoalescedShuffleReaderExec( + child: ShuffleQueryStageExec, + partitionStartIndices: Array[Int]) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = { + UnknownPartitioning(partitionStartIndices.length) + } + + private var cachedShuffleRDD: ShuffledRowRDD = null + + override protected def doExecute(): RDD[InternalRow] = { + if (cachedShuffleRDD == null) { + cachedShuffleRDD = child.plan.createShuffledRDD(Some(partitionStartIndices)) + } + cachedShuffleRDD + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala new file mode 100644 index 0000000000000..1112869399d5b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive.rule + +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec + +/** + * Remove shuffle nodes if the child's output partitions is already the desired partitioning. + * + * This should be the last rule of adaptive optimizer rules, as other rules may change plan + * node's output partitioning and make some shuffle nodes become unnecessary. + */ +object RemoveRedundantShuffles extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformUp { + case shuffle @ ShuffleExchangeExec(upper: HashPartitioning, child) => + child.outputPartitioning match { + case lower: HashPartitioning if upper.semanticEquals(lower) => child + case _ => shuffle + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 703d351bea7c0..18f13cf2eb5ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -66,7 +66,7 @@ case class BroadcastExchangeExec( } @transient - private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = { + lazy val relationFuture: Future[broadcast.Broadcast[Any]] = { // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here. val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) Future { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index d2d5011bbcb97..126e8e6dd1104 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -24,8 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, - SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf /** @@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf * the input partition ordering requirements are met. 
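 * When adaptive execution is enabled, exchanges added by this rule are planned with
 * `maxNumPostShufflePartitions` shuffle partitions; the partition count is then reduced at
 * runtime by `ReduceNumShufflePartitions` based on map output statistics.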
*/ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { - private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions - - private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize - - private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled - - private def minNumPostShufflePartitions: Option[Int] = { - val minNumPostShufflePartitions = conf.minNumPostShufflePartitions - if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None - } - - /** - * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled - * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]]. - */ - private def withExchangeCoordinator( - children: Seq[SparkPlan], - requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = { - val supportsCoordinator = - if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) { - // Right now, ExchangeCoordinator only support HashPartitionings. - children.forall { - case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true - case child => - child.outputPartitioning match { - case hash: HashPartitioning => true - case collection: PartitioningCollection => - collection.partitionings.forall(_.isInstanceOf[HashPartitioning]) - case _ => false - } - } - } else { - // In this case, although we do not have Exchange operators, we may still need to - // shuffle data when we have more than one children because data generated by - // these children may not be partitioned in the same way. - // Please see the comment in withCoordinator for more details. - val supportsDistribution = requiredChildDistributions.forall { dist => - dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution] - } - children.length > 1 && supportsDistribution - } - - val withCoordinator = - if (adaptiveExecutionEnabled && supportsCoordinator) { - val coordinator = - new ExchangeCoordinator( - targetPostShuffleInputSize, - minNumPostShufflePartitions) - children.zip(requiredChildDistributions).map { - case (e: ShuffleExchangeExec, _) => - // This child is an Exchange, we need to add the coordinator. - e.copy(coordinator = Some(coordinator)) - case (child, distribution) => - // If this child is not an Exchange, we need to add an Exchange for now. - // Ideally, we can try to avoid this Exchange. However, when we reach here, - // there are at least two children operators (because if there is a single child - // and we can avoid Exchange, supportsCoordinator will be false and we - // will not reach here.). Although we can make two children have the same number of - // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different. - // For example, let's say we have the following plan - // Join - // / \ - // Agg Exchange - // / \ - // Exchange t2 - // / - // t1 - // In this case, because a post-shuffle partition can include multiple pre-shuffle - // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes - // after shuffle. So, even we can use the child Exchange operator of the Join to - // have a number of post-shuffle partitions that matches the number of partitions of - // Agg, we cannot say these two children are partitioned in the same way. - // Here is another case - // Join - // / \ - // Agg1 Agg2 - // / \ - // Exchange1 Exchange2 - // / \ - // t1 t2 - // In this case, two Aggs shuffle data with the same column of the join condition. 
- // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same - // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2 - // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle - // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its - // pre-shuffle partitions by using another partitionStartIndices [0, 4]. - // So, Agg1 and Agg2 are actually not co-partitioned. - // - // It will be great to introduce a new Partitioning to represent the post-shuffle - // partitions when one post-shuffle partition includes multiple pre-shuffle partitions. - val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions) - assert(targetPartitioning.isInstanceOf[HashPartitioning]) - ShuffleExchangeExec(targetPartitioning, child, Some(coordinator)) - } - } else { - // If we do not need ExchangeCoordinator, the original children are returned. - children - } - - withCoordinator - } + private def defaultNumPreShufflePartitions: Int = + if (conf.adaptiveExecutionEnabled) { + conf.maxNumPostShufflePartitions + } else { + conf.numShufflePartitions + } private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution @@ -189,7 +93,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { val defaultPartitioning = distribution.createPartitioning(targetNumPartitions) child match { // If child is an exchange, we replace it with a new one having defaultPartitioning. - case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c) + case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c) case _ => ShuffleExchangeExec(defaultPartitioning, child) } } @@ -198,15 +102,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } } - // Now, we need to add ExchangeCoordinator if necessary. - // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges. - // However, with the way that we plan the query, we do not have a place where we have a - // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator - // at here for now. - // Once we finish https://issues.apache.org/jira/browse/SPARK-10665, - // we can first add Exchanges and then add coordinator once we have a DAG of query fragments. - children = withExchangeCoordinator(children, requiredChildDistributions) - // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. @@ -295,7 +190,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = plan.transformUp { // TODO: remove this after we create a physical operator for `RepartitionByExpression`. 
- case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) => + case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) => child.outputPartitioning match { case lower: HashPartitioning if upper.semanticEquals(lower) => child case _ => operator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala deleted file mode 100644 index e4ec76f0b9a1f..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.exchange - -import java.util.{HashMap => JHashMap, Map => JMap} -import javax.annotation.concurrent.GuardedBy - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction} -import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} - -/** - * A coordinator used to determines how we shuffle data between stages generated by Spark SQL. - * Right now, the work of this coordinator is to determine the number of post-shuffle partitions - * for a stage that needs to fetch shuffle data from one or multiple stages. - * - * A coordinator is constructed with three parameters, `numExchanges`, - * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`. - * - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be - * registered to this coordinator. So, when we start to do any actual work, we have a way to - * make sure that we have got expected number of [[ShuffleExchangeExec]]s. - * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's - * input data size. With this parameter, we can estimate the number of post-shuffle partitions. - * This parameter is configured through - * `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`. - * - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator - * will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle - * partitions. - * - * The workflow of this coordinator is described as follows: - * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator, - * if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator. - * This happens in the `doPrepare` method. 
- * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this - * coordinator will call `postShuffleRDD` to get its corresponding post-shuffle - * [[ShuffledRowRDD]]. - * If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]] - * will immediately get its corresponding post-shuffle [[ShuffledRowRDD]]. - * - If this coordinator has not made the decision on how to shuffle data, it will ask those - * registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the - * size statistics of pre-shuffle partitions, this coordinator will determine the number of - * post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices - * to a single post-shuffle partition whenever necessary. - * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered - * [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this - * coordinator can lookup the corresponding [[RDD]]. - * - * The strategy used to determine the number of post-shuffle partitions is described as follows. - * To determine the number of post-shuffle partitions, we have a target input size for a - * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages - * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics - * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until - * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be - * greater than the target size. - * - * For example, we have two stages with the following pre-shuffle partition size statistics: - * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB] - * stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB] - * assuming the target input size is 128 MiB, we will have four post-shuffle partitions, - * which are: - * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB) - * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB) - * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB) - * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB) - */ -class ExchangeCoordinator( - advisoryTargetPostShuffleInputSize: Long, - minNumPostShufflePartitions: Option[Int] = None) - extends Logging { - - // The registered Exchange operators. - private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]() - - // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the - // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is - // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails - // in `doEstimationIfNecessary`. - private[this] lazy val numExchanges = exchanges.size - - // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator. - private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] = - new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) - - // A boolean that indicates if this coordinator has made decision on how to shuffle data. - // This variable will only be updated by doEstimationIfNecessary, which is protected by - // synchronized. - @volatile private[this] var estimated: Boolean = false - - /** - * Registers a [[ShuffleExchangeExec]] operator to this coordinator. 
This method is only allowed - * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator. - */ - @GuardedBy("this") - def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized { - exchanges += exchange - } - - def isEstimated: Boolean = estimated - - /** - * Estimates partition start indices for post-shuffle partitions based on - * mapOutputStatistics provided by all pre-shuffle stages. - */ - def estimatePartitionStartIndices( - mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { - // If minNumPostShufflePartitions is defined, it is possible that we need to use a - // value less than advisoryTargetPostShuffleInputSize as the target input size of - // a post shuffle task. - val targetPostShuffleInputSize = minNumPostShufflePartitions match { - case Some(numPartitions) => - val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum - // The max at here is to make sure that when we have an empty table, we - // only have a single post-shuffle partition. - // There is no particular reason that we pick 16. We just need a number to - // prevent maxPostShuffleInputSize from being set to 0. - val maxPostShuffleInputSize = - math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16) - math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) - - case None => advisoryTargetPostShuffleInputSize - } - - logInfo( - s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " + - s"targetPostShuffleInputSize $targetPostShuffleInputSize.") - - // Make sure we do get the same number of pre-shuffle partitions for those stages. - val distinctNumPreShufflePartitions = - mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct - // The reason that we are expecting a single value of the number of pre-shuffle partitions - // is that when we add Exchanges, we set the number of pre-shuffle partitions - // (i.e. map output partitions) using a static setting, which is the value of - // spark.sql.shuffle.partitions. Even if two input RDDs are having different - // number of partitions, they will have the same number of pre-shuffle partitions - // (i.e. map output partitions). - assert( - distinctNumPreShufflePartitions.length == 1, - "There should be only one distinct value of the number pre-shuffle partitions " + - "among registered Exchange operator.") - val numPreShufflePartitions = distinctNumPreShufflePartitions.head - - val partitionStartIndices = ArrayBuffer[Int]() - // The first element of partitionStartIndices is always 0. - partitionStartIndices += 0 - - var postShuffleInputSize = 0L - - var i = 0 - while (i < numPreShufflePartitions) { - // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages. - // Then, we add the total size to postShuffleInputSize. - var nextShuffleInputSize = 0L - var j = 0 - while (j < mapOutputStatistics.length) { - nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i) - j += 1 - } - - // If including the nextShuffleInputSize would exceed the target partition size, then start a - // new partition. - if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) { - partitionStartIndices += i - // reset postShuffleInputSize. 
- postShuffleInputSize = nextShuffleInputSize - } else postShuffleInputSize += nextShuffleInputSize - - i += 1 - } - - partitionStartIndices.toArray - } - - @GuardedBy("this") - private def doEstimationIfNecessary(): Unit = synchronized { - // It is unlikely that this method will be called from multiple threads - // (when multiple threads trigger the execution of THIS physical) - // because in common use cases, we will create new physical plan after - // users apply operations (e.g. projection) to an existing DataFrame. - // However, if it happens, we have synchronized to make sure only one - // thread will trigger the job submission. - if (!estimated) { - // Make sure we have the expected number of registered Exchange operators. - assert(exchanges.length == numExchanges) - - val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) - - // Submit all map stages - val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]() - val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]() - var i = 0 - while (i < numExchanges) { - val exchange = exchanges(i) - val shuffleDependency = exchange.prepareShuffleDependency() - shuffleDependencies += shuffleDependency - if (shuffleDependency.rdd.partitions.length != 0) { - // submitMapStage does not accept RDD with 0 partition. - // So, we will not submit this dependency. - submittedStageFutures += - exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency) - } - i += 1 - } - - // Wait for the finishes of those submitted map stages. - val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length) - var j = 0 - while (j < submittedStageFutures.length) { - // This call is a blocking call. If the stage has not finished, we will wait at here. - mapOutputStatistics(j) = submittedStageFutures(j).get() - j += 1 - } - - // If we have mapOutputStatistics.length < numExchange, it is because we do not submit - // a stage when the number of partitions of this dependency is 0. - assert(mapOutputStatistics.length <= numExchanges) - - // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the - // number of post-shuffle partitions. - val partitionStartIndices = - if (mapOutputStatistics.length == 0) { - Array.empty[Int] - } else { - estimatePartitionStartIndices(mapOutputStatistics) - } - - var k = 0 - while (k < numExchanges) { - val exchange = exchanges(k) - val rdd = - exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices)) - newPostShuffleRDDs.put(exchange, rdd) - - k += 1 - } - - // Finally, we set postShuffleRDDs and estimated. 
-      assert(postShuffleRDDs.isEmpty)
-      assert(newPostShuffleRDDs.size() == numExchanges)
-      postShuffleRDDs.putAll(newPostShuffleRDDs)
-      estimated = true
-    }
-  }
-
-  def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
-    doEstimationIfNecessary()
-
-    if (!postShuffleRDDs.containsKey(exchange)) {
-      throw new IllegalStateException(
-        s"The given $exchange is not registered in this coordinator.")
-    }
-
-    postShuffleRDDs.get(exchange)
-  }
-
-  override def toString: String = {
-    s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index da7b0c6f43fbc..987e73e52950f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -38,12 +38,11 @@ import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
 
 /**
- * Performs a shuffle that will result in the desired `newPartitioning`.
+ * Performs a shuffle that will result in the desired partitioning.
  */
 case class ShuffleExchangeExec(
-    var newPartitioning: Partitioning,
-    child: SparkPlan,
-    @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
+    desiredPartitioning: Partitioning,
+    child: SparkPlan) extends Exchange {
 
   // NOTE: coordinator can be null after serialization/deserialization,
   //       e.g. it can be null on the Executor side
@@ -56,68 +55,34 @@ case class ShuffleExchangeExec(
   ) ++ readMetrics ++ writeMetrics
 
   override def nodeName: String = {
-    val extraInfo = coordinator match {
-      case Some(exchangeCoordinator) =>
-        s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-      case _ => ""
-    }
-
-    val simpleNodeName = "Exchange"
-    s"$simpleNodeName$extraInfo"
+    "Exchange"
   }
 
-  override def outputPartitioning: Partitioning = newPartitioning
+  override def outputPartitioning: Partitioning = {
+    desiredPartitioning
+  }
 
   private val serializer: Serializer =
     new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
 
-  override protected def doPrepare(): Unit = {
-    // If an ExchangeCoordinator is needed, we register this Exchange operator
-    // to the coordinator when we do prepare. It is important to make sure
-    // we register this operator right before the execution instead of register it
-    // in the constructor because it is possible that we create new instances of
-    // Exchange operators when we transform the physical plan
-    // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
-    // So, we should only call registerExchange just before we start to execute
-    // the plan.
-    coordinator match {
-      case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case _ =>
-    }
-  }
+  @transient lazy val inputRDD: RDD[InternalRow] = child.execute()
 
   /**
-   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
-   * the partitioning scheme defined in `newPartitioning`. Those partitions of
-   * the returned ShuffleDependency will be the input of shuffle.
+   * A [[ShuffleDependency]] that will partition rows of its child based on the desired
+   * partitioning. Those partitions of this ShuffleDependency will be the input of the shuffle.
*/ - private[exchange] def prepareShuffleDependency() - : ShuffleDependency[Int, InternalRow, InternalRow] = { + @transient + lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] = { ShuffleExchangeExec.prepareShuffleDependency( - child.execute(), + inputRDD, child.output, - newPartitioning, + outputPartitioning, serializer, writeMetrics) } - /** - * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset. - * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional - * partition start indices array. If this optional array is defined, the returned - * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array. - */ - private[exchange] def preparePostShuffleRDD( - shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow], - specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = { - // If an array of partition start indices is provided, we need to use this array - // to create the ShuffledRowRDD. Also, we need to update newPartitioning to - // update the number of post-shuffle partitions. - specifiedPartitionStartIndices.foreach { indices => - assert(newPartitioning.isInstanceOf[HashPartitioning]) - newPartitioning = UnknownPartitioning(indices.length) - } - new ShuffledRowRDD(shuffleDependency, readMetrics, specifiedPartitionStartIndices) + def createShuffledRDD(partitionStartIndices: Option[Array[Int]]): ShuffledRowRDD = { + new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) } /** @@ -128,25 +93,13 @@ case class ShuffleExchangeExec( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { // Returns the same ShuffleRowRDD if this plan is used by multiple plans. if (cachedShuffleRDD == null) { - cachedShuffleRDD = coordinator match { - case Some(exchangeCoordinator) => - val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) - assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) - shuffleRDD - case _ => - val shuffleDependency = prepareShuffleDependency() - preparePostShuffleRDD(shuffleDependency) - } + cachedShuffleRDD = createShuffledRDD(None) } cachedShuffleRDD } } object ShuffleExchangeExec { - def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = { - ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator]) - } - /** * Determines whether records must be defensively copied before being sent to the shuffle. * Several of Spark's shuffle components will buffer deserialized Java objects in memory. 
The diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 45954f21c5925..a656a2f53e0a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -227,26 +227,26 @@ class SQLAppStatusListener( } } + private def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + nodes.map { + case cluster: SparkPlanGraphCluster => + val storedCluster = new SparkPlanGraphClusterWrapper( + cluster.id, + cluster.name, + cluster.desc, + toStoredNodes(cluster.nodes), + cluster.metrics) + new SparkPlanGraphNodeWrapper(null, storedCluster) + + case node => + new SparkPlanGraphNodeWrapper(node, null) + } + } + private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) = event - def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { - nodes.map { - case cluster: SparkPlanGraphCluster => - val storedCluster = new SparkPlanGraphClusterWrapper( - cluster.id, - cluster.name, - cluster.desc, - toStoredNodes(cluster.nodes), - cluster.metrics) - new SparkPlanGraphNodeWrapper(null, storedCluster) - - case node => - new SparkPlanGraphNodeWrapper(node, null) - } - } - val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => node.metrics.map { metric => (metric.accumulatorId, metric) } @@ -267,6 +267,27 @@ class SQLAppStatusListener( update(exec) } + private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = { + val SparkListenerSQLAdaptiveExecutionUpdate(executionId, + physicalPlanDescription, sparkPlanInfo) = event + + val planGraph = SparkPlanGraph(sparkPlanInfo) + val sqlPlanMetrics = planGraph.allNodes.flatMap { node => + node.metrics.map { metric => (metric.accumulatorId, metric) } + }.toMap.values.toList + + val graphToStore = new SparkPlanGraphWrapper( + executionId, + toStoredNodes(planGraph.nodes), + planGraph.edges) + kvstore.write(graphToStore) + + val exec = getOrCreateExecution(executionId) + exec.physicalPlanDescription = physicalPlanDescription + exec.metrics = sqlPlanMetrics + update(exec) + } + private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { val SparkListenerSQLExecutionEnd(executionId, time) = event Option(liveExecutions.get(executionId)).foreach { exec => @@ -295,6 +316,7 @@ class SQLAppStatusListener( override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionStart => onExecutionStart(e) + case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e) case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) case _ => // Ignore diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 03d75c4c1b82f..eb1e44570ea89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -37,6 +37,13 @@ case class SparkListenerSQLExecutionStart( time: Long) extends SparkListenerEvent +@DeveloperApi 
+case class SparkListenerSQLAdaptiveExecutionUpdate( + executionId: Long, + physicalPlanDescription: String, + sparkPlanInfo: SparkPlanInfo) + extends SparkListenerEvent + @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index e57d080dadf78..073225ffd2c0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -96,6 +96,15 @@ object SparkPlanGraph { case "InputAdapter" => buildSparkPlanGraphNode( planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) + case "BroadcastQueryStage" | "ShuffleQueryStage" => + if (exchanges.contains(planInfo.children.head)) { + // Point to the re-used exchange + val node = exchanges(planInfo.children.head) + edges += SparkPlanGraphEdge(node.id, parent.id) + } else { + buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) + } case "Subquery" if subgraph != null => // Subquery should not be included in WholeStageCodegen buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c90b15814a534..46d83061b3b8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1280,7 +1280,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val agg = cp.groupBy('id % 2).agg(count('id)) agg.queryExecution.executedPlan.collectFirst { - case ShuffleExchangeExec(_, _: RDDScanExec, _) => + case ShuffleExchangeExec(_, _: RDDScanExec) => case BroadcastExchangeExec(_, _: RDDScanExec) => }.foreach { _ => fail( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 142ab6170a734..e6aa066d8f9db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -411,8 +411,7 @@ class PlannerSuite extends SharedSQLContext { val inputPlan = ShuffleExchangeExec( partitioning, - DummySparkPlan(outputPartitioning = partitioning), - None) + DummySparkPlan(outputPartitioning = partitioning)) val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) { @@ -427,8 +426,7 @@ class PlannerSuite extends SharedSQLContext { val inputPlan = ShuffleExchangeExec( partitioning, - DummySparkPlan(outputPartitioning = partitioning), - None) + DummySparkPlan(outputPartitioning = partitioning)) val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) { @@ -450,7 +448,7 @@ class PlannerSuite extends SharedSQLContext { val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan) val shuffle = outputPlan.collect { case e: ShuffleExchangeExec => e } assert(shuffle.size === 1) - 
assert(shuffle.head.newPartitioning === finalPartitioning) + assert(shuffle.head.outputPartitioning === finalPartitioning) } test("Reuse exchanges") { @@ -462,8 +460,7 @@ class PlannerSuite extends SharedSQLContext { DummySparkPlan( children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil, requiredChildDistribution = Seq(distribution), - requiredChildOrdering = Seq(Seq.empty)), - None) + requiredChildOrdering = Seq(Seq.empty))) val inputPlan = SortMergeJoinExec( Literal(1) :: Nil, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala similarity index 66% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index 74f33f6c81391..dd4b5591ad8e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -22,11 +22,12 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite} import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql._ -import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.adaptive.rule.{CoalescedShuffleReaderExec, ReduceNumShufflePartitions} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { +class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll { private var originalActiveSparkSession: Option[SparkSession] = _ private var originalInstantiatedSparkSession: Option[SparkSession] = _ @@ -51,7 +52,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } private def checkEstimation( - coordinator: ExchangeCoordinator, + rule: ReduceNumShufflePartitions, bytesByPartitionIdArray: Array[Array[Long]], expectedPartitionStartIndices: Array[Int]): Unit = { val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map { @@ -59,18 +60,27 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { new MapOutputStatistics(index, bytesByPartitionId) } val estimatedPartitionStartIndices = - coordinator.estimatePartitionStartIndices(mapOutputStatistics) + rule.estimatePartitionStartIndices(mapOutputStatistics) assert(estimatedPartitionStartIndices === expectedPartitionStartIndices) } + private def createReduceNumShufflePartitionsRule( + advisoryTargetPostShuffleInputSize: Long, + minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = { + val conf = new SQLConf().copy( + SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize, + SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions) + ReduceNumShufflePartitions(conf) + } + test("test estimatePartitionStartIndices - 1 Exchange") { - val coordinator = new ExchangeCoordinator(100L) + val rule = createReduceNumShufflePartitionsRule(100L) { // All bytes per partition are 0. 
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } { @@ -78,40 +88,40 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { // 1 post-shuffle partition is needed. val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0) val expectedPartitionStartIndices = Array[Int](0) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } { // 2 post-shuffle partitions are needed. val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0) val expectedPartitionStartIndices = Array[Int](0, 3) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } { // There are a few large pre-shuffle partitions. val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } { // All pre-shuffle partitions are larger than the targeted size. val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } { // The last pre-shuffle partition is in a single post-shuffle partition. 
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110) val expectedPartitionStartIndices = Array[Int](0, 4) - checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) } } test("test estimatePartitionStartIndices - 2 Exchanges") { - val coordinator = new ExchangeCoordinator(100L) + val rule = createReduceNumShufflePartitionsRule(100L) { // If there are multiple values of the number of pre-shuffle partitions, @@ -122,7 +132,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { Array( new MapOutputStatistics(0, bytesByPartitionId1), new MapOutputStatistics(1, bytesByPartitionId2)) - intercept[AssertionError](coordinator.estimatePartitionStartIndices(mapOutputStatistics)) + intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics)) } { @@ -131,7 +141,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -143,7 +153,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -154,7 +164,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 2, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -165,7 +175,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -176,7 +186,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -187,7 +197,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -198,14 +208,14 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } } test("test estimatePartitionStartIndices and enforce minimal number of reducers") { - val coordinator = new ExchangeCoordinator(100L, Some(2)) + val rule = createReduceNumShufflePartitionsRule(100L, 2) { // The minimal number of post-shuffle partitions is not enforced because @@ -214,7 +224,7 @@ class 
ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -225,7 +235,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5) val expectedPartitionStartIndices = Array[Int](0, 3) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -236,7 +246,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4) checkEstimation( - coordinator, + rule, Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionStartIndices) } @@ -257,24 +267,24 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { def withSparkSession( f: SparkSession => Unit, - targetNumPostShufflePartitions: Int, + targetPostShuffleInputSize: Int, minNumPostShufflePartitions: Option[Int]): Unit = { val sparkConf = new SparkConf(false) .setMaster("local[*]") .setAppName("test") .set(UI_ENABLED, false) - .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") + .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5") .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") .set( SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, - targetNumPostShufflePartitions.toString) + targetPostShuffleInputSize.toString) minNumPostShufflePartitions match { case Some(numPartitions) => sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString) case None => - sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1") + sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1") } val spark = SparkSession.builder() @@ -304,25 +314,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { // Then, let's look at the number of post-shuffle partitions estimated // by the ExchangeCoordinator. - val exchanges = agg.queryExecution.executedPlan.collect { - case e: ShuffleExchangeExec => e + val finalPlan = agg.queryExecution.executedPlan + .asInstanceOf[AdaptiveSparkPlanExec].finalPlan + val shuffleReaders = finalPlan.collect { + case reader: CoalescedShuffleReaderExec => reader } - assert(exchanges.length === 1) + assert(shuffleReaders.length === 1) minNumPostShufflePartitions match { case Some(numPartitions) => - exchanges.foreach { - case e: ShuffleExchangeExec => - assert(e.coordinator.isDefined) - assert(e.outputPartitioning.numPartitions === 5) - case o => + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === numPartitions) } case None => - exchanges.foreach { - case e: ShuffleExchangeExec => - assert(e.coordinator.isDefined) - assert(e.outputPartitioning.numPartitions === 3) - case o => + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === 3) } } } @@ -355,25 +361,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { // Then, let's look at the number of post-shuffle partitions estimated // by the ExchangeCoordinator. 
- val exchanges = join.queryExecution.executedPlan.collect { - case e: ShuffleExchangeExec => e + val finalPlan = join.queryExecution.executedPlan + .asInstanceOf[AdaptiveSparkPlanExec].finalPlan + val shuffleReaders = finalPlan.collect { + case reader: CoalescedShuffleReaderExec => reader } - assert(exchanges.length === 2) + assert(shuffleReaders.length === 2) minNumPostShufflePartitions match { case Some(numPartitions) => - exchanges.foreach { - case e: ShuffleExchangeExec => - assert(e.coordinator.isDefined) - assert(e.outputPartitioning.numPartitions === 5) - case o => + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === numPartitions) } case None => - exchanges.foreach { - case e: ShuffleExchangeExec => - assert(e.coordinator.isDefined) - assert(e.outputPartitioning.numPartitions === 2) - case o => + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === 2) } } } @@ -411,26 +413,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { // Then, let's look at the number of post-shuffle partitions estimated // by the ExchangeCoordinator. - val exchanges = join.queryExecution.executedPlan.collect { - case e: ShuffleExchangeExec => e + val finalPlan = join.queryExecution.executedPlan + .asInstanceOf[AdaptiveSparkPlanExec].finalPlan + val shuffleReaders = finalPlan.collect { + case reader: CoalescedShuffleReaderExec => reader } - assert(exchanges.length === 4) + assert(shuffleReaders.length === 2) minNumPostShufflePartitions match { case Some(numPartitions) => - exchanges.foreach { - case e: ShuffleExchangeExec => - assert(e.coordinator.isDefined) - assert(e.outputPartitioning.numPartitions === 5) - case o => + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === numPartitions) } case None => - assert(exchanges.forall(_.coordinator.isDefined)) - assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3)) + shuffleReaders.foreach { reader => + assert(reader.outputPartitioning.numPartitions === 2) + } } } - withSparkSession(test, 6644, minNumPostShufflePartitions) + withSparkSession(test, 16384, minNumPostShufflePartitions) } test(s"determining the number of reducers: complex query 2$testNameNote") { @@ -463,26 +465,60 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { // Then, let's look at the number of post-shuffle partitions estimated // by the ExchangeCoordinator. 
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = join.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 3)
+        assert(shuffleReaders.length === 2)
 
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 3)
+            }
         }
       }
 
-      withSparkSession(test, 6144, minNumPostShufflePartitions)
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: plan already partitioned$testNameNote") {
+      val test: SparkSession => Unit = { spark: SparkSession =>
+        try {
+          spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
+          // `df1` is hash partitioned by `id`.
+          val df1 = spark.read.table("t")
+          val df2 =
+            spark
+              .range(0, 1000, 1, numInputPartitions)
+              .selectExpr("id % 500 as key2", "id as value2")
+
+          val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))
+
+          // Check the answer first.
+          val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
+            .union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
+          checkAnswer(
+            join,
+            expectedAnswer.collect())
+
+          // Then, let's make sure we do not reduce the number of post-shuffle partitions.
+          val finalPlan = join.queryExecution.executedPlan
+            .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+          val shuffleReaders = finalPlan.collect {
+            case reader: CoalescedShuffleReaderExec => reader
+          }
+          assert(shuffleReaders.length === 0)
+        } finally {
+          spark.sql("drop table t")
+        }
+      }
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
     }
   }
@@ -490,11 +526,50 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
     val test = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
       val df = spark.range(1).selectExpr("id AS key", "id AS value")
+
+      // test case 1: a query stage has 3 child stages but they are the same stage.
+      // ResultQueryStage 1
+      //   ShuffleQueryStage 0
+      //   ReusedQueryStage 0
+      //   ReusedQueryStage 0
       val resultDf = df.join(df, "key").join(df, "key")
-      val sparkPlan = resultDf.queryExecution.executedPlan
-      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
-      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      val finalPlan = resultDf.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+      assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2)
+      assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3)
       checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+
+      // test case 2: a query stage has 2 parent stages.
+      // ResultQueryStage 3
+      //   ShuffleQueryStage 1
+      //     ShuffleQueryStage 0
+      //   ShuffleQueryStage 2
+      //     ReusedQueryStage 0
+      val grouped = df.groupBy("key").agg(max("value").as("value"))
+      val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+        .union(grouped.groupBy(col("key") + 2).max("value"))
+
+      val finalPlan2 = resultDf2.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
+
+      // The result stage has 2 children.
+      val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+      assert(level1Stages.length == 2)
+
+      val leafStages = level1Stages.flatMap { stage =>
+        // All of the child stages of the result stage have only one child stage.
+        val children = stage.plan.collect { case q: QueryStageExec => q }
+        assert(children.length == 1)
+        children
+      }
+      assert(leafStages.length == 2)
+
+      val reusedStages = level1Stages.flatMap { stage =>
+        stage.plan.collect { case r: ReusedQueryStageExec => r }
+      }
+      assert(reusedStages.length == 1)
+
+      checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
     }
     withSparkSession(test, 4, None)
   }
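
For readers following the suite above, here is a minimal, standalone Scala sketch of the size-based coalescing strategy that the removed ExchangeCoordinator scaladoc describes and that ReduceNumShufflePartitions.estimatePartitionStartIndices now exercises: pack pre-shuffle partitions with consecutive indices into one post-shuffle partition until adding the next one would push the post-shuffle partition past the target size. This is not code from the patch; the object name CoalesceExample and the simplified signature are illustrative only, and the sketch omits the refinement where minNumPostShufflePartitions can lower the effective target size.

import scala.collection.mutable.ArrayBuffer

object CoalesceExample {
  // Given, for each pre-shuffle stage, the bytes written per map-output partition,
  // return the start indices of the post-shuffle (coalesced) partitions.
  def estimatePartitionStartIndices(
      bytesByPartitionIdPerStage: Seq[Array[Long]],
      targetSize: Long): Array[Int] = {
    require(bytesByPartitionIdPerStage.map(_.length).distinct.size == 1,
      "All stages must report the same number of pre-shuffle partitions.")
    val numPartitions = bytesByPartitionIdPerStage.head.length
    val startIndices = ArrayBuffer(0)  // the first post-shuffle partition always starts at 0
    var currentSize = 0L
    for (i <- 0 until numPartitions) {
      // Total size of the i-th pre-shuffle partition across all stages.
      val sizeAcrossStages = bytesByPartitionIdPerStage.map(_(i)).sum
      if (i > 0 && currentSize + sizeAcrossStages > targetSize) {
        startIndices += i              // start a new post-shuffle partition at index i
        currentSize = sizeAcrossStages
      } else {
        currentSize += sizeAcrossStages
      }
    }
    startIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    val MiB = 1024L * 1024L
    // The worked example from the removed ExchangeCoordinator scaladoc.
    val stage1 = Array(100L, 20L, 100L, 10L, 30L).map(_ * MiB)
    val stage2 = Array(10L, 10L, 70L, 5L, 5L).map(_ * MiB)
    val indices = estimatePartitionStartIndices(Seq(stage1, stage2), 128L * MiB)
    println(indices.mkString("Array(", ", ", ")"))
  }
}

Run against that example (stage 1: 100, 20, 100, 10, 30 MiB; stage 2: 10, 10, 70, 5, 5 MiB; target 128 MiB) the sketch prints Array(0, 1, 2, 3), i.e. four post-shuffle partitions of 110, 30, 170 and 50 MiB, matching the expectations encoded in ReduceNumShufflePartitionsSuite.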