Skip to content
Merged
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,14 +929,12 @@ fn add_roundrobin_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
let should_preserve_ordering = input.output_ordering().is_some();

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition = RepartitionExec::try_new(input, partitioning)?;
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
as Arc<dyn ExecutionPlan>;
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();

// update distribution onward with new operator
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Ok(new_plan)
} else {
Expand Down Expand Up @@ -999,7 +997,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
let should_preserve_ordering = input.output_ordering().is_some();
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
Expand All @@ -1008,9 +1005,10 @@ fn add_hash_on_top(
input
};
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
new_plan =
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
// preserve any ordering if possible
.with_preserve_order();
new_plan = Arc::new(repartition) as _;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1159,11 +1157,11 @@ fn replace_order_preserving_variants_helper(
if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
if repartition.preserve_order() {
return Ok(Arc::new(
// new RepartitionExec don't preserve order
RepartitionExec::try_new(
updated_children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @ozankabak 's suggestion, I removed the with_preserve_order(false) as well as the boolean argument which was confusing. Now with_preserve_order() is only called when the code is attempting to maintain the order

)?,
));
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,11 @@ fn remove_corresponding_sort_from_sub_plan(
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
Arc::new(
// By default, RepartitionExec does not preserve order
RepartitionExec::try_new(
children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
)?,
)
} else {
plan.clone().with_new_children(children)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ fn get_updated_plan(
// a `SortPreservingRepartitionExec` if appropriate:
if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
let child = plan.children().swap_remove(0);
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?;
plan = Arc::new(repartition.with_preserve_order(true)) as _
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order();
plan = Arc::new(repartition) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionP
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand All @@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
Loading