diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 12f27ab18fbd..4aedc3b0d1a9 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -3794,7 +3794,11 @@ pub(crate) mod tests { sort_key, projection_exec_with_alias( filter_exec(parquet_exec()), - vec![("a".to_string(), "a".to_string())], + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ("c".to_string(), "c".to_string()), + ], ), false, ); @@ -3803,7 +3807,7 @@ pub(crate) mod tests { "SortPreservingMergeExec: [c@2 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) "SortExec: expr=[c@2 ASC]", - "ProjectionExec: expr=[a@0 as a]", + "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "FilterExec: c@2 = 0", // repartition is lowest down "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", @@ -3815,7 +3819,7 @@ pub(crate) mod tests { let expected_first_sort_enforcement = &[ "SortExec: expr=[c@2 ASC]", "CoalescePartitionsExec", - "ProjectionExec: expr=[a@0 as a]", + "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "FilterExec: c@2 = 0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 58806be6d411..7f8c9b852cb1 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -359,11 +359,11 @@ mod tests { let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -379,38 +379,41 @@ mod tests { let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let sort = sort_exec( - vec![sort_expr_default("a", &schema)], + vec![sort_expr_default("a", &coalesce_partitions.schema())], coalesce_partitions, false, ); let repartition_rr2 = repartition_exec_round_robin(sort); let repartition_hash2 = repartition_exec_hash(repartition_rr2); - let filter = filter_exec(repartition_hash2, &schema); - let sort2 = sort_exec(vec![sort_expr_default("a", &schema)], filter, true); + let filter = filter_exec(repartition_hash2); + let sort2 = + sort_exec(vec![sort_expr_default("a", &filter.schema())], filter, true); - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], sort2); + let physical_plan = sort_preserving_merge_exec( + vec![sort_expr_default("a", &sort2.schema())], + sort2, + ); let expected_input = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " SortExec: expr=[a@0 ASC]", " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@2 > 3", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC", + " FilterExec: c@1 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " SortPreservingMergeExec: [a@0 ASC]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true", ]; @@ -424,7 +427,7 @@ mod tests { let sort_exprs = vec![sort_expr("a", &schema)]; let source = csv_exec_sorted(&schema, sort_exprs, true); let repartition_rr = repartition_exec_round_robin(source); - let filter = filter_exec(repartition_rr, &schema); + let filter = filter_exec(repartition_rr); let repartition_hash = repartition_exec_hash(filter); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash, true); @@ -433,14 +436,14 @@ mod tests { let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", - " FilterExec: c@2 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", + " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@2 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", @@ -456,7 +459,7 @@ mod tests { let source = csv_exec_sorted(&schema, sort_exprs, true); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); - let filter = filter_exec(repartition_hash, &schema); + let filter = filter_exec(repartition_hash); let coalesce_batches_exec: Arc = coalesce_batches_exec(filter); let sort = sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec, true); @@ -466,14 +469,14 @@ mod tests { let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -488,7 +491,7 @@ mod tests { let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1); - let filter = filter_exec(repartition_hash, &schema); + let filter = filter_exec(repartition_hash); let coalesce_batches_exec_2 = coalesce_batches_exec(filter); let sort = sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2, true); @@ -499,15 +502,15 @@ mod tests { let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; @@ -522,7 +525,7 @@ mod tests { let source = csv_exec_sorted(&schema, sort_exprs, true); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); - let filter = filter_exec(repartition_hash, &schema); + let filter = filter_exec(repartition_hash); let coalesce_batches_exec: Arc = coalesce_batches_exec(filter); let physical_plan: Arc = @@ -530,14 +533,14 @@ mod tests { let expected_input = ["CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -551,7 +554,7 @@ mod tests { let source = csv_exec_sorted(&schema, sort_exprs, true); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); - let filter = filter_exec(repartition_hash, &schema); + let filter = filter_exec(repartition_hash); let coalesce_batches = coalesce_batches_exec(filter); let repartition_hash_2 = repartition_exec_hash(coalesce_batches); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash_2, true); @@ -562,19 +565,19 @@ mod tests { let expected_input = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true" ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@2 > 3", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " FilterExec: c@1 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; @@ -590,22 +593,24 @@ mod tests { let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let sort = sort_exec( - vec![sort_expr_default("c", &schema)], + vec![sort_expr_default("c", &repartition_hash.schema())], repartition_hash, true, ); - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], sort); + let physical_plan = sort_preserving_merge_exec( + vec![sort_expr_default("c", &sort.schema())], + sort, + ); - let expected_input = ["SortPreservingMergeExec: [c@2 ASC]", - " SortExec: expr=[c@2 ASC]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + let expected_input = ["SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; - let expected_optimized = ["SortPreservingMergeExec: [c@2 ASC]", - " SortExec: expr=[c@2 ASC]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + let expected_optimized = ["SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -625,11 +630,11 @@ mod tests { let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -645,39 +650,42 @@ mod tests { let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let sort = sort_exec( - vec![sort_expr_default("c", &schema)], + vec![sort_expr_default("c", &coalesce_partitions.schema())], coalesce_partitions, false, ); let repartition_rr2 = repartition_exec_round_robin(sort); let repartition_hash2 = repartition_exec_hash(repartition_rr2); - let filter = filter_exec(repartition_hash2, &schema); - let sort2 = sort_exec(vec![sort_expr_default("c", &schema)], filter, true); + let filter = filter_exec(repartition_hash2); + let sort2 = + sort_exec(vec![sort_expr_default("c", &filter.schema())], filter, true); - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], sort2); + let physical_plan = sort_preserving_merge_exec( + vec![sort_expr_default("c", &sort2.schema())], + sort2, + ); let expected_input = [ - "SortPreservingMergeExec: [c@2 ASC]", - " SortExec: expr=[c@2 ASC]", - " FilterExec: c@2 > 3", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + "SortPreservingMergeExec: [c@1 ASC]", + " SortExec: expr=[c@1 ASC]", + " FilterExec: c@1 > 3", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@2 ASC]", + " SortExec: expr=[c@1 ASC]", " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ - "SortPreservingMergeExec: [c@2 ASC]", - " FilterExec: c@2 > 3", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=c@2 ASC", + "SortPreservingMergeExec: [c@1 ASC]", + " FilterExec: c@1 > 3", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=c@1 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@2 ASC]", + " SortExec: expr=[c@1 ASC]", " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; @@ -705,21 +713,27 @@ mod tests { let hash_join_exec = hash_join_exec(left_coalesce_partitions, right_coalesce_partitions); - let sort = sort_exec(vec![sort_expr_default("a", &schema)], hash_join_exec, true); + let sort = sort_exec( + vec![sort_expr_default("a", &hash_join_exec.schema())], + hash_join_exec, + true, + ); - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], sort); + let physical_plan = sort_preserving_merge_exec( + vec![sort_expr_default("a", &sort.schema())], + sort, + ); let expected_input = [ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; @@ -729,11 +743,11 @@ mod tests { " SortExec: expr=[a@0 ASC]", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; @@ -754,11 +768,11 @@ mod tests { let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8", + " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", + " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; assert_optimized!(expected_input, expected_optimized, physical_plan, true); @@ -821,24 +835,23 @@ mod tests { } fn repartition_exec_hash(input: Arc) -> Arc { + let input_schema = input.schema(); Arc::new( RepartitionExec::try_new( input, - Partitioning::Hash(vec![Arc::new(Column::new("c1", 0))], 8), + Partitioning::Hash(vec![col("c", &input_schema).unwrap()], 8), ) .unwrap(), ) } - fn filter_exec( - input: Arc, - schema: &SchemaRef, - ) -> Arc { + fn filter_exec(input: Arc) -> Arc { + let input_schema = input.schema(); let predicate = expressions::binary( - col("c", schema).unwrap(), + col("c", &input_schema).unwrap(), Operator::Gt, expressions::lit(3i32), - schema, + &input_schema, ) .unwrap(); Arc::new(FilterExec::try_new(predicate, input).unwrap()) @@ -856,11 +869,15 @@ mod tests { left: Arc, right: Arc, ) -> Arc { + let left_on = col("c", &left.schema()).unwrap(); + let right_on = col("c", &right.schema()).unwrap(); + let left_col = left_on.as_any().downcast_ref::().unwrap(); + let right_col = right_on.as_any().downcast_ref::().unwrap(); Arc::new( HashJoinExec::try_new( left, right, - vec![(Column::new("c", 1), Column::new("c", 1))], + vec![(left_col.clone(), right_col.clone())], None, &JoinType::Inner, PartitionMode::Partitioned,