[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL #20303
Conversation
cc @cloud-fan, @gatorsmile, @yhuai
Test build #86305 has finished for PR 20303 at commit
}

// 3. Codegen and update the UI
child = CollapseCodegenStages(sqlContext.conf).apply(child)
Should this line be changed to the following?
child = child match {
  case s: WholeStageCodegenExec => s
  case other => CollapseCodegenStages(sqlContext.conf).apply(other)
}
child seems like it won't be a WholeStageCodegenExec.
Yes, @gczsjdy is correct.
In adaptive execution there is no whole-stage codegen rule in QueryExecution.adaptivePreparations, so child cannot be a WholeStageCodegenExec.
Test build #86697 has finished for PR 20303 at commit
Jenkins, retest this please.
Test build #86757 has finished for PR 20303 at commit
Test build #87236 has finished for PR 20303 at commit
@carsonwang, what's your plan to merge this fix into master or a release? Thanks a lot!
@aaron-aa, the committers agreed to start reviewing the code after the 2.4 release.
@carsonwang Thanks
@cloud-fan @gatorsmile, are you ready to start reviewing this? I can bring this up to date.
Looks pretty good. One thing I'm unclear about is how whole-stage codegen is applied to query stages recursively. Can you explain it a little more?
Add a comment that this rule must be run after EnsureRequirements.
Sure. Will add it and rebase the code.
Also note that this rule should be applied last, as it actually divides the tree into multiple sub-trees?
Yes, this is commented in QueryExecution where this rule is used. Let me also add it here.
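For context, a hedged reconstruction of the preparation-rule ordering this thread discusses; the rule names follow this PR, but the exact list and signatures are an assumption, not verified code:

// Sketch of QueryExecution.adaptivePreparations as discussed above (assumed).
// EnsureRequirements runs earlier so all exchanges are already inserted;
// PlanQueryStage must come last because it divides the plan into sub-trees.
protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  // must be the last rule: it divides the plan into multiple sub-trees
  PlanQueryStage(sparkSession.sessionState.conf))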
When can mapOutputStatistics be null?
If the childStage's RDD has 0 partitions, we will not submit that stage; see ShuffleExchangeExec.eagerExecute. In that case mapOutputStatistics will be null, so we filter it out.
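A minimal sketch of the filtering described here, assuming in-tree code (MapOutputStatistics is private[spark]); the helper name is hypothetical:

import org.apache.spark.MapOutputStatistics

// Stages whose shuffle RDD has zero partitions are never submitted, so their
// MapOutputStatistics stay null and are dropped before aggregating sizes.
def validStatistics(all: Seq[MapOutputStatistics]): Seq[MapOutputStatistics] =
  all.filter(_ != null)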
Why is it a var?
This is a var so that we can update the plan at runtime by directly assigning a new child to the ShuffleQueryStage. This won't affect other query stages.
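A simplified illustration of this design choice (not the PR's actual class): mutability lets the driver swap in an optimized sub-plan per stage.

import org.apache.spark.sql.execution.SparkPlan

// Simplified sketch: `child` is a var so the stage's sub-plan can be replaced
// at runtime (e.g. after statistics arrive) without rebuilding the tree or
// touching sibling stages.
class ShuffleQueryStageSketch(var child: SparkPlan) {
  def updateChild(optimized: SparkPlan): Unit = {
    child = optimized // only this stage's sub-tree changes
  }
}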
@cloud-fan, we don't apply whole-stage codegen to query stages recursively. In …
…es when calling executeCollect, executeToIterator and executeTake actions multiple times (apache#70)
* Avoid the prepareExecuteStage#QueryStage method being executed multiple times when calling executeCollect, executeToIterator and executeTake actions multiple times
* Only add the check in the prepareExecuteStage method to avoid duplicate checks in other methods
* Small fix
Test build #101268 has finished for PR 20303 at commit
Test build #101267 has finished for PR 20303 at commit
* do not re-implement exchange reuse
* simplify QueryStage
* add comments
* new idea
* polish
* address comments
* improve QueryStageTrigger
Test build #101520 has finished for PR 20303 at commit
retest this please
Test build #102848 has finished for PR 20303 at commit
Test build #102855 has finished for PR 20303 at commit
  withCoordinator
}

private def defaultNumPreShufflePartitions: Int =
@carsonwang With AE being the new mode, spark.sql.adaptive.maxNumPostShufflePartitions will replace spark.sql.shuffle.partitions to determine the initial shuffle parallelism. This seems to be a significant user-facing change, especially if we want to enable AE as the cluster default in the future. E.g., if a user has set spark.sql.shuffle.partitions to 10K for a large join, then with AE enabled he has to set maxNumPostShufflePartitions to 10K, otherwise he will only get 500. I had to make a small patch when testing AE in production jobs, setting maxNumPostShufflePartitions = spark.sql.shuffle.partitions whenever spark.sql.shuffle.partitions != the default of 200. What do you think?
If the user is familiar with the AE configuration, can he just set maxNumPostShufflePartitions for the large join when he enables AE? If he sets spark.sql.shuffle.partitions to 10K in non-AE mode, I expect he will set a higher value for maxNumPostShufflePartitions, like 20K, in AE mode. AE can then find a good reducer number between 1 and 20K, usually a better value than 10K.
@carsonwang, I am afraid we are making a risky assumption that the user needs to be familiar with the AE config. That is not a concern while AE is an on-demand feature; however, the current version of this PR sets spark.sql.adaptive.enabled = true, which means we plan to enable AE mode by default. Then, when we roll out the next version of Spark in our cluster, we could break a lot of prod jobs with custom spark.sql.shuffle.partitions. I proposed a change to adjust maxNumPostShufflePartitions based on spark.sql.shuffle.partitions, which I think is safer, if the upstream plan is also to set AE as the cluster-default mode in the future.
> I proposed a change to adjust maxNumPostShufflePartitions based on spark.sql.shuffle.partitions which I think is safer.

I think it makes sense. Actually, in our internal practice we set the default value of maxNumPostShufflePartitions = 1.5 * spark.sql.shuffle.partitions. But for the common code here, a magic number like 1.5 raises doubts; maybe we need a discussion about the strategy for setting an appropriate default value for maxNumPostShufflePartitions based on spark.sql.shuffle.partitions.
Thanks @xuanyuanking for the input! Instead of setting maxNumPostShufflePartitions based on a magic number of 1.5X or 500, I would propose adding a conf to replace maxNumPostShufflePartitions, expressed as a ratio of spark.sql.shuffle.partitions. The default value could be 1.0, so the behavior of AE's initial partition number stays consistent with spark.sql.shuffle.partitions in non-AE mode. With 1.5 or 2, one of my concerns is that it could increase shuffle service load when we enable AE as the cluster default, as we have seen shuffle service scalability issues in our cluster when handling very large shuffle workloads.
cc @carsonwang @cloud-fan
It is a good point that we should not break anything when AE is enabled by default at the cluster level. Currently it is enabled only for test purposes, but it is possible we enable it by default in the future. What about setting maxNumPostShufflePartitions to spark.sql.shuffle.partitions by default? We currently use maxNumPostShufflePartitions as the initial partition number, but in the future, if we can find a better initial partition number between minNumPostShufflePartitions and maxNumPostShufflePartitions at runtime, maxNumPostShufflePartitions will be treated as a max limit.
@carsonwang, great, I support setting maxNumPostShufflePartitions = spark.sql.shuffle.partitions by default. This is exactly what I did internally when testing AE. Based on @xuanyuanking's scenario, setting maxNumPostShufflePartitions as a configurable ratio of spark.sql.shuffle.partitions would also make sense to me.
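A hedged sketch of the fallback the thread converges on, written against the public RuntimeConfig API; the logic is illustrative, not this PR's code:

import org.apache.spark.sql.RuntimeConfig

// Default maxNumPostShufflePartitions to spark.sql.shuffle.partitions when it
// is not set explicitly, so enabling AE preserves existing job behavior.
def maxNumPostShufflePartitions(conf: RuntimeConfig): Int =
  conf.getOption("spark.sql.adaptive.maxNumPostShufflePartitions")
    .map(_.toInt)
    .getOrElse(conf.get("spark.sql.shuffle.partitions", "200").toInt)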
Excited to see AE making progress upstream :) We have used the new AE framework to add SQL optimization rules and the results look very promising. We have a few comments on this patch in general:
@fangshil, thanks for the suggestions and feedback! When measuring performance, please also include the patch #19788, which has a big impact on AE performance. As you suggested, dynamically estimating the initial parallelism will be helpful and will make AE much easier to use. We also did some work on that and can contribute it in future PRs.
@carsonwang What happens when we call df.repartition(500) on a 10MB dataset with AQE turned on? AQE will still ignore the explicit repartition, right? This might be unintuitive to users. Perhaps we can provide an option to let people decide whether they want AQE to apply to the repartition call?
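For reference, the scenario in the question as a runnable snippet, assuming an active SparkSession named spark; sizes are illustrative:

// A small dataset with an explicit repartition. Without AQE the 500 is
// honored; the concern above is that runtime coalescing could override it.
val df = spark.range(0, 1000000).toDF("id")
val repartitioned = df.repartition(500)
println(repartitioned.rdd.getNumPartitions) // 500 with AQE off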
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") | ||
.intConf | ||
.createWithDefault(-1) | ||
.checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " + |
super nit: we can simply write _ > 0
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") | ||
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.") | ||
.intConf | ||
.checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " + |
ditto
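For clarity, the simplification both nits ask for, applied to the second conf; the error-message text is completed illustratively, and the default of 500 is taken from the discussion above:

buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
  .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
  .intConf
  // underscore lambda instead of `numPartitions => numPartitions > 0`
  .checkValue(_ > 0, "The maximum shuffle partition number must be a positive integer.")
  .createWithDefault(500)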
* There are 2 kinds of query stages:
* 1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches
*    another job to execute the further operators.
* 2. Broadcast stage. This stage materializes its output to an array in driver JVM. Spark
nit: Broadcast query stage
spark.sql("SET spark.sql.exchange.reuse=true") | ||
val df = spark.range(1).selectExpr("id AS key", "id AS value") | ||
|
||
// test case 1: a fragment has 3 child fragments but they are the same fragment. |
nit: fragment -> query stage
Test build #103545 has finished for PR 20303 at commit
val simpleNodeName = "Exchange"
s"$simpleNodeName$extraInfo"
"Exchange"
nit: no need for {}
*
* When one query stage finishes materialization, a list of adaptive optimizer rules will be
* executed, trying to optimize the query plan with the data statistics collected from the the
* materialized data. Then we travers the query plan again and try to insert more query stages.
nit: traverse
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
  case shuffle @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
Will there be any difference if we judge by lower.satisfies(fatherOperator.requiredDistribution)?
This is copied from EnsureRequirements, but I think there is a difference: the number of partitions matters in semanticEquals. That said, lower.satisfies(fatherOperator.requiredDistribution) is more aggressive and may remove a user-specified shuffle introduced via something like df.partitionBy.
  }
}

private def createQueryStage(e: Exchange): QueryStageExec = {
The three createQueryStage(s) brought some confusion...
// number of partitions, they will have the same number of pre-shuffle partitions
// (i.e. map output partitions).
assert(
  distinctNumPreShufflePartitions.length == 1,
In most cases this has to be 1, but when the child QueryStages are not in one whole-stage code generation block, we can still do adaptive execution even if the distinct value is larger than 1. For example, the root operator is a Union (it doesn't support codegen) and its two children are both ShuffleExchanges. In this case the 2 ShuffleExchanges don't have to share the same number of pre-shuffle partitions; they can reduce post-shuffle partitions separately. I am not sure if I've got this right. cc @cloud-fan
I think this is a long-standing issue and I don't have a good idea to deal with Union and Join differently.
Test build #103807 has finished for PR 20303 at commit
@justinuang, sorry for the late reply. For …
@carsonwang we found a bug in production when AQE is turned on: the ShuffleQueryStageInputs to a Union node will have differing numbers of partitions if we explicitly repartition them. Here is a repro:
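The original repro snippet is not preserved here; a hypothetical query of the same shape, assuming an active SparkSession named spark, would be:

// Explicit repartitions give the Union's shuffle inputs different
// pre-shuffle partition counts, tripping the `length == 1` assert
// in the exchange coordinator.
val a = spark.range(100).toDF("id").repartition(10)
val b = spark.range(100).toDF("id").repartition(20)
a.union(b).collect() // assertion failure with this PR's coordinator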
My immediate thought was to get rid of the assert and instead skip automatic repartitioning if the numbers of input partitions are different. There might be a better way to fix this, though; I haven't given it much thought yet.
Ping =) Any thoughts on the post above? Unfortunately this meant that we had to revert AQE in our fork.
@justinuang, in theory we can handle Union separately and remove that limitation. But for now I think we can skip changing the reducer number, just as you mentioned.
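A minimal sketch of the suggested workaround, intended to sit inside the exchange coordinator; validStatistics and the surrounding wiring are hypothetical, while bytesByPartitionId and estimatePartitionStartIndices are existing ExchangeCoordinator names:

// Only coalesce when all child stages agree on the pre-shuffle partition
// count; otherwise (e.g. repartitioned inputs under a Union) leave the
// reducer number unchanged instead of asserting.
val distinctNumPreShufflePartitions =
  validStatistics.map(_.bytesByPartitionId.length).distinct
if (distinctNumPreShufflePartitions.length == 1) {
  estimatePartitionStartIndices(validStatistics) // normal coalescing path
} else {
  // skip changing the reducer number for mismatched inputs
}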
@carsonwang are there plans to continue work on this PR?
@justinuang, we are still waiting for some updates from @cloud-fan.
@carsonwang, are we still working on this pull request? @cloud-fan @justinuang
@jerrychenhf, #24706 implements a framework for adaptive query execution based on this PR. Please review that PR: #24706
assume this can be closed?
yes, closing this in favor of #24706 |
What changes were proposed in this pull request?
This is joint work with @yucai, @gczsjdy, @chenghao-intel, and @xuanyuanking.
We'd like to introduce a new approach to adaptive execution in Spark SQL. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing
How was this patch tested?
Updated ExchangeCoordinatorSuite.
We also tested this with all queries in TPC-DS.