diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala index f8a6f1d0d8cbb..c02e48c9815fa 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala @@ -25,3 +25,4 @@ package org.apache.spark * (may be inexact due to use of compressed map statuses) */ private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) + extends Serializable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8fc2b3236ff20..decef69ff70db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -280,14 +280,19 @@ object SQLConf { val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = buildConf("spark.sql.adaptive.minNumPostShufflePartitions") - .internal() - .doc("The advisory minimal number of post-shuffle partitions provided to " + - "ExchangeCoordinator. This setting is used in our test to make sure we " + - "have enough parallelism to expose issues that will not be exposed with a " + - "single partition. When the value is a non-positive value, this setting will " + - "not be provided to ExchangeCoordinator.") + .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") + .intConf + .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " + + "must be a positive integer.") + .createWithDefault(1) + + val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS = + buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") + .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.") .intConf - .createWithDefault(-1) + .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " + + "must be a positive integer.") + .createWithDefault(500) val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") @@ -1698,8 +1703,9 @@ class SQLConf extends Serializable with Logging { def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) - def minNumPostShufflePartitions: Int = - getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) + def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) + + def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS) def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 64f49e2d0d4e6..c957285b2a315 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.adaptive.PlanQueryStage import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import 
org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} @@ -84,7 +85,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { * row format conversions as needed. */ protected def prepareForExecution(plan: SparkPlan): SparkPlan = { - preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } + if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { + adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)} + } else { + preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)} + } } /** A sequence of rules that will be applied in order to the physical plan before execution. */ @@ -95,6 +100,15 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) + protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq( + PlanSubqueries(sparkSession), + EnsureRequirements(sparkSession.sessionState.conf), + ReuseSubquery(sparkSession.sessionState.conf), + // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees + // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will + // only transform node in a sub-tree. + PlanQueryStage(sparkSession.sessionState.conf)) + protected def stringOrError[A](f: => A): String = try f.toString catch { case e: AnalysisException => e.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 59ffd16381116..863babdbb080a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.execution.adaptive.QueryStageInput import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo @@ -51,6 +52,7 @@ private[execution] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { case ReusedExchangeExec(_, child) => child :: Nil + case i: QueryStageInput => i.childStage :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala new file mode 100644 index 0000000000000..ab2b6e9dfdec1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a + * QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges + * and uses the same QueryStage for all the references. + */ +case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] { + + def apply(plan: SparkPlan): SparkPlan = { + + val newPlan = if (!conf.exchangeReuseEnabled) { + plan.transformUp { + case e: ShuffleExchangeExec => + ShuffleQueryStageInput(ShuffleQueryStage(e), e.output) + case e: BroadcastExchangeExec => + BroadcastQueryStageInput(BroadcastQueryStage(e), e.output) + } + } else { + // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. + val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]() + + plan.transformUp { + case exchange: Exchange => + val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]()) + val samePlan = sameSchema.find { s => + exchange.sameResult(s.child) + } + if (samePlan.isDefined) { + // Keep the output of this exchange, the following plans require that to resolve + // attributes. + exchange match { + case e: ShuffleExchangeExec => ShuffleQueryStageInput( + samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output) + case e: BroadcastExchangeExec => BroadcastQueryStageInput( + samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output) + } + } else { + val queryStageInput = exchange match { + case e: ShuffleExchangeExec => + ShuffleQueryStageInput(ShuffleQueryStage(e), e.output) + case e: BroadcastExchangeExec => + BroadcastQueryStageInput(BroadcastQueryStage(e), e.output) + } + sameSchema += queryStageInput.childStage + queryStageInput + } + } + } + ResultQueryStage(newPlan) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala new file mode 100644 index 0000000000000..20f13c280c12c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import java.util.Properties + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration + +import org.apache.spark.{broadcast, MapOutputStatistics, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.util.ThreadUtils + +/** + * In adaptive execution mode, an execution plan is divided into multiple QueryStages. Each + * QueryStage is a sub-tree that runs in a single stage. + */ +abstract class QueryStage extends UnaryExecNode { + + var child: SparkPlan + + // Ignore this wrapper for canonicalizing. + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + def withLocalProperties[T](sc: SparkContext, properties: Properties)(body: => T): T = { + val oldProperties = sc.getLocalProperties + try { + sc.setLocalProperties(properties) + body + } finally { + sc.setLocalProperties(oldProperties) + } + } + + /** + * Execute childStages and wait until all stages are completed. Use a thread pool to avoid + * blocking on one child stage. + */ + def executeChildStages(): Unit = { + val localProperties = sqlContext.sparkContext.getLocalProperties + + // Handle broadcast stages + val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect { + case bqs: BroadcastQueryStageInput => bqs.childStage + } + val broadcastFutures = broadcastQueryStages.map { queryStage => + Future { + withLocalProperties(sqlContext.sparkContext, localProperties) { + queryStage.prepareBroadcast() + } + }(QueryStage.executionContext) + } + + // Submit shuffle stages + val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect { + case sqs: ShuffleQueryStageInput => sqs.childStage + } + val shuffleStageFutures = shuffleQueryStages.map { queryStage => + Future { + withLocalProperties(sqlContext.sparkContext, localProperties) { + queryStage.execute() + } + }(QueryStage.executionContext) + } + + ThreadUtils.awaitResult( + Future.sequence(broadcastFutures)(implicitly, QueryStage.executionContext), Duration.Inf) + ThreadUtils.awaitResult( + Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf) + } + + /** + * Before executing the plan in this query stage, we execute all child stages, optimize the plan + * in this stage and determine the reducer number based on the child stages' statistics. Finally + * we do a codegen for this query stage and update the UI with the new plan. + */ + def prepareExecuteStage(): Unit = { + // 1. Execute childStages + executeChildStages() + // It is possible to optimize this stage's plan here based on the child stages' statistics. + + // 2. 
Determine reducer number + val queryStageInputs: Seq[ShuffleQueryStageInput] = child.collect { + case input: ShuffleQueryStageInput => input + } + val childMapOutputStatistics = queryStageInputs.map(_.childStage.mapOutputStatistics) + .filter(_ != null).toArray + if (childMapOutputStatistics.length > 0) { + val exchangeCoordinator = new ExchangeCoordinator( + conf.targetPostShuffleInputSize, + conf.minNumPostShufflePartitions) + + val partitionStartIndices = + exchangeCoordinator.estimatePartitionStartIndices(childMapOutputStatistics) + child = child.transform { + case ShuffleQueryStageInput(childStage, output, _) => + ShuffleQueryStageInput(childStage, output, Some(partitionStartIndices)) + } + } + + // 3. Codegen and update the UI + child = CollapseCodegenStages(sqlContext.conf).apply(child) + val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + if (executionId != null && executionId.nonEmpty) { + val queryExecution = SQLExecution.getQueryExecution(executionId.toLong) + sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( + executionId.toLong, + queryExecution.toString, + SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan))) + } + } + + // Caches the created ShuffleRowRDD so we can reuse that. + private var cachedRDD: RDD[InternalRow] = null + + def executeStage(): RDD[InternalRow] = child.execute() + + /** + * A QueryStage can be reused like Exchange. It is possible that multiple threads try to submit + * the same QueryStage. Use synchronized to make sure it is executed only once. + */ + override def doExecute(): RDD[InternalRow] = synchronized { + if (cachedRDD == null) { + prepareExecuteStage() + cachedRDD = executeStage() + } + cachedRDD + } + + override def executeCollect(): Array[InternalRow] = { + prepareExecuteStage() + child.executeCollect() + } + + override def executeToIterator(): Iterator[InternalRow] = { + prepareExecuteStage() + child.executeToIterator() + } + + override def executeTake(n: Int): Array[InternalRow] = { + prepareExecuteStage() + child.executeTake(n) + } + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + builder: StringBuilder, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false): StringBuilder = { + child.generateTreeString(depth, lastChildren, builder, verbose, "*") + } +} + +/** + * The last QueryStage of an execution plan. + */ +case class ResultQueryStage(var child: SparkPlan) extends QueryStage + +/** + * A shuffle QueryStage whose child is a ShuffleExchange. + */ +case class ShuffleQueryStage(var child: SparkPlan) extends QueryStage { + + protected var _mapOutputStatistics: MapOutputStatistics = null + + def mapOutputStatistics: MapOutputStatistics = _mapOutputStatistics + + override def executeStage(): RDD[InternalRow] = { + child match { + case e: ShuffleExchangeExec => + val result = e.eagerExecute() + _mapOutputStatistics = e.mapOutputStatistics + result + case _ => throw new IllegalArgumentException( + "The child of ShuffleQueryStage must be a ShuffleExchange.") + } + } +} + +/** + * A broadcast QueryStage whose child is a BroadcastExchangeExec. 
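+ * Unlike ShuffleQueryStage, it is not submitted as a separate map stage: the parent stage calls
+ * prepareBroadcast(), which first runs this stage's own child stages, applies whole-stage codegen
+ * to the child plan and then calls prepare() to trigger the broadcast.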
+ */ +case class BroadcastQueryStage(var child: SparkPlan) extends QueryStage { + override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { + child.executeBroadcast() + } + + private var prepared = false + + def prepareBroadcast() : Unit = synchronized { + if (!prepared) { + executeChildStages() + child = CollapseCodegenStages(sqlContext.conf).apply(child) + // After child stages are completed, prepare() triggers the broadcast. + prepare() + prepared = true + } + } + + override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + "BroadcastExchange does not support the execute() code path.") + } +} + +object QueryStage { + private[execution] val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("adaptive-query-stage")) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala new file mode 100644 index 0000000000000..887f815b6117a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution._ + +/** + * QueryStageInput is the leaf node of a QueryStage and is used to hide its child stage. It gets + * the result of its child stage and serves it as the input of the QueryStage. A QueryStage knows + * its child stages by collecting all the QueryStageInputs. + */ +abstract class QueryStageInput extends LeafExecNode { + + def childStage: QueryStage + + // Ignore this wrapper for canonicalizing. + override def doCanonicalize(): SparkPlan = childStage.canonicalized + + // Similar to ReusedExchangeExec, two QueryStageInputs can reference to the same childStage. + // QueryStageInput can have distinct set of output attribute ids from its childStage, we need + // to update the attribute ids in outputPartitioning and outputOrdering. 
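+  // For example, if the child stage produces attribute `id#1` while this QueryStageInput exposes
+  // it as `id#5` (hypothetical exprIds), a HashPartitioning(id#1) reported by the child stage is
+  // rewritten to HashPartitioning(id#5), so downstream operators resolve against this node's
+  // output attributes.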
+ private lazy val updateAttr: Expression => Expression = { + val originalAttrToNewAttr = AttributeMap(childStage.output.zip(output)) + e => e.transform { + case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr) + } + } + + override def outputPartitioning: Partitioning = childStage.outputPartitioning match { + case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr)) + case other => other + } + + override def outputOrdering: Seq[SortOrder] = { + childStage.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder]) + } + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + builder: StringBuilder, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false): StringBuilder = { + childStage.generateTreeString(depth, lastChildren, builder, verbose, "*") + } +} + +/** + * A QueryStageInput whose child stage is a ShuffleQueryStage. It returns a new ShuffledRowRDD + * based on the the child stage's result RDD and the specified partitionStartIndices. If the + * child stage is reused by another ShuffleQueryStageInput, they can return RDDs with different + * partitionStartIndices. + */ +case class ShuffleQueryStageInput( + childStage: ShuffleQueryStage, + override val output: Seq[Attribute], + partitionStartIndices: Option[Array[Int]] = None) + extends QueryStageInput { + + override def outputPartitioning: Partitioning = partitionStartIndices.map { + indices => UnknownPartitioning(indices.length) + }.getOrElse(super.outputPartitioning) + + override def doExecute(): RDD[InternalRow] = { + val childRDD = childStage.execute().asInstanceOf[ShuffledRowRDD] + new ShuffledRowRDD(childRDD.dependency, partitionStartIndices) + } +} + +/** A QueryStageInput whose child stage is a BroadcastQueryStage. */ +case class BroadcastQueryStageInput( + childStage: BroadcastQueryStage, + override val output: Seq[Attribute]) + extends QueryStageInput { + + override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { + childStage.executeBroadcast() + } + + override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + "BroadcastStageInput does not support the execute() code path.") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index d2d5011bbcb97..8184baf50b042 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -36,107 +36,12 @@ import org.apache.spark.sql.internal.SQLConf * the input partition ordering requirements are met. */ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { - private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions - - private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize - - private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled - - private def minNumPostShufflePartitions: Option[Int] = { - val minNumPostShufflePartitions = conf.minNumPostShufflePartitions - if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None - } - - /** - * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled - * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]]. 
- */ - private def withExchangeCoordinator( - children: Seq[SparkPlan], - requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = { - val supportsCoordinator = - if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) { - // Right now, ExchangeCoordinator only support HashPartitionings. - children.forall { - case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true - case child => - child.outputPartitioning match { - case hash: HashPartitioning => true - case collection: PartitioningCollection => - collection.partitionings.forall(_.isInstanceOf[HashPartitioning]) - case _ => false - } - } - } else { - // In this case, although we do not have Exchange operators, we may still need to - // shuffle data when we have more than one children because data generated by - // these children may not be partitioned in the same way. - // Please see the comment in withCoordinator for more details. - val supportsDistribution = requiredChildDistributions.forall { dist => - dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution] - } - children.length > 1 && supportsDistribution - } - - val withCoordinator = - if (adaptiveExecutionEnabled && supportsCoordinator) { - val coordinator = - new ExchangeCoordinator( - targetPostShuffleInputSize, - minNumPostShufflePartitions) - children.zip(requiredChildDistributions).map { - case (e: ShuffleExchangeExec, _) => - // This child is an Exchange, we need to add the coordinator. - e.copy(coordinator = Some(coordinator)) - case (child, distribution) => - // If this child is not an Exchange, we need to add an Exchange for now. - // Ideally, we can try to avoid this Exchange. However, when we reach here, - // there are at least two children operators (because if there is a single child - // and we can avoid Exchange, supportsCoordinator will be false and we - // will not reach here.). Although we can make two children have the same number of - // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different. - // For example, let's say we have the following plan - // Join - // / \ - // Agg Exchange - // / \ - // Exchange t2 - // / - // t1 - // In this case, because a post-shuffle partition can include multiple pre-shuffle - // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes - // after shuffle. So, even we can use the child Exchange operator of the Join to - // have a number of post-shuffle partitions that matches the number of partitions of - // Agg, we cannot say these two children are partitioned in the same way. - // Here is another case - // Join - // / \ - // Agg1 Agg2 - // / \ - // Exchange1 Exchange2 - // / \ - // t1 t2 - // In this case, two Aggs shuffle data with the same column of the join condition. - // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same - // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2 - // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle - // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its - // pre-shuffle partitions by using another partitionStartIndices [0, 4]. - // So, Agg1 and Agg2 are actually not co-partitioned. - // - // It will be great to introduce a new Partitioning to represent the post-shuffle - // partitions when one post-shuffle partition includes multiple pre-shuffle partitions. 
- val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions) - assert(targetPartitioning.isInstanceOf[HashPartitioning]) - ShuffleExchangeExec(targetPartitioning, child, Some(coordinator)) - } - } else { - // If we do not need ExchangeCoordinator, the original children are returned. - children - } - - withCoordinator - } + private def defaultNumPreShufflePartitions: Int = + if (conf.adaptiveExecutionEnabled) { + conf.maxNumPostShufflePartitions + } else { + conf.numShufflePartitions + } private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution @@ -189,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { val defaultPartitioning = distribution.createPartitioning(targetNumPartitions) child match { // If child is an exchange, we replace it with a new one having defaultPartitioning. - case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c) + case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c) case _ => ShuffleExchangeExec(defaultPartitioning, child) } } @@ -198,15 +103,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } } - // Now, we need to add ExchangeCoordinator if necessary. - // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges. - // However, with the way that we plan the query, we do not have a place where we have a - // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator - // at here for now. - // Once we finish https://issues.apache.org/jira/browse/SPARK-10665, - // we can first add Exchanges and then add coordinator once we have a DAG of query fragments. - children = withExchangeCoordinator(children, requiredChildDistributions) - // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. @@ -295,7 +191,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = plan.transformUp { // TODO: remove this after we create a physical operator for `RepartitionByExpression`. 
- case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) => + case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) => child.outputPartitioning match { case lower: HashPartitioning if upper.semanticEquals(lower) => child case _ => operator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index f5d93ee5fa914..6fccc7a89ed1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -17,60 +17,31 @@ package org.apache.spark.sql.execution.exchange -import java.util.{HashMap => JHashMap, Map => JMap} -import javax.annotation.concurrent.GuardedBy - import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction} +import org.apache.spark.MapOutputStatistics import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} /** * A coordinator used to determines how we shuffle data between stages generated by Spark SQL. * Right now, the work of this coordinator is to determine the number of post-shuffle partitions * for a stage that needs to fetch shuffle data from one or multiple stages. * - * A coordinator is constructed with three parameters, `numExchanges`, - * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`. - * - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be - * registered to this coordinator. So, when we start to do any actual work, we have a way to - * make sure that we have got expected number of [[ShuffleExchangeExec]]s. + * A coordinator is constructed with two parameters, `targetPostShuffleInputSize`, + * and `minNumPostShufflePartitions`. * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's * input data size. With this parameter, we can estimate the number of post-shuffle partitions. * This parameter is configured through * `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`. - * - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator - * will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle - * partitions. - * - * The workflow of this coordinator is described as follows: - * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator, - * if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator. - * This happens in the `doPrepare` method. - * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this - * coordinator will call `postShuffleRDD` to get its corresponding post-shuffle - * [[ShuffledRowRDD]]. - * If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]] - * will immediately get its corresponding post-shuffle [[ShuffledRowRDD]]. - * - If this coordinator has not made the decision on how to shuffle data, it will ask those - * registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. 
Then, based on the - * size statistics of pre-shuffle partitions, this coordinator will determine the number of - * post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices - * to a single post-shuffle partition whenever necessary. - * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered - * [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this - * coordinator can lookup the corresponding [[RDD]]. + * - `minNumPostShufflePartitions` is used to make sure that there are at least + * `minNumPostShufflePartitions` post-shuffle partitions. * * The strategy used to determine the number of post-shuffle partitions is described as follows. * To determine the number of post-shuffle partitions, we have a target input size for a - * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages - * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics - * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until - * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be - * greater than the target size. + * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do + * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single + * post-shuffle partition until adding another pre-shuffle partition would cause the size of a + * post-shuffle partition to be greater than the target size. * * For example, we have two stages with the following pre-shuffle partition size statistics: * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB] @@ -84,60 +55,28 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} */ class ExchangeCoordinator( advisoryTargetPostShuffleInputSize: Long, - minNumPostShufflePartitions: Option[Int] = None) + minNumPostShufflePartitions: Int = 1) extends Logging { - // The registered Exchange operators. - private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]() - - // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the - // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is - // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails - // in `doEstimationIfNecessary`. - private[this] lazy val numExchanges = exchanges.size - - // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator. - private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] = - new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) - - // A boolean that indicates if this coordinator has made decision on how to shuffle data. - // This variable will only be updated by doEstimationIfNecessary, which is protected by - // synchronized. - @volatile private[this] var estimated: Boolean = false - - /** - * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed - * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator. - */ - @GuardedBy("this") - def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized { - exchanges += exchange - } - - def isEstimated: Boolean = estimated - /** * Estimates partition start indices for post-shuffle partitions based on * mapOutputStatistics provided by all pre-shuffle stages. 
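+   * Each element of the returned array is the index of the first pre-shuffle partition belonging
+   * to a post-shuffle partition; pre-shuffle partitions with continuous indices are packed
+   * together until adding one more would exceed the target input size.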
*/ def estimatePartitionStartIndices( mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { + // If minNumPostShufflePartitions is defined, it is possible that we need to use a // value less than advisoryTargetPostShuffleInputSize as the target input size of // a post shuffle task. - val targetPostShuffleInputSize = minNumPostShufflePartitions match { - case Some(numPartitions) => - val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum - // The max at here is to make sure that when we have an empty table, we - // only have a single post-shuffle partition. - // There is no particular reason that we pick 16. We just need a number to - // prevent maxPostShuffleInputSize from being set to 0. - val maxPostShuffleInputSize = - math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16) - math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) - - case None => advisoryTargetPostShuffleInputSize - } + val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum + // The max at here is to make sure that when we have an empty table, we + // only have a single post-shuffle partition. + // There is no particular reason that we pick 16. We just need a number to + // prevent maxPostShuffleInputSize from being set to 0. + val maxPostShuffleInputSize = math.max( + math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16) + val targetPostShuffleInputSize = + math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) logInfo( s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " + @@ -189,88 +128,6 @@ class ExchangeCoordinator( partitionStartIndices.toArray } - @GuardedBy("this") - private def doEstimationIfNecessary(): Unit = synchronized { - // It is unlikely that this method will be called from multiple threads - // (when multiple threads trigger the execution of THIS physical) - // because in common use cases, we will create new physical plan after - // users apply operations (e.g. projection) to an existing DataFrame. - // However, if it happens, we have synchronized to make sure only one - // thread will trigger the job submission. - if (!estimated) { - // Make sure we have the expected number of registered Exchange operators. - assert(exchanges.length == numExchanges) - - val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) - - // Submit all map stages - val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]() - val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]() - var i = 0 - while (i < numExchanges) { - val exchange = exchanges(i) - val shuffleDependency = exchange.prepareShuffleDependency() - shuffleDependencies += shuffleDependency - if (shuffleDependency.rdd.partitions.length != 0) { - // submitMapStage does not accept RDD with 0 partition. - // So, we will not submit this dependency. - submittedStageFutures += - exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency) - } - i += 1 - } - - // Wait for the finishes of those submitted map stages. - val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length) - var j = 0 - while (j < submittedStageFutures.length) { - // This call is a blocking call. If the stage has not finished, we will wait at here. 
- mapOutputStatistics(j) = submittedStageFutures(j).get() - j += 1 - } - - // If we have mapOutputStatistics.length < numExchange, it is because we do not submit - // a stage when the number of partitions of this dependency is 0. - assert(mapOutputStatistics.length <= numExchanges) - - // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the - // number of post-shuffle partitions. - val partitionStartIndices = - if (mapOutputStatistics.length == 0) { - Array.empty[Int] - } else { - estimatePartitionStartIndices(mapOutputStatistics) - } - - var k = 0 - while (k < numExchanges) { - val exchange = exchanges(k) - val rdd = - exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices)) - newPostShuffleRDDs.put(exchange, rdd) - - k += 1 - } - - // Finally, we set postShuffleRDDs and estimated. - assert(postShuffleRDDs.isEmpty) - assert(newPostShuffleRDDs.size() == numExchanges) - postShuffleRDDs.putAll(newPostShuffleRDDs) - estimated = true - } - } - - def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = { - doEstimationIfNecessary() - - if (!postShuffleRDDs.containsKey(exchange)) { - throw new IllegalStateException( - s"The given $exchange is not registered in this coordinator.") - } - - postShuffleRDDs.get(exchange) - } - override def toString: String = { s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index aba94885f941c..bbd1a3f005d74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -41,8 +41,7 @@ import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordCo */ case class ShuffleExchangeExec( var newPartitioning: Partitioning, - child: SparkPlan, - @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { + child: SparkPlan) extends Exchange { // NOTE: coordinator can be null after serialization/deserialization, // e.g. it can be null on the Executor side @@ -51,14 +50,7 @@ case class ShuffleExchangeExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) override def nodeName: String = { - val extraInfo = coordinator match { - case Some(exchangeCoordinator) => - s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})" - case _ => "" - } - - val simpleNodeName = "Exchange" - s"$simpleNodeName$extraInfo" + "Exchange" } override def outputPartitioning: Partitioning = newPartitioning @@ -66,21 +58,6 @@ case class ShuffleExchangeExec( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) - override protected def doPrepare(): Unit = { - // If an ExchangeCoordinator is needed, we register this Exchange operator - // to the coordinator when we do prepare. It is important to make sure - // we register this operator right before the execution instead of register it - // in the constructor because it is possible that we create new instances of - // Exchange operators when we transform the physical plan - // (then the ExchangeCoordinator will hold references of unneeded Exchanges). - // So, we should only call registerExchange just before we start to execute - // the plan. 
- coordinator match { - case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this) - case _ => - } - } - /** * Returns a [[ShuffleDependency]] that will partition rows of its child based on * the partitioning scheme defined in `newPartitioning`. Those partitions of @@ -119,25 +96,32 @@ case class ShuffleExchangeExec( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { // Returns the same ShuffleRowRDD if this plan is used by multiple plans. if (cachedShuffleRDD == null) { - cachedShuffleRDD = coordinator match { - case Some(exchangeCoordinator) => - val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) - assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) - shuffleRDD - case _ => - val shuffleDependency = prepareShuffleDependency() - preparePostShuffleRDD(shuffleDependency) + val shuffleDependency = prepareShuffleDependency() + cachedShuffleRDD = preparePostShuffleRDD(shuffleDependency) + } + cachedShuffleRDD + } + + private var _mapOutputStatistics: MapOutputStatistics = null + + def mapOutputStatistics: MapOutputStatistics = _mapOutputStatistics + + def eagerExecute(): RDD[InternalRow] = { + if (cachedShuffleRDD == null) { + val shuffleDependency = prepareShuffleDependency() + if (shuffleDependency.rdd.partitions.length != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sqlContext.sparkContext.submitMapStage(shuffleDependency) + _mapOutputStatistics = submittedStageFuture.get() } + cachedShuffleRDD = preparePostShuffleRDD(shuffleDependency) } cachedShuffleRDD } } object ShuffleExchangeExec { - def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = { - ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator]) - } - /** * Determines whether records must be defensively copied before being sent to the shuffle. * Several of Spark's shuffle components will buffer deserialized Java objects in memory. 
The diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 1199eeca959d5..78832f8af525a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -228,26 +228,26 @@ class SQLAppStatusListener( } } + private def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + nodes.map { + case cluster: SparkPlanGraphCluster => + val storedCluster = new SparkPlanGraphClusterWrapper( + cluster.id, + cluster.name, + cluster.desc, + toStoredNodes(cluster.nodes), + cluster.metrics) + new SparkPlanGraphNodeWrapper(null, storedCluster) + + case node => + new SparkPlanGraphNodeWrapper(node, null) + } + } + private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) = event - def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { - nodes.map { - case cluster: SparkPlanGraphCluster => - val storedCluster = new SparkPlanGraphClusterWrapper( - cluster.id, - cluster.name, - cluster.desc, - toStoredNodes(cluster.nodes), - cluster.metrics) - new SparkPlanGraphNodeWrapper(null, storedCluster) - - case node => - new SparkPlanGraphNodeWrapper(node, null) - } - } - val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => node.metrics.map { metric => (metric.accumulatorId, metric) } @@ -268,6 +268,27 @@ class SQLAppStatusListener( update(exec) } + private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = { + val SparkListenerSQLAdaptiveExecutionUpdate(executionId, + physicalPlanDescription, sparkPlanInfo) = event + + val planGraph = SparkPlanGraph(sparkPlanInfo) + val sqlPlanMetrics = planGraph.allNodes.flatMap { node => + node.metrics.map { metric => (metric.accumulatorId, metric) } + }.toMap.values.toList + + val graphToStore = new SparkPlanGraphWrapper( + executionId, + toStoredNodes(planGraph.nodes), + planGraph.edges) + kvstore.write(graphToStore) + + val exec = getOrCreateExecution(executionId) + exec.physicalPlanDescription = physicalPlanDescription + exec.metrics = sqlPlanMetrics + update(exec) + } + private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { val SparkListenerSQLExecutionEnd(executionId, time) = event Option(liveExecutions.get(executionId)).foreach { exec => @@ -296,6 +317,7 @@ class SQLAppStatusListener( override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionStart => onExecutionStart(e) + case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e) case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) case _ => // Ignore diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 03d75c4c1b82f..eb1e44570ea89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -37,6 +37,13 @@ case class SparkListenerSQLExecutionStart( time: Long) extends SparkListenerEvent +@DeveloperApi 
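+// Posted when adaptive execution updates the physical plan of a running query (for example after
+// a QueryStage determines its reducer count); SQLAppStatusListener uses it to refresh the stored
+// plan graph, plan description and metrics for that execution.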
+case class SparkListenerSQLAdaptiveExecutionUpdate( + executionId: Long, + physicalPlanDescription: String, + sparkPlanInfo: SparkPlanInfo) + extends SparkListenerEvent + @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index e57d080dadf78..15b4acfb662b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -96,6 +96,18 @@ object SparkPlanGraph { case "InputAdapter" => buildSparkPlanGraphNode( planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) + case "QueryStage" | "BroadcastQueryStage" | "ResultQueryStage" | "ShuffleQueryStage" => + if (exchanges.contains(planInfo.children.head)) { + // Point to the re-used exchange + val node = exchanges(planInfo.children.head) + edges += SparkPlanGraphEdge(node.id, parent.id) + } else { + buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) + } + case "QueryStageInput" | "ShuffleQueryStageInput" | "BroadcastQueryStageInput" => + buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) case "Subquery" if subgraph != null => // Subquery should not be included in WholeStageCodegen buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 82d3b22a48670..8d1e8ba8b1e1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1279,7 +1279,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val agg = cp.groupBy('id % 2).agg(count('id)) agg.queryExecution.executedPlan.collectFirst { - case ShuffleExchangeExec(_, _: RDDScanExec, _) => + case ShuffleExchangeExec(_, _: RDDScanExec) => case BroadcastExchangeExec(_, _: RDDScanExec) => }.foreach { _ => fail( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index 6ad025f37e440..5abb2c0fb03bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -21,7 +21,8 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite} import org.apache.spark.sql._ -import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageInput +import org.apache.spark.sql.execution.exchange.ExchangeCoordinator import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -204,7 +205,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } test("test estimatePartitionStartIndices and enforce minimal number of reducers") { - val coordinator = new ExchangeCoordinator(100L, Some(2)) + val coordinator = new ExchangeCoordinator(100L, 2) { // The minimal number of post-shuffle partitions is not 
enforced because
@@ -264,7 +265,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         .setAppName("test")
         .set("spark.ui.enabled", "false")
         .set("spark.driver.allowMultipleContexts", "true")
-        .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
+        .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
         .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
         .set(
@@ -274,7 +275,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       case Some(numPartitions) =>
         sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
       case None =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
+        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
     }
 
     val spark = SparkSession.builder()
@@ -304,25 +305,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = agg.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val queryStageInputs = agg.queryExecution.executedPlan.collect {
+          case q: ShuffleQueryStageInput => q
         }
-        assert(exchanges.length === 1)
+        assert(queryStageInputs.length === 1)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 5)
             }
 
           case None =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 3)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 3)
             }
         }
       }
@@ -355,25 +352,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val queryStageInputs = join.queryExecution.executedPlan.collect {
+          case q: ShuffleQueryStageInput => q
         }
-        assert(exchanges.length === 2)
+        assert(queryStageInputs.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 5)
             }
 
           case None =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 2)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 2)
             }
         }
       }
@@ -411,26 +404,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val queryStageInputs = join.queryExecution.executedPlan.collect {
+          case q: ShuffleQueryStageInput => q
         }
-        assert(exchanges.length === 4)
+        assert(queryStageInputs.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 5)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 2)
+            }
         }
       }
 
-      withSparkSession(test, 6644, minNumPostShufflePartitions)
+      withSparkSession(test, 16384, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: complex query 2$testNameNote") {
@@ -463,39 +456,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
        // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val queryStageInputs = join.queryExecution.executedPlan.collect {
+          case q: ShuffleQueryStageInput => q
         }
-        assert(exchanges.length === 3)
+        assert(queryStageInputs.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 5)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
+            queryStageInputs.foreach { q =>
+              assert(q.partitionStartIndices.isDefined)
+              assert(q.outputPartitioning.numPartitions === 3)
+            }
         }
       }
 
-      withSparkSession(test, 6144, minNumPostShufflePartitions)
-    }
-  }
-
-  test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
-    val test = { spark: SparkSession =>
-      spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(1).selectExpr("id AS key", "id AS value")
-      val resultDf = df.join(df, "key").join(df, "key")
-      val sparkPlan = resultDf.queryExecution.executedPlan
-      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
-      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
-      checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
     }
-    withSparkSession(test, 4, None)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index e4e224df7607f..409474552b1cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -411,8 +411,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning),
-      None)
+      DummySparkPlan(outputPartitioning = partitioning))
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
@@ -427,8 +426,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning),
-      None)
+      DummySparkPlan(outputPartitioning = partitioning))
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
@@ -462,8 +460,7 @@ class PlannerSuite extends SharedSQLContext {
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
-        requiredChildOrdering = Seq(Seq.empty)),
-      None)
+        requiredChildOrdering = Seq(Seq.empty)))
 
     val inputPlan = SortMergeJoinExec(
       Literal(1) :: Nil,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala
new file mode 100644
index 0000000000000..b02e7691e6ee1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.RangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.{BuildRight, ShuffledHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class PlanQueryStageTest extends SharedSQLContext {
+
+  test("Replaces ShuffleExchangeExec/BroadcastExchangeExec with reuse disabled") {
+    val range = org.apache.spark.sql.catalyst.plans.logical.Range(1, 100, 1, 1)
+    val originalPlan = ShuffleExchangeExec(
+      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
+      RangeExec(range))
+
+    val conf = new SQLConf
+    conf.setConfString("spark.sql.exchange.reuse", "false")
+    val planQueryStage = PlanQueryStage(conf)
+    val newPlan = planQueryStage(originalPlan)
+
+    val expectedPlan = ResultQueryStage(
+      ShuffleQueryStageInput(
+        ShuffleQueryStage(originalPlan),
+        range.output))
+
+    assert(newPlan == expectedPlan)
+  }
+
+  test("Reuses ShuffleQueryStage when possible") {
+    val conf = new SQLConf
+    conf.setConfString("spark.sql.exchange.reuse", "true")
+
+    val planQueryStage = PlanQueryStage(conf)
+    val newPlan = planQueryStage(createJoinExec(100, 100))
+
+    val collected = newPlan.collect {
+      case e: ShuffleQueryStageInput => e.childStage
+    }
+
+    assert(collected.length == 2)
+    assert(collected(0).eq(collected(1)))
+  }
+
+  test("Creates multiple ShuffleQueryStages when stages are different") {
+    val conf = new SQLConf
+    conf.setConfString("spark.sql.exchange.reuse", "true")
+
+    val planQueryStage = PlanQueryStage(conf)
+    val newPlan = planQueryStage(createJoinExec(100, 101))
+
+    val collected = newPlan.collect {
+      case e: ShuffleQueryStageInput => e.childStage
+    }
+
+    assert(collected.length == 2)
+    assert(!collected(0).eq(collected(1)))
+  }
+
+  def createJoinExec(leftNum: Int, rightNum: Int): ShuffledHashJoinExec = {
+    val left = ShuffleExchangeExec(
+      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, leftNum, 1, 1)))
+
+    val right = ShuffleExchangeExec(
+      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, rightNum, 1, 1)))
+
+    ShuffledHashJoinExec(
+      Seq(UnresolvedAttribute("blah")),
+      Seq(UnresolvedAttribute("blah")),
+      Inner,
+      BuildRight,
+      None,
+      left,
+      right)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala
new file mode 100644
index 0000000000000..ce56ebc4351e3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.RangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class QueryStageTest extends SharedSQLContext {
+  test("Adaptive Query Execution repartitions") {
+    val originalNumPartitions = 100
+
+    val plan = {
+      val leftRangeExec = RangeExec(
+        org.apache.spark.sql.catalyst.plans.logical.Range(1, 1000, 1, 1))
+
+      ShuffleExchangeExec(
+        HashPartitioning(leftRangeExec.output, originalNumPartitions),
+        leftRangeExec)
+    }
+
+    assert(plan.execute().getNumPartitions == originalNumPartitions)
+    assert(PlanQueryStage.apply(new SQLConf)(plan).execute().getNumPartitions == 1)
+  }
+}
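For reference, a minimal end-to-end sketch (not part of the patch) of how the behavior exercised by the suites above might be driven from a SparkSession: it enables adaptive execution with the min/max post-shuffle partition configs used in ExchangeCoordinatorSuite, runs a shuffling aggregation, and then collects the ShuffleQueryStageInput leaves from the executed plan the same way the assertions above do. The session settings and the sample query are illustrative assumptions, and it presumes ShuffleQueryStageInput is accessible from the calling scope (e.g. test code under org.apache.spark.sql.execution).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageInput
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("adaptive-execution-sketch")  // hypothetical app name
  .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
  .config(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
  .config(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "500")
  .getOrCreate()

// Run a query with a shuffle so the post-shuffle partition count is decided at runtime.
val agg = spark.range(0, 1000)
  .selectExpr("id % 20 AS key", "id AS value")
  .groupBy("key")
  .count()
agg.collect()

// As in the suites above, shuffle reads appear as ShuffleQueryStageInput leaves
// in the executed plan once adaptive execution has planned the query stages.
val queryStageInputs = agg.queryExecution.executedPlan.collect {
  case q: ShuffleQueryStageInput => q
}
queryStageInputs.foreach { q =>
  println(s"post-shuffle partitions: ${q.outputPartitioning.numPartitions}")
}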