
Conversation

carsonwang
Contributor

What changes were proposed in this pull request?

This is joint work with @yucai, @gczsjdy, @chenghao-intel, and @xuanyuanking.

We'd like to introduce a new approach to adaptive execution in Spark SQL. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing

How was this patch tested?

Updated ExchangeCoordinatorSuite.
We also tested this with all queries in TPC-DS.

@carsonwang
Contributor Author

cc @cloud-fan, @gatorsmile, @yhuai

@SparkQA

SparkQA commented Jan 18, 2018

Test build #86305 has finished for PR 20303 at commit e0b98fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

// 3. Codegen and update the UI
child = CollapseCodegenStages(sqlContext.conf).apply(child)
Member

Change this line to:

    child = child match {
      case s: WholeStageCodegenExec => s
      case other => CollapseCodegenStages(sqlContext.conf).apply(other)
    }

?

@gczsjdy

It seems child won't be a WholeStageCodegenExec.

Contributor

Yes, @gczsjdy is correct.
In adaptive execution, there is no whole-stage codegen rule in QueryExecution.adaptivePreparations, so child cannot be a WholeStageCodegenExec.

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86697 has finished for PR 20303 at commit 9a1301f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@carsonwang
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86757 has finished for PR 20303 at commit 9a1301f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 9, 2018

Test build #87236 has finished for PR 20303 at commit 603c6d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aaron-aa

aaron-aa commented Oct 4, 2018

@carsonwang what's your plan for merging this into master or a release? Thanks a lot!

@carsonwang
Contributor Author

@aaron-aa, the committers agreed to start reviewing the code after the 2.4 release.

@aaron-aa

@carsonwang Thanks

@carsonwang
Contributor Author

@cloud-fan @gatorsmile, are you ready to start reviewing this? I can bring it up to date.

Contributor

@cloud-fan left a comment

Looks pretty good. One thing I'm unclear about is how whole-stage codegen is applied to query stages recursively. Can you explain it a little more?

Contributor

add a comment that this rule must be run after EnsureRequirements.

Contributor Author

Sure. Will add it and rebase the code.


Also note that this should be applied last, as it actually divides the tree into multiple sub-trees?

Contributor Author

Yes, this is noted in a comment in QueryExecution where this rule is used. Let me also add it here.

Contributor

When can mapOutputStatistics be null?

Contributor Author

If the child stage's RDD has 0 partitions, we will not submit that stage; see ShuffleExchangeExec.eagerExecute. In that case, mapOutputStatistics will be null, so we filter it out.
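For illustration, a rough sketch of that behavior (hypothetical method name; SparkContext.submitMapStage and MapOutputStatistics are Spark-internal APIs, so this is a sketch of the idea rather than the PR's exact code):

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SparkContext}

    // Sketch: submit the map stage only when the child RDD has partitions.
    // A zero-partition child yields null statistics, which the caller filters
    // out before estimating the post-shuffle partition start indices.
    def eagerExecuteSketch(
        sc: SparkContext,
        dep: ShuffleDependency[Int, Any, Any]): MapOutputStatistics = {
      if (dep.rdd.partitions.nonEmpty) {
        val statsFuture = sc.submitMapStage(dep) // the map stage runs as its own job
        Await.result(statsFuture, Duration.Inf)
      } else {
        null // nothing was submitted, so there are no statistics
      }
    }

    // Caller side: drop the nulls before using the statistics.
    // val validStats = childMapOutputStatistics.filter(_ != null)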

Contributor

Why is it a var?

Contributor Author

This is a var so that we can update the plan at runtime by directly assigning a new child to the ShuffleQueryStage. This won't affect other query stages.
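As a rough sketch of that design choice (hypothetical class shape, not the PR's code), the mutable field is what allows swapping in a re-optimized sub-plan later:

    import org.apache.spark.sql.execution.SparkPlan

    // Sketch: a query stage whose sub-plan can be replaced once runtime
    // statistics are available; only this stage's field is reassigned, so
    // sibling query stages are unaffected.
    class ShuffleQueryStageSketch(var child: SparkPlan) {
      def updateChild(newChild: SparkPlan): Unit = {
        child = newChild
      }
    }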

@carsonwang
Contributor Author

@cloud-fan, we don't apply whole-stage codegen to query stages recursively. In QueryStage.prepareExecuteStage, we first execute the child stages and wait for their completion. Based on the child stage statistics, we can potentially update the plan and the reducer number in the current query stage. After that, we apply whole-stage codegen only to the plan in the current query stage. Note that QueryStageInput is a leaf node, so whole-stage codegen won't apply to child stages.
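A condensed sketch of that sequence (hypothetical helper signatures, simplified from the description above; not the PR's actual QueryStage code):

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration
    import org.apache.spark.sql.execution.SparkPlan

    // Sketch of the prepareExecuteStage flow described above:
    // 1) materialize child stages and wait, 2) re-optimize this stage's plan
    // using their statistics, 3) apply whole-stage codegen to this stage only.
    // Because QueryStageInput is a leaf node, codegen never descends into
    // child stages.
    def prepareExecuteStageSketch(
        childStages: Seq[() => Unit],              // stand-ins for child stage execution
        optimizeWithStats: SparkPlan => SparkPlan, // e.g. adjust the reducer number
        collapseCodegen: SparkPlan => SparkPlan,   // whole-stage codegen for this stage
        plan: SparkPlan)(implicit ec: ExecutionContext): SparkPlan = {
      // 1. Execute child stages in parallel and wait for all of them to finish.
      val futures = childStages.map(run => Future(run()))
      futures.foreach(f => Await.result(f, Duration.Inf))

      // 2. Re-optimize the current stage's plan with the collected statistics.
      val optimized = optimizeWithStats(plan)

      // 3. Codegen is applied only within the current query stage.
      collapseCodegen(optimized)
    }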

carsonwang and others added 10 commits January 11, 2019 16:06
…es when call executeCollect, executeToIterator and executeTake action multi-times (apache#70)

* Avoid QueryStage#prepareExecuteStage being executed multiple times when executeCollect, executeToIterator and executeTake are called multiple times

* Only add the check in the prepareExecuteStage method to avoid duplicate checks in other methods

* small fix
@SparkQA

SparkQA commented Jan 15, 2019

Test build #101268 has finished for PR 20303 at commit 5819826.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"Comma separated list of filter class names to apply to the Spark Web UI.\")

@SparkQA

SparkQA commented Jan 15, 2019

Test build #101267 has finished for PR 20303 at commit 2c55985.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

* do not re-implement exchange reuse

* simplify QueryStage

* add comments

* new idea

* polish

* address comments

* improve QueryStageTrigger
@SparkQA

SparkQA commented Jan 22, 2019

Test build #101520 has finished for PR 20303 at commit ea93dbf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102848 has finished for PR 20303 at commit bef8ab8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102855 has finished for PR 20303 at commit 2d6f110.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


withCoordinator
}
private def defaultNumPreShufflePartitions: Int =
@fangshil Mar 4, 2019

@carsonwang With AE being the new mode, spark.sql.adaptive.maxNumPostShufflePartitions will replace spark.sql.shuffle.partitions to determine the initial shuffle parallelism. This seems to be a significant user-facing change, especially if we want to enable AE as the cluster default in the future. E.g., if a user has set spark.sql.shuffle.partitions to 10K for a large join, with AE enabled he has to set maxNumPostShufflePartitions to 10K as well, otherwise he will only get 500. I had to make a small patch when testing AE in production jobs, setting maxNumPostShufflePartitions = spark.sql.shuffle.partitions whenever spark.sql.shuffle.partitions != the default of 200. What do you think?

Contributor Author

If the user is familiar with the AE configuration, can he just set maxNumPostShufflePartitions for the large join when he enables AE? If he sets spark.sql.shuffle.partitions to 10K in non-AE mode, I expect he will set a higher value for maxNumPostShufflePartitions, like 20K, in AE mode. Then AE can find a good reducer number between 1 and 20K, usually a better value than 10K.

@fangshil Mar 7, 2019

@carsonwang, I am afraid we are making a risky assumption that the user needs to be familiar with the AE config. It is not a concern for me while AE is an on-demand feature. However, the current version of this PR sets spark.sql.adaptive.enabled = true, which means we plan to enable AE mode by default; then, when we roll out the next version of Spark in our cluster, we could break a lot of prod jobs with custom spark.sql.shuffle.partitions. I proposed a change to adjust maxNumPostShufflePartitions based on spark.sql.shuffle.partitions, which I think is safer if the upstream plan is also to make AE the cluster default mode in the future.

Member

 I proposed a change to adjust maxNumPostShufflePartitions based on spark.sql.shuffle.partitions which I think is safer.

I think it makes sense. Actually, in our internal practice, we set the default value of maxNumPostShufflePartitions = 1.5 * spark.sql.shuffle.partitions. But for the common code here, a magic number like 1.5 raises doubts; maybe we need a discussion about the strategy for setting an appropriate default value of maxNumPostShufflePartitions based on spark.sql.shuffle.partitions.

@fangshil Mar 10, 2019

Thanks @xuanyuanking for the input! Instead of setting maxNumPostShufflePartitions based on a magic number like 1.5x or 500, I would propose adding a conf to replace maxNumPostShufflePartitions that is a ratio of spark.sql.shuffle.partitions. The default value could be 1.0, so the behavior of AE's initial partition number is consistent with spark.sql.shuffle.partitions in non-AE mode. With 1.5 or 2, one of my concerns is that it could potentially increase the shuffle service load when we enable AE as the cluster default, as we have seen shuffle service scalability issues in our cluster when handling very large shuffle workloads.
cc @carsonwang @cloud-fan

Contributor Author

It is a good point that we should not break anything when AE is enabled by default at the cluster level. Currently it is enabled only for test purposes, but it is possible we enable it by default in the future. What if we set maxNumPostShufflePartitions to spark.sql.shuffle.partitions by default? We currently use maxNumPostShufflePartitions as the initial partition number, but in the future, if we can find a better initial partition number between minNumPostShufflePartitions and maxNumPostShufflePartitions at runtime, maxNumPostShufflePartitions will be treated as an upper limit.
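One possible shape for that default (a sketch only, assuming SQLConf's buildConf and fallbackConf helpers are in scope; this is not what the PR currently does):

    // Sketch: default spark.sql.adaptive.maxNumPostShufflePartitions to
    // spark.sql.shuffle.partitions instead of a fixed 500, so enabling AE by
    // default does not change the effective parallelism of existing jobs.
    val MAX_NUM_POSTSHUFFLE_PARTITIONS =
      buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
        .doc("The advisory maximum number of post-shuffle partitions used in " +
          "adaptive execution. Defaults to spark.sql.shuffle.partitions.")
        .fallbackConf(SHUFFLE_PARTITIONS)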

@fangshil Mar 15, 2019

@carsonwang, great, I support setting maxNumPostShufflePartitions = spark.sql.shuffle.partitions by default. This is exactly what I did internally when testing AE. Based on @xuanyuanking's scenario, setting maxNumPostShufflePartitions as a configurable ratio of spark.sql.shuffle.partitions would also make sense to me.

@fangshil

fangshil commented Mar 4, 2019

Excited to see AE making progress upstream :) We have used the new AE framework to add SQL optimization rules and the results look very promising. We have a few comments on this patch in general:

  1. The current patch handles shuffle parallelism on the reducer side: it starts with a relatively large number of mapper partitions (500) and merges them into fewer reducer partitions by allowing each reducer to read multiple mappers. At large data scale, setting spark.sql.shuffle.partitions to 10K in non-AE mode vs. maxNumPostShufflePartitions in AE should give the same results, since the reducer number won't change when the data is large. I think with this patch we haven't yet reached optimal performance, since we only save the overhead of launching a certain number of reduce tasks. A better approach would be to dynamically estimate the initial/mapper parallelism between 0 and maxNumPostShufflePartitions. This should be possible with AE as well, and this patch should be a solid foundation for future improvements. Hope we can merge it soon!

  2. This patch uses the submitMapStage API, which submits each stage as a new job, so AE breaks Spark's vanilla definition of a job. This is an issue we inherit from the original AE, not one introduced by this new AE.

@carsonwang
Contributor Author

@fangshil, thanks for the suggestions and feedback! When measuring the performance, please also include the patch #19788, which has a big impact on AE performance. As you suggested, dynamically estimating the initial parallelism will be helpful and will make AE much easier for users to use. We also did some work on that and can contribute it in future PRs.

@justinuang

justinuang commented Mar 13, 2019

@carsonwang What happens when we call df.repartition(500) on a 10MB dataset with AQE turned on? AQE will still ignore the explicit repartition, right? This might be unintuitive to users.

Perhaps we can provide an option to let people decide whether they want AQE to apply to the repartition call?

.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
.createWithDefault(-1)
.checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
Contributor

super nit: we can simply write _ > 0

buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
.intConf
.checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
Contributor

ditto

* There are 2 kinds of query stages:
* 1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches
* another job to execute the further operators.
* 2. Broadcast stage. This stage materializes its output to an array in driver JVM. Spark
Contributor

nit: Broadcast query stage

spark.sql("SET spark.sql.exchange.reuse=true")
val df = spark.range(1).selectExpr("id AS key", "id AS value")

// test case 1: a fragment has 3 child fragments but they are the same fragment.
Contributor

nit: fragment -> query stage

@SparkQA

SparkQA commented Mar 15, 2019

Test build #103545 has finished for PR 20303 at commit 028b0ac.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val simpleNodeName = "Exchange"
s"$simpleNodeName$extraInfo"
"Exchange"

:nit No need {}

*
* When one query stage finishes materialization, a list of adaptive optimizer rules will be
* executed, trying to optimize the query plan with the data statistics collected from the the
* materialized data. Then we travers the query plan again and try to insert more query stages.

:nit traverse

override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case shuffle @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
child.outputPartitioning match {
case lower: HashPartitioning if upper.semanticEquals(lower) => child

Will there be any difference if we judge by lower.satisfies(fatherOperator.requiredDistribution)?

Contributor

This is copied from EnsureRequirements, but I think there is a difference: the number of partitions matters in semanticEquals. That said, lower.satisfies(fatherOperator.requiredDistribution) is more aggressive and may remove user-specified shuffle via something like df.partitionBy
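To illustrate the difference (a hypothetical example, not from the PR's tests):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val base = spark.range(1000).toDF("id").repartition(200, col("id"))

    // The user explicitly re-shuffles the same key down to 10 partitions.
    val ten = base.repartition(10, col("id"))

    // semanticEquals: hash(id, 10) != hash(id, 200) because numPartitions
    // differs, so the second exchange is kept and the user really gets 10
    // partitions.
    // satisfies: a parent that only requires ClusteredDistribution(id) is
    // already satisfied by hash(id, 200), so a satisfies-based check could
    // drop the user's second repartition.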

}
}

private def createQueryStage(e: Exchange): QueryStageExec = {

The three createQueryStage methods bring some confusion...

// number of partitions, they will have the same number of pre-shuffle partitions
// (i.e. map output partitions).
assert(
distinctNumPreShufflePartitions.length == 1,

In most cases this has to be 1, but when the child QueryStages are not in one whole-stage code generation block, we can still do adaptive execution even if the distinct value is larger than 1. For example, if the root operator is a Union (which doesn't support codegen) and the two children are both ShuffleExchanges, the two ShuffleExchanges don't have to share the same number of pre-shuffle partitions; they can reduce post-shuffle partitions separately. I am not sure if I have this right. cc @cloud-fan

Contributor

I think this is a long-standing issue and I don't have a good idea for dealing with Union and Join differently.

@SparkQA

SparkQA commented Mar 22, 2019

Test build #103807 has finished for PR 20303 at commit 2e08778.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@carsonwang
Contributor Author

@justinuang, sorry for the late reply. For df.repartition(500), AE does use the specified number for the repartition. That is, each stage will write its map output with 500 partitions. However, in the following stage, AE may launch fewer than 500 tasks, as one task can process multiple contiguous blocks.
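As a rough illustration of that coalescing (a simplified sketch of the idea, not the coordinator's exact algorithm), contiguous pre-shuffle partitions are merged until a target size is reached, and each merged range becomes one reduce task:

    import scala.collection.mutable.ArrayBuffer

    // Sketch: merge contiguous pre-shuffle partitions into post-shuffle partitions.
    // bytesByPartition(i) is the total size of pre-shuffle partition i across all
    // map outputs; the result holds the start index of each post-shuffle partition.
    def estimateStartIndicesSketch(
        bytesByPartition: Array[Long],
        targetSize: Long): Array[Int] = {
      val startIndices = ArrayBuffer(0)
      var currentSize = 0L
      bytesByPartition.zipWithIndex.foreach { case (bytes, i) =>
        // Start a new post-shuffle partition once the current one is big enough.
        if (i > 0 && currentSize + bytes > targetSize) {
          startIndices += i
          currentSize = 0L
        }
        currentSize += bytes
      }
      startIndices.toArray
    }

    // 500 tiny map-output partitions with a 64MB target collapse into a single
    // reduce task:
    // estimateStartIndicesSketch(Array.fill(500)(1024L), 64L * 1024 * 1024) => Array(0)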

@justinuang

@carsonwang we found a bug in production when AQE is turned on:

Here is a case where the ShuffleQueryStageInputs to a Union node will have differing numbers of partitions if we explicitly repartition them.

Here is a repro:

      val sparkSession = SparkSession.builder()
        .master("local[2]")
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        .config("spark.sql.adaptive.enabled", "true")
        .getOrCreate();

      val dataset1 = sparkSession.range(1000);
      val dataset2 = sparkSession.range(1001);

      val compute = dataset1.repartition(505, dataset1.col("id"))
        .union(dataset2.repartition(105, dataset2.col("id")))

      compute.show()
      compute.explain()
== Parsed Logical Plan ==
Union
:- AnalysisBarrier RepartitionByExpression [id#152L], 505
+- AnalysisBarrier RepartitionByExpression [id#155L], 105

== Analyzed Logical Plan ==
id: bigint
Union
:- RepartitionByExpression [id#152L], 505
:  +- Range (0, 1000, step=1, splits=Some(2))
+- RepartitionByExpression [id#155L], 105
   +- Range (0, 1001, step=1, splits=Some(2))

== Optimized Logical Plan ==
Union
:- RepartitionByExpression [id#152L], 505
:  +- Range (0, 1000, step=1, splits=Some(2))
+- RepartitionByExpression [id#155L], 105
   +- Range (0, 1001, step=1, splits=Some(2))

== Physical Plan ==
*Union
:- *Exchange hashpartitioning(id#152L, 505)
:  +- *Range (0, 1000, step=1, splits=2)
+- *Exchange hashpartitioning(id#155L, 105)
   +- *Range (0, 1001, step=1, splits=2)

assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
java.lang.AssertionError: assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.estimatePartitionStartIndices(ExchangeCoordinator.scala:119)
	at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:104)
	at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:138)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3262)
...

My immediate thought was to get rid of the assert and instead skip automatic repartitioning if the numbers of input partitions are different. There might be a better way to fix this, though; I haven't given it much thought yet.
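A minimal sketch of that guard (hypothetical helper, relying on the Spark-internal MapOutputStatistics class; not a patch against this PR): coalesce reducers only when every registered exchange reports the same pre-shuffle partition count, and otherwise leave the partitioning untouched:

    import org.apache.spark.MapOutputStatistics

    // Sketch: replace the assertion with a graceful fallback. If the children
    // (e.g. the two sides of a Union) report different pre-shuffle partition
    // counts, return None and skip reducer coalescing for these exchanges.
    def maybeStartIndices(
        stats: Seq[MapOutputStatistics],
        estimate: Seq[MapOutputStatistics] => Array[Int]): Option[Array[Int]] = {
      val distinctCounts = stats.map(_.bytesByPartitionId.length).distinct
      if (distinctCounts.length == 1) Some(estimate(stats)) // safe to coalesce
      else None                                             // keep the original partitioning
    }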

@justinuang

Ping =) Any thoughts on the post above? Unfortunately this meant that we had to revert AQE in our fork.

@carsonwang
Contributor Author

@justinuang, in theory we can handle Union separately and remove that limitation. But for now I think we can skip changing the reducer number, just as you mentioned.

@justinuang

justinuang commented May 9, 2019

@carsonwang are there plans to continue work on this PR?

@carsonwang
Contributor Author

@justinuang, we are still waiting for some updates from @cloud-fan.

@jerrychenhf

@carsonwang, are we still working on this pull request? @cloud-fan, @justinuang
I saw another pull request, #24706, that @maryannxue is working on. Does that one supersede this one, so that we should work and discuss on #24706 instead?

@gatorsmile
Member

@jerrychenhf #24706 implements a framework for adaptive query execution based on this PR. Please review that PR: #24706

@tgravescs
Contributor

assume this can be closed?

@carsonwang
Contributor Author

yes, closing this in favor of #24706

@carsonwang closed this Jul 9, 2019