[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter #34568
Conversation
This change is because `maxConcurrentOutputFileWriters` needs to create a sorter in `FileFormatWriter`.
cc @MaxGekk @cloud-fan if you have time to take a look
Thanks @ulysses-you for improving on this! I have some questions.
Could you help create a JIRA here? Thanks, cc @wangyum.
Created SPARK-37333.
I don't think it's safe to check output ordering inside the logical plan. The output ordering may change quite a lot during physical planning (e.g. a shuffle added for a join/aggregate can destroy output ordering). Ideally we should rely on the physical plan rule `EnsureRequirements` to add the proper sort.
I am wondering how hard it would be to make the code rely on `SparkPlan.requiredChildOrdering` for the DSv1 write code path?
Actually, after taking a deeper look, `LogicalPlan.outputOrdering` was introduced to eliminate unnecessary sorts in logical planning (#20560), and only several operators preserve ordering (like `Filter` and `Project`), so it won't cause a correctness issue here.
But the problem with `LogicalPlan.outputOrdering` is that it is too conservative. We may add an unnecessary sort here for complex queries (e.g. a query with a sort merge join on partition columns before writing to a table with dynamic partitions).
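For reference, a minimal sketch of what such a logical-side ordering check can look like, assuming a simplified `Seq[SortOrder]` requirement (the PR itself compares bare expressions, so this is illustrative only, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

// Add a local Sort only when the child's known logical ordering does not
// already satisfy the required one. This is conservative: outputOrdering is
// Nil for most operators, so it can add a sort that is not strictly needed.
def withRequiredOrdering(query: LogicalPlan, required: Seq[SortOrder]): LogicalPlan =
  if (required.isEmpty || SortOrder.orderingSatisfies(query.outputOrdering, required)) {
    query
  } else {
    Sort(required, global = false, query)
  }
```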
Thank you @c21 for pointing this out; I see your concern. The reasons I used `LogicalPlan.outputOrdering` are:
- Adding the sort on the logical side has benefits if the plan already contains a sort, e.g. `InsertIntoTable (partition)` over `Sort (not dynamic columns)`. We can eliminate the user-specified sort using `EliminateSorts` in the Optimizer (see the example below). But if we add the sort in the physical plan, we will do the sort twice even though the first sort has no effect.
- For now, I prefer to keep the same approach as `V2Writes`, which also adds the required ordering (and even distribution) on the logical side. We can optimize them together if we find a better approach in the future.
- I think it's safe, with no perf regression, to add a sort on the logical side. Since we have `RemoveRedundantSorts` on the physical side, that rule can remove the sort we added if it's unnecessary (e.g. sort + sort merge join with dynamic partitions).
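A hypothetical example of the first point (table and column names made up for illustration):

```scala
// The user's orderBy is not on the dynamic partition column, so it does not
// help the written layout at all. With the required Sort added on the logical
// side, the optimizer (EliminateSorts) can drop the user's now-redundant sort;
// if the Sort were only added physically, both sorts would execute.
spark.table("src")
  .orderBy("value")
  .write
  .partitionBy("part")
  .mode("overwrite")
  .saveAsTable("target")
```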
@ulysses-you - ah yes, we also have `RemoveRedundantSorts` in physical planning, so I think we are good here. Thanks for the explanation!
retest this please
It is slightly confusing that the V*Writes rules are here. Look, earlyScanPushDownRules is about:
" ... projection and filter pushdown to scans"
but V1Writes:
"... makes sure the v1 write requirement, e.g. requiredOrdering"
which is something like the opposite thing: pulling out instead of pushing down.
Here is some history of why `V2Writes` is in earlyScanPushDownRules: #30806 (comment).
I agree the name doesn't match; do you have a better place for it?
Please fix the indentation to match the original code.
fixed
Thanks @ulysses-you for the work! I have some comments. @cloud-fan and @wangyum, could you guys help take a look when you have time? Thanks.
```scala
val enableRadixSort = sparkSession.sessionState.conf.enableRadixSort
val outputSchema = empty2NullPlan.schema
Some(ConcurrentOutputWriterSpec(maxWriters,
  () => SortExec.createSorter(
```
I feel this refactoring (`SortExec.createSorter`) is not really necessary. Why can't we create a `SortExec` operator and call `createSorter()` as before? What's the advantage of the current code compared to before?
Looking at the previous code, we create and evaluate a `SortExec` mainly for the ordering of the dynamic partitions. For the concurrent writers, we only need the sorter, so after we pull out the sort, creating a new `SortExec` seems like overkill.
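A rough before/after sketch of what is being discussed (simplified; `plan`, `requiredOrdering`, and `maxWriters` are assumed to be in scope, as in the surrounding writer code):

```scala
// Before: FileFormatWriter built a throwaway SortExec both to execute the
// dynamic-partition sort and to borrow its sorter for concurrent writers.
val sortPlan = SortExec(requiredOrdering, global = false, child = plan)
val specBefore = ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

// After: the sort already lives in the plan, so only a sorter factory is
// needed, hence the static SortExec.createSorter in the quoted diff above.
```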
```scala
import org.apache.spark.sql.internal.SQLConf

/**
 * V1 write includes both datasoruce and hive, that requires a specific ordering of data.
```
nit: datasoruce -> datasource v1
```scala
trait V1WritesHelper {

  def getBucketSpec(
```
nit: how about naming it as getWriterBucketSpec? BucketSpec is another class in Spark, which is different from WriterBucketSpec. Also bucketSpec is a parameter, so getWriterBucketSpec looks less confusing.
Makes sense!
```scala
  }
}

trait V1WritesHelper {
```
After looking through the subclasses of this one, I found that this class is meant to be a utility class, not an interface to implement. Shall we change this to `object V1WritesUtils`?
The idea of `V1WritesHelper` comes from `AdaptiveSparkPlanHelper`, which also contains some util methods. And there are many places in sql that use a helper even when they are not stateful. Personally, I don't have a strong opinion between utility and helper.
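An illustrative contrast of the two styles under discussion (names invented for the example, not the PR's code):

```scala
// Style 1: a mix-in trait; callers inherit the stateless methods.
trait ColumnsHelper {
  def normalize(cols: Seq[String]): Seq[String] = cols.map(_.toLowerCase)
}
class HelperStyleCommand extends ColumnsHelper {
  def run(): Seq[String] = normalize(Seq("A", "B"))
}

// Style 2: a utility object; callers reference it explicitly, no inheritance.
object ColumnsUtils {
  def normalize(cols: Seq[String]): Seq[String] = cols.map(_.toLowerCase)
}
class UtilsStyleCommand {
  def run(): Seq[String] = ColumnsUtils.normalize(Seq("A", "B"))
}
```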
```scala
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.hive.client.HiveClientImpl

trait V1HiveWritesHelper {
```
This seems to be a utility class as well; how about `object V1WritesForHiveUtils`?
```scala
import org.apache.spark.sql.hive.client.HiveClientImpl

trait V1HiveWritesHelper {
  def options(bucketSpec: Option[BucketSpec]): Map[String, String] = {
```
nit: we can make the function name more verbose, e.g. getOptionsWithHiveBucketWrite
```scala
 *
 * TODO(SPARK-37333): Specify the required distribution at V1Write
 */
trait V1Write extends DataWritingCommand with V1WritesHelper {
```
`V1Write` extending an interface called `V1WritesHelper` looks a bit weird. I think `V1WritesHelper` is just a utility class, so we don't need to extend it here (per https://github.com/apache/spark/pull/34568/files#r807523085).
```scala
val requiredOrdering =
  partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
// the sort order doesn't matter
val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
```
There is an issue here, since we have AQE: the plan is an `AdaptiveSparkPlanExec`, which has no `outputOrdering`. For dynamic partition writes, the code will always add an extra sort.
This PR can resolve that issue as well. @cloud-fan @c21
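Continuing the quoted snippet above, the matching logic that goes wrong under AQE looks roughly like this (simplified sketch, not the exact source):

```scala
// Under AQE the child is an AdaptiveSparkPlanExec, a leaf node whose
// outputOrdering is Nil, so actualOrdering is empty, the check below never
// matches, and an extra sort is always added for dynamic partition writes.
val orderingMatched = requiredOrdering.length <= actualOrdering.length &&
  requiredOrdering.zip(actualOrdering).forall {
    case (required, actual) => required.semanticEquals(actual)
  }
```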
…leFormatWriter

### What changes were proposed in this pull request?
`FileFormatWriter.write` is used by all V1 write commands, including data source and hive tables. Depending on the dynamic partition, bucket, and sort columns in the V1 write command, `FileFormatWriter` can add a physical sort on top of the query plan which is not visible from the plan directly. This PR (based on #34568) intends to pull the physical sort added by `FileFormatWriter` out into logical planning. It adds a new logical rule `V1Writes` to add logical Sort operators based on the required ordering of a V1 write command. This behavior can be controlled by the new config **spark.sql.optimizer.plannedWrite.enabled** (default: true).

### Why are the changes needed?
Improve observability of V1 writes, and unify the logic of V1 and V2 write commands.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New unit tests.

Closes #37099 from allisonwang-db/spark-37287-v1-writes.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
- A new trait `V1Write` holds the sort info of a v1 write, e.g., partition columns and bucket spec.
- All v1 write commands, including both datasource and hive, extend `V1Write`.
- A new rule `V1Writes` decides whether to add a `Sort` operator based on the command's `V1Write.requiredOrdering` (a sketch of the rule's shape follows below). This rule should be similar to `V2Writes`.
- Remove the `SortExec` in `FileFormatWriter.write`.
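A minimal sketch of the rule's shape, as described in the list above; the real trait carries more state (bucket spec, options, etc.), and `requiredOrdering` is assumed here to be `Seq[SortOrder]`:

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

object V1Writes extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    // Only touch writes whose query does not already produce the ordering.
    case write: V1Write
        if !SortOrder.orderingSatisfies(write.query.outputOrdering, write.requiredOrdering) =>
      val sorted = Sort(write.requiredOrdering, global = false, write.query)
      write.withNewChildren(sorted :: Nil)
  }
}
```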
### Why are the changes needed?
`FileFormatWriter.write` is now used by every V1 write, which includes datasource and hive tables. However, it contains a sort based on the dynamic partition and bucket columns that cannot be seen in the plan directly.
V2 writes have a better approach: they satisfy the required ordering (or even distribution) through the rule `V2Writes`.
V1 writes should do the same thing as V2 writes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This is a code refactor, so it should pass the existing CI.