-
Couldn't load subscription status.
- Fork 1.7k
Fix SortPreservingRepartition with no existing ordering. #7811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much @mustafasrepo -- this would have taken me quite a while to figure out.
I reviewed this PR carefully and verified it solved my problem downstream.
| // corresponding SortExecs together. Also, the inputs of these `SortExec`s | ||
| // are not necessarily the same to be able to remove them. | ||
| let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]", | ||
| let expected_input = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff made it hard to see -- but I am pretty sure this test did not change, just the formatting did. Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, just inserted new line
| " SortExec: expr=[a@0 ASC,b@1 ASC]", | ||
| " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", | ||
| ]; | ||
| let expected_optimized = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without the code in this PR, I verified this test failed. The plan showed the same pattern as described in #7794
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: []",
" SortExec: expr=[a@0 ASC,b@1 ASC]",
" SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
|
I don't see any reason to delay merging this fix. Thanks again @mustafasrepo |
| " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", | ||
| " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"]; | ||
| let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", | ||
| let expected_optimized = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to above case
Which issue does this PR close?
Closes #7794.
Rationale for this change
What changes are included in this PR?
In the issue,
EnforceSortingmoves up the sort that satisfies the window. In this case, operators that are below theSortExecdo not have ordering at their inputs. However, some of them (SortPreservingRepartitionExec) expects its input to be ordered. While removingSortExecdown the plan, we also replace operators with their corresponding variants that do not preserve order. With this change I believeSortPreservingRepartitionExecwill be replaced withRepartitionExecin the issue.Are these changes tested?
Yes, a new test is added to replicate problem in the issue.
Are there any user-facing changes?