Fix: Do not try and preserve order when there is no order to preserve in RepartitionExec #8127
}

#[tokio::test]
async fn test_repartition_with_unecessary_preserve_order() -> Result<()> {
I am not super happy with the test, as it isn't a test of the optimizer, but the only time this can cause problems is when RepartitionExec::with_new_children is called, which I found really hard to trigger with the optimizer tests.
Without the code in this PR, this test fails like:
expected:
[
"SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
" MemoryExec: partitions=1, partition_sizes=[0]",
" MemoryExec: partitions=1, partition_sizes=[0]",
]
actual:
[
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
" MemoryExec: partitions=1, partition_sizes=[0]",
" MemoryExec: partitions=1, partition_sizes=[0]",
]
Is it possible to move this test to the same file with struct RepartitionExec as a unit test?
There are two call sites of with_preserve_order in enforce_distribution.rs that implement the same logic but pass the is_some term from the outside. I think we should simplify those call sites to with_preserve_order(true) in this PR.
I also see one call to with_preserve_order(false) in enforce_distribution.rs, which is unnecessary and confusing, as well as one such call in replace_with_order_preserving_variants.rs. Let's remove them.
Is it possible to move this test to the same file with struct RepartitionExec as a unit test?
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Yes, that is a good idea -- I did so in ee55ec3.
That is an excellent idea, I did so in 0c492e1 and I think the logic is much clearer now.
LGTM, thank you
    updated_children.swap_remove(0),
    repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Per @ozankabak's suggestion, I removed the with_preserve_order(false) call as well as the boolean argument, which was confusing. Now with_preserve_order() is only called when the code is attempting to maintain the order.
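To illustrate the idea discussed above, here is a minimal sketch of a builder-style with_preserve_order() that takes no boolean and only sets the flag when the input actually has an ordering. The ChildPlan and Repartition types, the output_ordering field, and the name() helper are hypothetical stand-ins written for this example; they are not the DataFusion definitions, and the real change may be structured differently.

```rust
// Simplified stand-in types for illustration only; these are NOT the
// real DataFusion definitions.

/// A child plan that may or may not report an output ordering.
#[derive(Debug, Clone)]
struct ChildPlan {
    /// Hypothetical: names of the sort columns, or `None` if unordered.
    output_ordering: Option<Vec<String>>,
}

/// Toy model of a repartition operator.
#[derive(Debug, Clone)]
struct Repartition {
    input: ChildPlan,
    preserve_order: bool,
}

impl Repartition {
    fn new(input: ChildPlan) -> Self {
        Self { input, preserve_order: false }
    }

    /// Request order preservation. The flag is only set when the input
    /// actually has an ordering, so callers do not pass a boolean.
    fn with_preserve_order(mut self) -> Self {
        self.preserve_order = self.input.output_ordering.is_some();
        self
    }

    /// Rebuild the operator around a new child, re-checking the child's
    /// ordering instead of copying the old flag blindly.
    fn with_new_child(self, child: ChildPlan) -> Self {
        let wanted = self.preserve_order;
        let rebuilt = Repartition::new(child);
        if wanted { rebuilt.with_preserve_order() } else { rebuilt }
    }

    /// Display name, mirroring the two names seen in the plan output.
    fn name(&self) -> &'static str {
        if self.preserve_order {
            "SortPreservingRepartitionExec"
        } else {
            "RepartitionExec"
        }
    }
}

fn main() {
    let sorted = ChildPlan { output_ordering: Some(vec!["a".to_string()]) };
    let unsorted = ChildPlan { output_ordering: None };

    let plan = Repartition::new(sorted).with_preserve_order();
    assert_eq!(plan.name(), "SortPreservingRepartitionExec");

    // Swapping in an unordered child drops the flag.
    let plan = plan.with_new_child(unsorted);
    assert_eq!(plan.name(), "RepartitionExec");
}
```

Putting the check inside with_preserve_order() means a call site cannot end up with the flag set on an unordered input, which is the situation this PR is meant to rule out.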
Thanks @alamb for this PR. Having this check at initialization is great, and doesn't lead to unexpected behavior.
Which issue does this PR close?
Closes #8043
Rationale for this change
We encountered a problem in our downstream tests: RepartitionExec was trying to preserve an order even when there was no order to preserve.
This was because one codepath (RepartitionExec::with_new_children) set the preserve_order flag to true when it created a new RepartitionExec, even if the new child had no order to preserve. During execution, this caused the code to try to merge a stream with no sort exprs, which causes an internal error.
What changes are included in this PR?
Are these changes tested?
Yes, though I am not thrilled with the test (I will comment inline)
Are there any user-facing changes?