@@ -282,14 +282,20 @@ object SQLConf {

  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
    buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .internal()
-      .doc("The advisory minimal number of post-shuffle partitions provided to " +
-        "ExchangeCoordinator. This setting is used in our test to make sure we " +
-        "have enough parallelism to expose issues that will not be exposed with a " +
-        "single partition. When the value is a non-positive value, this setting will " +
-        "not be provided to ExchangeCoordinator.")
+      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
      .intConf
-      .createWithDefault(-1)
+      .checkValue(_ > 0, "The minimum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(1)
+
+  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
+    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
+        "By default it equals spark.sql.shuffle.partitions.")
+      .intConf
+      .checkValue(_ > 0, "The maximum shuffle partition number " +
+        "must be a positive integer.")
+      .createOptional

  val SUBEXPRESSION_ELIMINATION_ENABLED =
    buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1728,8 +1734,10 @@ class SQLConf extends Serializable with Logging {

  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+
+  def maxNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)

  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

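As a usage note, the two bounds above are ordinary SQL configs, so they can be supplied when the session is built. The sketch below is illustrative only: the object name, session setup, and chosen values are assumptions rather than part of this change; the config keys are the ones introduced above, and when the max key is left unset, maxNumPostShufflePartitions falls back to spark.sql.shuffle.partitions.

import org.apache.spark.sql.SparkSession

// Hypothetical example object; only the config keys come from the diff above.
object AdaptiveConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("adaptive-conf-sketch")
      .config("spark.sql.adaptive.enabled", "true")
      // Lower bound for post-shuffle partitions; must be a positive integer (default 1).
      .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
      // Upper bound; when this key is omitted, the value of spark.sql.shuffle.partitions is used.
      .config("spark.sql.adaptive.maxNumPostShufflePartitions", "200")
      .getOrCreate()

    // Illustrative values only: 200 here mirrors the usual spark.sql.shuffle.partitions default.
    println(spark.conf.get("spark.sql.adaptive.maxNumPostShufflePartitions"))
    spark.stop()
  }
}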
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -99,10 +100,13 @@ class QueryExecution(
  /** A sequence of rules that will be applied in order to the physical plan before execution. */
  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
    PlanSubqueries(sparkSession),
+    ReuseSubquery(sparkSession.sessionState.conf),
    EnsureRequirements(sparkSession.sessionState.conf),
+    // `AdaptiveSparkPlan` is a leaf node. If inserted, all the following rules will be no-ops,
+    // as the original plan is hidden behind `AdaptiveSparkPlan`.
+    InsertAdaptiveSparkPlan(sparkSession),
    CollapseCodegenStages(sparkSession.sessionState.conf),
-    ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf))
+    ReuseExchange(sparkSession.sessionState.conf))

  def simpleString: String = withRedaction {
    val concat = new StringConcat()
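Since InsertAdaptiveSparkPlan now runs inside preparations, turning on the feature flag should be enough for the wrapper to appear at the root of the executed plan. A minimal sketch of how that could be observed, under the assumption of a local session and an arbitrary grouped query (the object name and dataset are invented for illustration):

import org.apache.spark.sql.SparkSession

object AdaptivePlanRootExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("adaptive-root-sketch")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Any query with a shuffle will do; the grouping below is just an example.
    val df = spark.range(0, 1000).toDF("id").groupBy($"id" % 10).count()

    // With adaptive execution enabled, the root node is expected to be AdaptiveSparkPlanExec,
    // so the rules that run after InsertAdaptiveSparkPlan only see this leaf wrapper.
    println(df.queryExecution.executedPlan.getClass.getSimpleName)
    spark.stop()
  }
}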
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -52,6 +53,8 @@ private[execution] object SparkPlanInfo {
  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
    val children = plan match {
      case ReusedExchangeExec(_, child) => child :: Nil
+      case a: AdaptiveSparkPlanExec => a.finalPlan :: Nil
+      case stage: QueryStageExec => stage.plan :: Nil
      case _ => plan.children ++ plan.subqueries
    }
    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import java.util.concurrent.CountDownLatch

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

/**
 * A root node to execute the query plan adaptively. It splits the query plan into independent
 * stages and executes them in order according to their dependencies. The query stage
 * materializes its output at the end. When one stage completes, the data statistics of its
 * materialized output will be used to optimize the subsequent stages.
 * This is called mid-query re-optimization in database literature.
 */
case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
  extends LeafExecNode {

  override def output: Seq[Attribute] = initialPlan.output

  @volatile private var currentPlan: SparkPlan = initialPlan
  @volatile private var error: Throwable = null

  // We will release the lock when all the query stages are completed, or when we fail to
  // optimize/execute query stages. Getting `finalPlan` is blocked until the lock is released.
  // This is better than wait()/notify(), as we can easily check whether the computation has
  // completed by calling `readyLock.getCount()`.
  private val readyLock = new CountDownLatch(1)

  private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback {
    override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
      updateCurrentPlan(updatedPlan, executionId)
    }

    override def onFinalPlan(finalPlan: SparkPlan): Unit = {
      updateCurrentPlan(finalPlan, executionId)
      readyLock.countDown()
    }

    override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = {
      error = new SparkException(
        s"""
           |Failed to materialize query stage ${stage.id}:
           |${stage.plan.treeString}
         """.stripMargin, e)
      readyLock.countDown()
    }

    override def onError(e: Throwable): Unit = {
      error = e
      readyLock.countDown()
    }
  }

  private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = {
    currentPlan = newPlan
    executionId.foreach { id =>
      session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
        id,
        SQLExecution.getQueryExecution(id).toString,
        SparkPlanInfo.fromSparkPlan(currentPlan)))
    }
  }

  def finalPlan: SparkPlan = {
    if (readyLock.getCount > 0) {
      val sc = session.sparkContext
      val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
      val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId))
      stageManager.start()
      readyLock.await()
      stageManager.stop()
    }

    if (error != null) throw error
    currentPlan
  }

  override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
  override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n)
  override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator()
  override def doExecute(): RDD[InternalRow] = finalPlan.execute()
  override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int): Unit = {
    currentPlan.generateTreeString(
      depth, lastChildren, append, verbose, "", false, maxFields)
  }
}
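The blocking behaviour of finalPlan boils down to a latch-plus-callback pattern: background work reports progress through callbacks, and the caller waits on a CountDownLatch until either a final result or an error has been recorded. The following standalone sketch restates that pattern with invented names and no Spark classes; it is not the QueryStageManager implementation itself.

import java.util.concurrent.CountDownLatch

object LatchCallbackSketch {

  // Invented callback interface, shaped like QueryStageManagerCallback above.
  trait Callback {
    def onFinalResult(result: String): Unit
    def onError(e: Throwable): Unit
  }

  def main(args: Array[String]): Unit = {
    val ready = new CountDownLatch(1)
    var result: String = null
    var error: Throwable = null

    // The callbacks record the outcome and release the latch exactly once.
    val callback = new Callback {
      override def onFinalResult(r: String): Unit = { result = r; ready.countDown() }
      override def onError(e: Throwable): Unit = { error = e; ready.countDown() }
    }

    // Stand-in for the background stage manager: it eventually reports completion.
    new Thread(() => callback.onFinalResult("final plan ready")).start()

    // Like finalPlan: block until countDown() fires, then surface any recorded failure.
    ready.await()
    if (error != null) throw error
    println(result)
  }
}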
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec

/**
 * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which executes the query plan
 * adaptively with runtime data statistics. Note that this rule must be run after
 * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are
 * already inserted.
 */
case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = plan match {
    case _: ExecutedCommandExec => plan
    case _ if session.sessionState.conf.adaptiveExecutionEnabled =>
      AdaptiveSparkPlanExec(plan, session.cloneSession())
    case _ => plan
  }
}
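The rule has a simple shape: commands pass through untouched, and everything else is wrapped at the root when the feature is enabled. A toy, self-contained restatement of that shape with invented types (it deliberately avoids real SparkPlan nodes):

object WrapRootRuleSketch {
  sealed trait Plan
  case class Command(name: String) extends Plan
  case class Query(name: String) extends Plan
  case class AdaptiveWrapper(child: Plan) extends Plan

  def insertWrapper(plan: Plan, adaptiveEnabled: Boolean): Plan = plan match {
    case c: Command => c                                   // commands are never wrapped
    case _ if adaptiveEnabled => AdaptiveWrapper(plan)     // wrap the whole query plan
    case _ => plan                                         // feature disabled: no-op
  }

  def main(args: Array[String]): Unit = {
    println(insertWrapper(Query("scan"), adaptiveEnabled = true))   // AdaptiveWrapper(Query(scan))
    println(insertWrapper(Command("set"), adaptiveEnabled = true))  // Command(set)
  }
}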