From 31d954ef255f54bce70ebbed7ca4d1c196916a92 Mon Sep 17 00:00:00 2001 From: Ruchir Khaitan Date: Thu, 20 Nov 2025 13:43:59 -0500 Subject: [PATCH] chore: update Repartition DisplayAs to indicate maintained sort order Previously, it was not obvious reading the plan diagram when a Repartition operator maintained sortedness by virtue of having a single input partition even if preserve_sort order was false. This commit makes the implicit sortedness preservation explicit in the plan diagram. This commit does not change anything for the case when preserve sort order is false, or if the input was not originally sorted. --- datafusion/core/tests/dataframe/mod.rs | 8 +- .../enforce_distribution.rs | 212 +++++++++--------- .../physical_optimizer/enforce_sorting.rs | 28 +-- .../replace_with_order_preserving_variants.rs | 120 +++++----- .../physical_optimizer/sanity_checker.rs | 10 +- datafusion/core/tests/sql/explain_analyze.rs | 4 +- datafusion/core/tests/sql/joins.rs | 8 +- .../physical-plan/src/repartition/mod.rs | 16 +- .../test_files/agg_func_substitute.slt | 6 +- .../sqllogictest/test_files/aggregate.slt | 4 +- .../test_files/filter_without_sort_exec.slt | 12 +- .../sqllogictest/test_files/group_by.slt | 12 +- .../join_disable_repartition_joins.slt | 4 +- datafusion/sqllogictest/test_files/joins.slt | 20 +- datafusion/sqllogictest/test_files/limit.slt | 2 +- .../test_files/monotonic_projection_test.slt | 10 +- datafusion/sqllogictest/test_files/order.slt | 28 +-- .../sqllogictest/test_files/qualify.slt | 2 +- datafusion/sqllogictest/test_files/select.slt | 14 +- .../test_files/sort_merge_join.slt | 2 +- .../test_files/spark/math/csc.slt | 2 +- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/unnest.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 12 +- 24 files changed, 272 insertions(+), 268 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 
e2f3ece1e4ca..b400c442389c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2869,7 +2869,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), - @r###" + @r" +---------------+------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+------------------------------------------------------------------------------------------------------------+ @@ -2889,12 +2889,12 @@ async fn test_count_wildcard_on_sort() -> Result<()> { | | DataSourceExec: partitions=1, partition_sizes=[1] | | | | +---------------+------------------------------------------------------------------------------------------------------------+ - "### + " ); assert_snapshot!( pretty_format_batches(&df_results).unwrap(), - @r###" + @r" +---------------+----------------------------------------------------------------------------+ | plan_type | plan | +---------------+----------------------------------------------------------------------------+ @@ -2910,7 +2910,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> { | | DataSourceExec: partitions=1, partition_sizes=[1] | | | | +---------------+----------------------------------------------------------------------------+ - "### + " ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index aa5a2d053926..a3d9a1e407c7 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1474,19 +1474,19 @@ SortMergeJoin: join_type=..., on=[(a@0, c@2)] JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { // TODO(wiedld): show different test result if enforce distribution first. 
assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(a@0, c@2)] - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(a@0, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin @@ -1500,22 +1500,22 @@ SortMergeJoin: join_type=..., 
on=[(a@0, c@2)] _ => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(a@0, c@2)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(a@0, c@2)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } } @@ -1581,40 +1581,40 @@ SortMergeJoin: join_type=..., on=[(b1@6, c@2)] JoinType::Inner | JoinType::Right => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(b1@6, c@2)] - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs JoinType::Left | JoinType::Full => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(b1@6, c@2)] - RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1 - SortExec: expr=[b1@6 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@6 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // this match arm cannot be reached _ => unreachable!() @@ -1703,27 +1703,27 @@ SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] // Test: result IS DIFFERENT, if EnforceSorting is run first: let plan_sort = test_config.to_plan(join, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] - RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1 - SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - ProjectionExec: expr=[a1@0 as a3, b1@1 as b3] - ProjectionExec: expr=[a1@1 as a1, b1@0 as b1] - AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[] - RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10 - AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1 - SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - ProjectionExec: expr=[a@1 as a2, b@0 as b2] - AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[] - RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10 - AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as 
a], aggr=[] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] + RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + ProjectionExec: expr=[a1@0 as a3, b1@1 as b3] + ProjectionExec: expr=[a1@1 as a1, b1@0 as b1] + AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[] + RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10 + AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + ProjectionExec: expr=[a@1 as a2, b@0 as b2] + AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[] + RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10 + AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); Ok(()) } @@ -1980,12 +1980,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" -SortRequiredExec: [c@2 ASC] - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet -"); + SortRequiredExec: [c@2 ASC] + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); // We can use repartition here, ordering requirement by SortRequiredExec // is still satisfied. let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB); @@ -2175,11 +2175,11 @@ fn repartition_does_not_destroy_sort() -> Result<()> { let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" -SortRequiredExec: [d@3 ASC] - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet -"); + SortRequiredExec: [d@3 ASC] + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet + "); // during repartitioning ordering is preserved let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_distrib, plan_sort); @@ -2740,14 +2740,14 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning assert_plan!(plan_parquet_distrib, @r" -GlobalLimitExec: skip=0, fetch=100 - CoalescePartitionsExec - LocalLimitExec: fetch=100 - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + GlobalLimitExec: skip=0, fetch=100 + CoalescePartitionsExec + LocalLimitExec: fetch=100 + FilterExec: c@2 = 0 + 
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); let plan_parquet_sort = test_config.to_plan(plan_parquet, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_parquet_distrib, plan_parquet_sort); @@ -2758,14 +2758,14 @@ GlobalLimitExec: skip=0, fetch=100 // SortExec doesn't benefit from input partitioning assert_plan!(plan_csv_distrib, @r" -GlobalLimitExec: skip=0, fetch=100 - CoalescePartitionsExec - LocalLimitExec: fetch=100 - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false -"); + GlobalLimitExec: skip=0, fetch=100 + CoalescePartitionsExec + LocalLimitExec: fetch=100 + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + "); let plan_csv_sort = test_config.to_plan(plan_csv, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_csv_distrib, plan_csv_sort); diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index c0cfa46733f1..3bfc919a079c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -1631,13 +1631,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] Optimized Plan: SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] "); @@ -1649,13 +1649,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] Optimized Plan: SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] "); @@ -1674,7 +1674,7 @@ async fn test_with_lost_ordering_bounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false "); @@ -1686,14 +1686,14 @@ async fn test_with_lost_ordering_bounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false Optimized Plan: SortPreservingMergeExec: [a@0 ASC] SortExec: expr=[a@0 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false "); @@ -1715,7 +1715,7 @@ async fn test_do_not_pushdown_through_spm() -> Result<()> { Input / Optimized Plan: SortExec: expr=[b@1 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false "); @@ -1744,13 +1744,13 @@ async fn test_pushdown_through_spm() -> Result<()> { Input Plan: SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false Optimized Plan: SortPreservingMergeExec: [a@0 ASC, b@1 ASC] SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[true] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false "); Ok(()) @@ -1774,7 +1774,7 @@ async fn test_window_multi_layer_requirement() -> Result<()> { BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC, b@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false @@ -1969,7 +1969,7 @@ async fn test_remove_unnecessary_sort2() -> Result<()> { assert_snapshot!(test.run(), @r" Input Plan: RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC] SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
preserve_partitioning=[false] @@ -2016,7 +2016,7 @@ async fn test_remove_unnecessary_sort3() -> Result<()> { AggregateExec: mode=Final, gby=[], aggr=[] SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC] SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [non_nullable_col@1 ASC] SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] @@ -2365,7 +2365,7 @@ async fn test_commutativity() -> Result<()> { assert_snapshot!(displayable(orig_plan.as_ref()).indent(true), @r#" SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] DataSourceExec: partitions=1, partition_sizes=[0] "#); diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 066e52614a12..8fe35fc307df 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -192,7 +192,7 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -202,13 +202,13 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -218,13 +218,13 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 
DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -275,21 +275,21 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] Optimized: SortPreservingMergeExec: [a@0 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] "); }, @@ -300,11 +300,11 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC "); }, @@ -315,21 +315,21 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC Optimized: SortPreservingMergeExec: [a@0 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC "); } @@ -375,14 +375,14 @@ async fn 
test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -393,7 +393,7 @@ async fn test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -404,14 +404,14 @@ async fn test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -460,7 +460,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -468,7 +468,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -480,7 +480,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -492,7 +492,7 @@ async fn 
test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -500,7 +500,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -551,7 +551,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -560,7 +560,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -573,7 +573,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -586,7 +586,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -595,7 +595,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -639,7 +639,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -650,7 +650,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because there is no executor with ordering requirement @@ -662,7 +662,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -712,7 +712,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -721,7 +721,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -734,7 +734,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -747,7 +747,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -756,7 +756,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -804,7 +804,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -814,7 +814,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because ordering requirement of the executor is @@ -826,7 +826,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -870,13 +870,13 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -886,7 +886,7 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -896,13 +896,13 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -956,22 +956,22 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, 
maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [c@1 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -982,11 +982,11 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 
ASC NULLS LAST "); }, @@ -997,22 +997,22 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [c@1 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -1077,11 +1077,11 @@ async fn test_with_multiple_child_trees( HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -1093,11 +1093,11 @@ async fn test_with_multiple_child_trees( HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. 
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 9867ed173341..f46147de1bfd 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -556,10 +556,10 @@ async fn test_sort_merge_join_satisfied() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] " @@ -606,7 +606,7 @@ async fn test_sort_merge_join_order_missing() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 @@ -654,10 +654,10 @@ async fn test_sort_merge_join_dist_missing() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] " diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 7a38934d29a3..4a67046e933d 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -811,7 +811,7 @@ async fn test_physical_plan_display_indent_multi_children() { assert_snapshot!( actual, - @r###" + @r" CoalesceBatchesExec: target_batch_size=4096 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0] CoalesceBatchesExec: target_batch_size=4096 @@ -821,7 +821,7 @@ async fn test_physical_plan_display_indent_multi_children() { RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1 ProjectionExec: expr=[c1@0 as c2] DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true - "### + " ); } diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index b64a1e18f3f6..9b8d1ae75a38 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -73,10 +73,10 @@ async fn join_change_in_planner() -> Result<()> { @r" SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: 
partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] " ); @@ -132,10 +132,10 @@ async fn join_no_order_on_filter() -> Result<()> { @r" SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] " ); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 53cd24185ff8..a98b3d4cc2b5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -739,6 +739,7 @@ impl RepartitionExec { impl DisplayAs for RepartitionExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + let input_partition_count = self.input.output_partitioning().partition_count(); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( @@ -746,11 +747,17 @@ impl DisplayAs for RepartitionExec { "{}: partitioning={}, input_partitions={}", self.name(), self.partitioning(), - self.input.output_partitioning().partition_count() + input_partition_count, )?; if self.preserve_order { write!(f, ", preserve_order=true")?; + } else if 
input_partition_count <= 1 + && self.input.output_ordering().is_some() + { + // Make it explicit that repartition maintains sortedness for a single input partition even + // when `preserve_order` is false + write!(f, ", maintains_sort_order=true")?; } if let Some(sort_exprs) = self.sort_exprs() { @@ -760,9 +767,6 @@ impl DisplayAs for RepartitionExec { } DisplayFormatType::TreeRender => { writeln!(f, "partitioning_scheme={}", self.partitioning(),)?; - - let input_partition_count = - self.input.output_partitioning().partition_count(); let output_partition_count = self.partitioning().partition_count(); let input_to_output_partition_str = format!("{input_partition_count} -> {output_partition_count}"); @@ -2431,7 +2435,7 @@ mod test { // Repartition should not preserve order assert_plan!(&exec, @r" - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC "); @@ -2668,7 +2672,7 @@ mod test { // Repartition should not preserve order assert_plan!(exec.as_ref(), @r" - RepartitionExec: partitioning=RoundRobinBatch(20), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(20), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC "); Ok(()) diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 9aeaaacb1071..088c61047ea8 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -49,7 +49,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1))
ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true @@ -69,7 +69,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT @@ -88,7 +88,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC 
NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index e81bfb72a0ef..83faae0db595 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7170,7 +7170,7 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], file_type=csv, has_header=true # test last to first @@ -7184,7 +7184,7 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], file_type=csv, has_header=true # test building plan with aggreagte sum diff --git 
a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt index be79f1423859..ec069212f558 100644 --- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -39,7 +39,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # constant ticker, CAST(time AS DATE) = time, order by time @@ -55,7 +55,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # same thing but order by date @@ -71,7 +71,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # 
same thing but order by ticker @@ -87,7 +87,7 @@ logical_plan physical_plan 01)CoalescePartitionsExec 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # same thing but order by time, date @@ -103,7 +103,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST, date@0 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # CAST(time AS DATE) <> date (should require a sort) @@ -143,5 +143,5 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--FilterExec: date@0 = 2006-01-02 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index b74815edaa57..ddf081689e9e 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3870,7 
+3870,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], first_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]] -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II rowsort @@ -3982,7 +3982,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true # drop table multiple_ordered_table_with_pk @@ -4023,7 +4023,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true statement ok @@ -4297,7 +4297,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=2 05)--------RepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0], 8), input_partitions=8, preserve_order=true, sort_exprs=date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 DESC 06)----------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 08)--------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] query P @@ -4352,7 +4352,7 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=2 06)----------RepartitionExec: partitioning=Hash([date_part(Utf8("MONTH"),csv_with_timestamps.ts)@0], 8), input_partitions=8 07)------------AggregateExec: mode=Partial, gby=[date_part(MONTH, ts@0) as date_part(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 DESC], file_type=csv, 
has_header=false query I @@ -4392,7 +4392,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [name@0 DESC, time_chunks@1 DESC], fetch=5 02)--ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks] -03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC] statement ok diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index e78169734fe5..65e8d3189f00 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -58,7 +58,7 @@ physical_plan 02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5 03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], file_type=csv, has_header=true -05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true # preserve_inner_join @@ -100,7 +100,7 @@ physical_plan 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1] 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, 
has_header=true 06)--------FilterExec: d@3 = 3 -07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true # preserve_right_semi_join diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 4a243258a519..f217ba1bd5a0 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3189,12 +3189,12 @@ physical_plan 01)SortPreservingMergeExec: [rn1@5 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 08)----CoalesceBatchesExec: target_batch_size=2 -09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +09)------RepartitionExec: partitioning=Hash([a@1], 2), 
input_partitions=1, maintains_sort_order=true 10)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # sort merge join should propagate ordering equivalence of the right side @@ -3221,10 +3221,10 @@ physical_plan 01)SortPreservingMergeExec: [rn1@10 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Right, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 06)----CoalesceBatchesExec: target_batch_size=2 -07)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +07)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 08)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 09)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true @@ -3260,12 +3260,12 @@ physical_plan 01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 
ASC NULLS LAST] 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 08)----CoalesceBatchesExec: target_batch_size=2 -09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 10)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 11)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 12)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true @@ -3463,10 +3463,10 @@ physical_plan 07)------------CoalesceBatchesExec: target_batch_size=2 
08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] 09)----------------CoalesceBatchesExec: target_batch_size=2 -10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1 +10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true 11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true 12)----------------CoalesceBatchesExec: target_batch_size=2 -13)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1 +13)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true 14)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -3483,7 +3483,7 @@ logical_plan physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=a@1 < a@0 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true -03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # Currently datafusion can pushdown filter conditions with scalar UDF into @@ -3504,7 +3504,7 @@ physical_plan 02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as 
c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_1] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 04)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] -05)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true #### diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 48781e46f11c..a2b6eae7d966 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -664,7 +664,7 @@ physical_plan 03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] 04)------UnionExec 05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true 08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] 09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 9c806cfa0d8a..d94d48d45af9 100644 --- 
a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -46,7 +46,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a_big@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -62,7 +62,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # Cast to larger types as well as preserving ordering @@ -83,7 +83,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a_big@1 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # test for common rename @@ -135,7 
+135,7 @@ physical_plan 01)SortPreservingMergeExec: [a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--SortExec: expr=[a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(a@0 AS Utf8View) as a_str, b@1 as b] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # We cannot determine a+b is ordered from the @@ -170,5 +170,5 @@ physical_plan 01)SortPreservingMergeExec: [sum_expr@0 ASC NULLS LAST] 02)--SortExec: expr=[sum_expr@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(a@0 + b@1 AS Int64) as sum_expr, a@0 as a, b@1 as b] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index b1d02f6dc16e..8f0cb5e53d76 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -561,7 +561,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [result@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[b@1 + a@0 + c@2 as result] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST], [b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true statement ok @@ -592,7 +592,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false query TT @@ -607,7 +607,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false statement ok @@ -650,7 +650,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[atan(c11@0) as atan_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -665,7 +665,7 @@ logical_plan physical_plan 
01)SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[ceil(c11@0) as ceil_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -680,7 +680,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[log(c12@1, c11@0) as log_c11_base_c12] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true query TT @@ -695,7 +695,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST] 02)--ProjectionExec: expr=[log(c11@0, c12@1) as log_c12_base_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true statement ok @@ -1143,7 +1143,7 @@ physical_plan 01)SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(c@0 AS Utf8View) as c_str] -04)------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true @@ -1173,7 +1173,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok @@ -1209,7 +1209,7 @@ physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[abs(c@0) as abs_c] -04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok @@ -1243,7 +1243,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[abs(c@0) as abs_c] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, 
projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true # Boolean to integer casts preserve the order. @@ -1269,7 +1269,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST], [desc_col@1 DESC]], file_type=csv, has_header=true # Union a query with the actual data and one with a constant @@ -1292,7 +1292,7 @@ logical_plan 03)----TableScan: ordered_table projection=[a, b] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true @@ -1331,7 +1331,7 @@ logical_plan 03)----TableScan: ordered_table projection=[a, b] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/qualify.slt 
b/datafusion/sqllogictest/test_files/qualify.slt index d4de5f9a9a61..8ced344e0f2a 100644 --- a/datafusion/sqllogictest/test_files/qualify.slt +++ b/datafusion/sqllogictest/test_files/qualify.slt @@ -351,7 +351,7 @@ physical_plan 02)--SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[dept@0 as dept, sum(users.salary)@1 as s] 04)------FilterExec: rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 = 1, projection=[dept@0, sum(users.salary)@1] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)----------BoundedWindowAggExec: wdw=[rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 07)------------SortPreservingMergeExec: [sum(users.salary)@1 DESC] 08)--------------SortExec: expr=[sum(users.salary)@1 DESC], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 21878d36db02..30dbcc978c9b 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1404,7 +1404,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], 
output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # since query below doesn't computation @@ -1441,7 +1441,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1461,7 +1461,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1481,7 +1481,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1501,7 +1501,7 @@ logical_plan physical_plan 
01)SortPreservingMergeExec: [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1522,7 +1522,7 @@ physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--SortExec: expr=[c@3 ASC NULLS LAST], preserve_partitioning=[true] 03)----FilterExec: a@1 = 0 OR b@2 = 0 -04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # When ordering lost during projection, we shouldn't keep the SortExec. 
@@ -1569,7 +1569,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [CAST(round(CAST(b@2 AS Float64)) AS Int32) ASC NULLS LAST] 02)--FilterExec: CAST(round(CAST(b@2 AS Float64)) AS Int32) = a@1 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index aa87026c5cf3..5f9276bdb78e 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -939,4 +939,4 @@ SELECT t2.a, t2.b, t2.c FROM t2 WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) ---- -4 101 1001 \ No newline at end of file +4 101 1001 diff --git a/datafusion/sqllogictest/test_files/spark/math/csc.slt b/datafusion/sqllogictest/test_files/spark/math/csc.slt index 5eb9f4447280..837704113da4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/csc.slt +++ b/datafusion/sqllogictest/test_files/spark/math/csc.slt @@ -43,4 +43,4 @@ SELECT csc(a) FROM (VALUES (pi()), (-pi()), (pi()/2) , (arrow_cast('NAN','Float3 8165619676597685 -8165619676597685 1 -NaN \ No newline at end of file +NaN diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a08cc17d417..6c4d7be5ab8a 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -383,7 +383,7 @@ physical_plan 02)--SortExec: TopK(fetch=3), expr=[number_plus@0 DESC, number@1 DESC, age@3 ASC NULLS LAST], preserve_partitioning=[true], sort_prefix=[number_plus@0 DESC, number@1 DESC] 
03)----ProjectionExec: expr=[__common_expr_1@0 as number_plus, number@1 as number, __common_expr_1@0 as other_number_plus, age@2 as age] 04)------ProjectionExec: expr=[CAST(number@0 AS Int64) + 1 as __common_expr_1, number@0 as number, age@1 as age] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, age], output_ordering=[number@0 DESC], file_type=parquet # Cleanup diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index e650116a69b5..c4319c665bd0 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -968,7 +968,7 @@ physical_plan 07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar] 08)--------------UnnestExec 09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))] -10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt index 2e228472d68c..b7ef74e6c167 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1927,7 +1927,7 @@ physical_plan 03)----ProjectionExec: expr=[c3@0 as c3, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2] 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 05)--------CoalesceBatchesExec: target_batch_size=4096 -06)----------RepartitionExec: partitioning=Hash([c3@0], 2), input_partitions=1 +06)----------RepartitionExec: partitioning=Hash([c3@0], 2), input_partitions=1, maintains_sort_order=true 07)------------ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 08)--------------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC 
NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 09)----------------SortExec: expr=[c3@1 DESC, c9@2 DESC, c2@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -2144,7 +2144,7 @@ physical_plan 02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(aggregate_test_100.c13)] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[], aggr=[array_agg(aggregate_test_100.c13)] -05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 06)----------SortExec: TopK(fetch=1), expr=[c13@0 ASC NULLS LAST], preserve_partitioning=[false] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], file_type=csv, has_header=true @@ -3371,7 +3371,7 @@ physical_plan 13)------------------------CoalesceBatchesExec: target_batch_size=4096 14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST 15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d] -16)------------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +16)------------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 17)--------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] # reset the partition number 1 again @@ -3716,7 +3716,7 @@ physical_plan 02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW@5 as avg_d] 03)----BoundedWindowAggExec: wdw=[avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Field { "avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW": nullable Float64 }, frame: RANGE BETWEEN 10 PRECEDING AND CURRENT ROW], mode=[Linear] 04)------CoalesceBatchesExec: target_batch_size=4096 -05)--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=1, maintains_sort_order=true 06)----------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]] # CTAS with NTILE function @@ -5576,7 +5576,7 @@ physical_plan 02)--ProjectionExec: expr=[c1@0 as c1, sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as sum_c9] 03)----WindowAggExec: wdw=[sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 04)------CoalesceBatchesExec: target_batch_size=1 -05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -5606,7 +5606,7 @@ physical_plan 02)--ProjectionExec: expr=[c1@0 as c1, min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as min_c5] 03)----WindowAggExec: wdw=[min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 04)------CoalesceBatchesExec: target_batch_size=1 -05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c5], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true query TT