[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter #37099
Conversation
(force-pushed from dca128e to 13042a7)
Thank you @allisonwang-db for picking this up! I know the PR is still a draft, but I'll leave some early questions/comments. cc @ulysses-you and @cloud-fan as well.
nit: 3.4.0?
nit: the config name feels a little obscure. Could it be spark.sql.requireOrderingForV1Writers or something similar?
Indeed the name is not very descriptive. Planned write here means we want to explicitly plan file writes instead of adding various operations when executing the write. It could include things other than required ordering in the future. I am happy to brainstorm more here.
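For illustration, here is a minimal spark-shell sketch of what "planned write" means in practice (the config name comes from this PR; the table and column names are made up):

```scala
// With planned writes enabled, the sort required by the dynamic partition
// column is added by the optimizer (the new V1Writes rule) and is visible
// in EXPLAIN output, instead of being added silently inside
// FileFormatWriter at execution time.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")

spark.sql("CREATE TABLE t (id BIGINT) USING parquet PARTITIONED BY (p INT)")
spark.sql(
  "EXPLAIN FORMATTED INSERT OVERWRITE TABLE t " +
  "SELECT id, CAST(id % 10 AS INT) AS p FROM range(100)"
).show(truncate = false)
// Expect a Sort on the partition column `p` in the planned query.
```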
`Spark` -> `Spark optimizer` would make it clearer that the sort is added during query planning.
Sorry, why do we need the comment here?
I've removed the confusing comment. It meant that we don't need other option values, like the table path, when getting the writer bucket spec.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala (lines 176 to 180 in 3331d4c)

```scala
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
  Some(sessionState.catalog.defaultTablePath(table.identifier))
} else {
  table.storage.locationUri
}
```
Is it just for debugging?
This is used in unit tests to check that when v1 writes are enabled, a logical sort has already been added, so no physical sort needs to be added (the ordering should match).
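A rough sketch of that kind of check, assuming Spark's SQL test harness (withSQLConf/withTable/sql) and the test-only FileFormatWriter.outputOrderingMatched flag this thread is discussing; the table name is made up:

```scala
import org.apache.spark.sql.execution.datasources.FileFormatWriter

withSQLConf("spark.sql.optimizer.plannedWrite.enabled" -> "true") {
  withTable("t") {
    sql("CREATE TABLE t (id BIGINT) USING parquet PARTITIONED BY (p INT)")
    sql("INSERT INTO t SELECT id, CAST(id % 10 AS INT) FROM range(100)")
    // V1Writes already added a logical Sort, so the incoming ordering
    // matched the required ordering and FileFormatWriter added no sort.
    assert(FileFormatWriter.outputOrderingMatched)
  }
}
```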
Just brainstorming here: if we plan to add a requirement for partitioning, e.g. to support a shuffle before writing a bucketed table, do we want to add something similar to v2's RequiresDistributionAndOrdering now or not?
I think we can add one more method: requiredPartitioning
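Sketching the shape that could take (requiredPartitioning is hypothetical here; requiredOrdering mirrors what V1 write commands already expose):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}

// Hypothetical extension of the V1 write interface.
trait V1WriteRequirements {
  // Ordering the input rows must satisfy
  // (dynamic partition columns, bucket id, sort columns).
  def requiredOrdering: Seq[SortOrder]

  // Hypothetical: expressions the input must be shuffled by, e.g. bucket
  // columns when writing a Hive bucketed table.
  def requiredPartitioning: Seq[Expression] = Nil
}
```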
(force-pushed from 7507ae6 to 44662ed)
Thanks @cloud-fan and @allisonwang-db for pushing on this. This is great! I will work on supporting required partitioning for V1 this week (https://issues.apache.org/jira/browse/SPARK-37287). The motivation is to support shuffling on bucket columns when writing Hive bucketed tables. cc @cloud-fan, @allisonwang-db and @ulysses-you FYI.
So there was an optimization in #32198 which can avoid the local sort if there is only a small set of partition/bucket values.
That optimization is off by default. When it's turned on, we skip planned write.
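For reference, that optimization is gated by spark.sql.maxConcurrentOutputFileWriters (0, i.e. disabled, by default); a sketch of enabling it:

```scala
// Allow up to 16 concurrent output file writers so the write can keep
// multiple partition/bucket files open at once instead of sorting the
// input locally first. The default of 0 disables the optimization.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "16")
// Note: when this path is taken, the planned-write sort is skipped.
```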
Hi, @allisonwang-db and @cloud-fan
There is a correctness issue reported for this configuration, SPARK-44512, affecting Apache Spark 3.4.0+. Could you take a look at it?
Sorry all. After checking the reported use case once more, I found that it's a false alarm. I closed the issue as …
…rom `DataSource`

### What changes were proposed in this pull request?
`resolvePartitionColumns` was introduced by SPARK-37287 (#37099) and became unused after SPARK-41713 (#39220), so this PR removes it from `DataSource`.

### Why are the changes needed?
Clean up unused code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43779 from LuciferYang/SPARK-45902.

Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
`FileFormatWriter.write` is used by all V1 write commands, including data source and Hive tables. Depending on the dynamic partition, bucket, and sort columns in the V1 write command, `FileFormatWriter` can add a physical sort on top of the query plan, which is not directly visible from the plan.

This PR (based on #34568) intends to pull the physical sort added by `FileFormatWriter` out into logical planning. It adds a new logical rule `V1Writes` that adds logical Sort operators based on the required ordering of a V1 write command. This behavior can be controlled by the new config `spark.sql.optimizer.plannedWrite.enabled` (default: true).
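Conceptually, the rule does something like this simplified sketch (not the exact code in this PR; the helper name is made up):

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

// Simplified sketch: make the ordering a V1 write requires (dynamic
// partition columns ++ bucket id expression ++ sort columns) explicit
// as a per-partition logical Sort on top of the query.
def withRequiredOrdering(
    query: LogicalPlan,
    requiredOrdering: Seq[SortOrder]): LogicalPlan = {
  if (requiredOrdering.isEmpty) query
  else Sort(requiredOrdering, global = false, query)
}
```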
Why are the changes needed?
Improve the observability of V1 writes, and unify the logic of V1 and V2 write commands.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests.