[SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering #39431
Conversation
@cloud-fan here is the fix for SPARK-40588 migrated to Spark 3.4. This finally includes unit tests for the actual plan written to files (that has never been tested before).
        |""".stripMargin)
      executeAndCheckOrdering(
-       hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+       hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
do we still need the orderingMatched parameter if it's always true?
Not sure what you mean, as in needed in this test?
I mean, shall we remove orderingMatched from the method executeAndCheckOrdering?
There is no default value for `orderingMatched`, and two other unit tests still use `orderingMatched = enabled`.
      // SPARK-40885: this bug removes the in-partition sort, which manifests here
      case (true, SortExec(Seq(
        SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
do you know why the sorting key is different when planned write is enabled?
That is correctness bug SPARK-40885, discussed in #38356.
@ulysses-you can you take a look at this bug?
All tests green: https://github.com/G-Research/spark/actions/runs/3855300306
Can one of the admins verify this patch?
    // Use the output ordering from the original plan before adding the empty2null projection.
-   val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+   val actualOrdering = writeFilesOpt.map(_.child)
+     .getOrElse(materializeAdaptiveSparkPlan(plan))
Shall we put all code changes inside the branch where `writeFilesOpt` is empty? If `writeFilesOpt` is defined, that means the write has been planned, which does not have this issue.
`.getOrElse` already does what you said, doesn't it?
Yes, `materializeAdaptiveSparkPlan` is applied to `plan` only if `writeFilesOpt` is undefined.
thanks, merging to master!
Thanks! |
What changes were proposed in this pull request?
The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required when planned writing is disabled (`spark.sql.optimizer.plannedWrite.enabled` is `true` by default). With planned writing enabled, `FileFormatWriter` gets the final plan already.

Why are the changes needed?
`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering, in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).

Does this PR introduce any user-facing change?
This fixes SPARK-40588 for 3.4, which was introduced in 3.0. This restores behaviour from Spark 2.4.
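For context, the ordering-compatibility issue described above can be illustrated with a small standalone sketch. This is a hypothetical model, not Spark's actual `SortOrder` classes: a required ordering is satisfied when it is a prefix of the plan's actual output ordering, so sorting by `("a", "b")` already satisfies a requirement of `("a")` and no extra sort should be enforced.

```scala
// Hypothetical standalone model (not Spark classes) of ordering satisfaction:
// a required ordering is satisfied when it is a prefix of the actual one.
case class SortCol(name: String, ascending: Boolean = true)

def orderingSatisfies(actual: Seq[SortCol], required: Seq[SortCol]): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => r == a }

object OrderingDemo extends App {
  val actual   = Seq(SortCol("a"), SortCol("b")) // plan's real output ordering
  val required = Seq(SortCol("a"))               // ordering the writer needs

  // No extra sort is needed: the plan's ordering already covers the requirement.
  assert(orderingSatisfies(actual, required))

  // A wrapper that reports no ordering forces a redundant sort by "a",
  // which on spill destroys the finer ("a", "b") order.
  assert(!orderingSatisfies(Seq.empty, required))
}
```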
How was this patch tested?
The final plan that is written to files is now stored in
FileFormatWriter.executedPlan(similar to existingFileFormatWriter.outputOrderingMatched). Unit tests assert the outermost sort order written to files.The actual plan written into the files changed from (taken from
"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column"):where
FileFormatWriterenforces order withSort [input[2, int, false] ASC NULLS FIRST], false, 0, towhere the sort given by the user is the outermost sort now.