From 05f9d0e205dad26b3fd24f62d225fe6e3d6e6982 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 25 Oct 2023 16:25:30 -0400 Subject: [PATCH] Minor: reduce use of cfg(parquet) in tests --- .../enforce_distribution.rs | 56 +------------------ 1 file changed, 3 insertions(+), 53 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index d3fbc46a6659..5ba48e134fa9 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1674,6 +1674,8 @@ impl TreeNode for PlanWithKeyRequirements { } } +/// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on +#[cfg(feature = "parquet")] #[cfg(test)] mod tests { use std::ops::Deref; @@ -1683,7 +1685,6 @@ mod tests { use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::FileScanConfig; - #[cfg(feature = "parquet")] use crate::datasource::physical_plan::ParquetExec; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::output_requirements::OutputRequirements; @@ -1823,12 +1824,10 @@ mod tests { ])) } - #[cfg(feature = "parquet")] fn parquet_exec() -> Arc { parquet_exec_with_sort(vec![]) } - #[cfg(feature = "parquet")] fn parquet_exec_with_sort( output_ordering: Vec>, ) -> Arc { @@ -1849,13 +1848,11 @@ mod tests { )) } - #[cfg(feature = "parquet")] fn parquet_exec_multiple() -> Arc { parquet_exec_multiple_sorted(vec![]) } // Created a sorted parquet exec with multiple files - #[cfg(feature = "parquet")] fn parquet_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { @@ -2210,7 +2207,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_hash_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2373,7 +2369,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_joins_after_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2453,7 +2448,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_joins_after_multi_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2509,7 +2503,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn join_after_agg_alias() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -2549,7 +2542,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn hash_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -2602,7 +2594,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_hash_join_key_ordering() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2719,7 +2710,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn reorder_join_keys_to_left_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2850,7 +2840,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn reorder_join_keys_to_right_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2976,7 +2965,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_smj_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -3250,7 +3238,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn smj_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -3346,7 +3333,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn merge_does_not_need_sort() -> Result<()> { // see https://github.com/apache/arrow-datafusion/issues/4331 let schema = schema(); @@ -3387,7 +3373,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn union_to_interleave() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -3429,7 +3414,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn added_repartition_to_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(parquet_exec(), alias); @@ -3448,7 +3432,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_deepest_node() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(filter_exec(parquet_exec()), alias); @@ -3468,7 +3451,7 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] + fn repartition_unsorted_limit() -> Result<()> { let plan = limit_exec(filter_exec(parquet_exec())); @@ -3488,7 +3471,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3511,7 +3493,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_sorted_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3537,7 +3518,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias( @@ -3568,7 +3548,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_union() -> Result<()> { let plan = union_exec(vec![parquet_exec(); 5]); @@ -3588,7 +3567,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input let schema = schema(); @@ -3611,7 +3589,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge() -> Result<()> { // sort preserving merge already sorted input, let schema = schema(); @@ -3643,7 +3620,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) let schema = schema(); @@ -3676,7 +3652,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_does_not_destroy_sort() -> Result<()> { // SortRequired // Parquet(sorted) @@ -3702,7 +3677,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { // model a more complicated scenario where one child of a union can be repartitioned for performance // but the other can not be @@ -3741,7 +3715,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_with_projection() -> Result<()> { let schema = schema(); let proj_exprs = vec![( @@ -3784,7 +3757,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_transitively_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3815,7 +3787,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3845,7 +3816,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3920,7 +3890,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias(parquet_exec(), alias.clone()); @@ -4009,7 +3978,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_two_partitions() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -4037,7 +4005,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_two_partitions_into_four() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -4065,7 +4032,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4098,7 +4064,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4144,7 +4109,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias( @@ -4195,7 +4159,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_union_inputs() -> Result<()> { let plan_parquet = union_exec(vec![parquet_exec(); 5]); let plan_csv = union_exec(vec![csv_exec(); 5]); @@ -4225,7 +4188,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4256,7 +4218,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_sort_preserving_merge_with_union() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4291,7 +4252,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_does_not_benefit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4320,7 +4280,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> { // sorted input let schema = schema(); @@ -4401,7 +4360,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn remove_redundant_roundrobins() -> Result<()> { let input = parquet_exec(); let repartition = repartition_exec(repartition_exec(input)); @@ -4452,7 +4410,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4485,7 +4442,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition2() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4524,7 +4480,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition3() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4547,7 +4502,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_put_sort_when_input_is_invalid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4586,7 +4540,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn put_sort_when_input_is_valid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4629,7 +4582,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_add_unnecessary_hash() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4685,7 +4637,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition() -> Result<()> { let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec())); let expected = &[ @@ -4705,7 +4656,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition2() -> Result<()> { let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec( filter_exec(repartition_exec(parquet_exec())),