diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4aedc3b0d1a9..a34958a6c96d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -3787,7 +3787,7 @@ pub(crate) mod tests { fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { - expr: col("c", &schema).unwrap(), + expr: col("a", &schema).unwrap(), options: SortOptions::default(), }]; let plan = sort_exec( @@ -3804,9 +3804,9 @@ pub(crate) mod tests { ); let expected = &[ - "SortPreservingMergeExec: [c@2 ASC]", + "SortPreservingMergeExec: [a@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec: expr=[c@2 ASC]", + "SortExec: expr=[a@0 ASC]", "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "FilterExec: c@2 = 0", // repartition is lowest down @@ -3817,7 +3817,7 @@ pub(crate) mod tests { assert_optimized!(expected, plan.clone(), true); let expected_first_sort_enforcement = &[ - "SortExec: expr=[c@2 ASC]", + "SortExec: expr=[a@0 ASC]", "CoalescePartitionsExec", "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "FilterExec: c@2 = 0", diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 7f8c9b852cb1..6ecf624228ca 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -628,7 +628,8 @@ mod tests { let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false); - let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]", + let expected_input = [ + "SortExec: expr=[a@0 ASC NULLS LAST]", " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", @@ -766,7 +767,8 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + let expected_input = [ + "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", @@ -774,7 +776,8 @@ mod tests { let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"]; + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true" + ]; assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index f3bfe4961622..fc32094019e3 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::sync::Arc; @@ -28,7 +28,7 @@ use crate::{ }; use arrow::datatypes::SchemaRef; -use arrow_schema::SortOptions; +use arrow_schema::{Field, Schema, SortOptions}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{JoinSide, JoinType, Result}; @@ -138,66 +138,170 @@ impl EquivalenceClass { } /// Stores the mapping between source expressions and target expressions for a -/// projection. +/// projection, and the output (post-projection) schema of the table. #[derive(Debug, Clone)] pub struct ProjectionMapping { - /// `(source expression)` --> `(target expression)` - /// Indices in the vector corresponds to the indices after projection. - inner: Vec<(Arc, Arc)>, + /// Mapping between source expressions and target expressions. + /// Vector indices correspond to the indices after projection. + map: Vec<(Arc, Arc)>, + /// Output (post-projection) schema. + output_schema: SchemaRef, } impl ProjectionMapping { /// Constructs the mapping between a projection's input and output /// expressions. /// - /// For example, given the input projection expressions (`a+b`, `c+d`) - /// and an output schema with two columns `"c+d"` and `"a+b"` - /// the projection mapping would be + /// For example, given the input projection expressions (`a + b`, `c + d`) + /// and an output schema with two columns `"c + d"` and `"a + b"`, the + /// projection mapping would be: + /// /// ```text - /// [0]: (c+d, col("c+d")) - /// [1]: (a+b, col("a+b")) + /// [0]: (c + d, col("c + d")) + /// [1]: (a + b, col("a + b")) /// ``` - /// where `col("c+d")` means the column named "c+d". + /// + /// where `col("c + d")` means the column named `"c + d"`. pub fn try_new( expr: &[(Arc, String)], input_schema: &SchemaRef, ) -> Result { // Construct a map from the input expressions to the output expression of the projection: - let mut inner = vec![]; - for (expr_idx, (expression, name)) in expr.iter().enumerate() { - let target_expr = Arc::new(Column::new(name, expr_idx)) as _; - - let source_expr = expression.clone().transform_down(&|e| match e - .as_any() - .downcast_ref::( - ) { - Some(col) => { - // Sometimes, expression and its name in the input_schema doesn't match. - // This can cause problems. Hence in here we make sure that expression name - // matches with the name in the inout_schema. - // Conceptually, source_expr and expression should be same. - let idx = col.index(); - let matching_input_field = input_schema.field(idx); - let matching_input_column = - Column::new(matching_input_field.name(), idx); - Ok(Transformed::Yes(Arc::new(matching_input_column))) - } - None => Ok(Transformed::No(e)), - })?; + let map = expr + .iter() + .enumerate() + .map(|(expr_idx, (expression, name))| { + let target_expr = Arc::new(Column::new(name, expr_idx)) as _; + expression + .clone() + .transform_down(&|e| match e.as_any().downcast_ref::() { + Some(col) => { + // Sometimes, an expression and its name in the input_schema + // doesn't match. This can cause problems, so we make sure + // that the expression name matches with the name in `input_schema`. + // Conceptually, `source_expr` and `expression` should be the same. + let idx = col.index(); + let matching_input_field = input_schema.field(idx); + let matching_input_column = + Column::new(matching_input_field.name(), idx); + Ok(Transformed::Yes(Arc::new(matching_input_column))) + } + None => Ok(Transformed::No(e)), + }) + .map(|source_expr| (source_expr, target_expr)) + }) + .collect::>>()?; - inner.push((source_expr, target_expr)); - } - Ok(Self { inner }) + // Calculate output schema: + let fields = expr + .iter() + .map(|(e, name)| { + Ok(Field::new( + name, + e.data_type(input_schema)?, + e.nullable(input_schema)?, + ) + .with_metadata(get_field_metadata(e, input_schema).unwrap_or_default())) + }) + .collect::>>()?; + + let output_schema = Arc::new(Schema::new_with_metadata( + fields, + input_schema.metadata().clone(), + )); + + Ok(Self { map, output_schema }) } /// Iterate over pairs of (source, target) expressions pub fn iter( &self, ) -> impl Iterator, Arc)> + '_ { - self.inner.iter() + self.map.iter() + } + + /// Returns a reference to the output (post-projection) schema. + pub fn output_schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + /// This function returns the target expression for a given source expression. + /// + /// # Arguments + /// + /// * `expr` - Source physical expression. + /// + /// # Returns + /// + /// An `Option` containing the target expression for the given source. + /// `None` means that source is not found inside the mapping. + pub fn target_expr( + &self, + expr: &Arc, + ) -> Option> { + self.map + .iter() + .find(|(source, _)| source.eq(expr)) + .map(|(_, target)| target.clone()) + } + + /// This function returns the target expressions for all given source + /// expressions. + /// + /// # Arguments + /// + /// * `exprs` - Source physical expressions. + /// + /// # Returns + /// + /// An `Option` containing the target expressions for all the sources. + /// If any of the given sources is absent in the mapping, returns `None`. + pub fn target_exprs( + &self, + exprs: &[Arc], + ) -> Option>> { + exprs.iter().map(|expr| self.target_expr(expr)).collect() + } + + /// This function projects the given ordering requirement according to this + /// mapping. + /// + /// # Arguments + /// + /// * `lex_req` - Lexicographical ordering requirement. + /// + /// # Returns + /// + /// An `Option` containing the projected lexicographical ordering requirement. + /// If any of the given requirements is absent in the mapping, returns `None`. + pub fn project_lex_reqs(&self, lex_req: LexRequirementRef) -> Option { + lex_req + .iter() + .map(|sort_req| { + self.target_expr(&sort_req.expr) + .map(|expr| PhysicalSortRequirement { + expr, + options: sort_req.options, + }) + }) + .collect() } } +/// If `e` refers to a column, returns its field level metadata, if any. +/// Otherwise, returns `None`. +fn get_field_metadata( + e: &Arc, + input_schema: &Schema, +) -> Option> { + e.as_any().downcast_ref::().and_then(|column| { + input_schema + .field_with_name(column.name()) + .ok() + .map(|f| f.metadata().clone()) + }) +} + /// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each /// class represents a distinct equivalence class in a relation. #[derive(Debug, Clone)] @@ -413,32 +517,38 @@ impl EquivalenceGroup { mapping: &ProjectionMapping, expr: &Arc, ) -> Option> { - let children = expr.children(); - if children.is_empty() { + // First, we try to project expressions with an exact match. If we are + // unable to do this, we consult equivalence classes. + if let Some(target) = mapping.target_expr(expr) { + // If we match the source, we can project directly: + return Some(target); + } else { + // If the given expression is not inside the mapping, try to project + // expressions considering the equivalence classes. for (source, target) in mapping.iter() { - // If we match the source, or an equivalent expression to source, - // then we can project. For example, if we have the mapping - // (a as a1, a + c) and the equivalence class (a, b), expression - // b also projects to a1. - if source.eq(expr) - || self - .get_equivalence_class(source) - .map_or(false, |group| group.contains(expr)) + // If we match an equivalent expression to `source`, then we + // can project. For example, if we have the mapping `a -> a1` + // and the equivalence `a = b`, expression `b` projects to `a1`. + if self + .get_equivalence_class(source) + .map_or(false, |group| group.contains(expr)) { return Some(target.clone()); } } } // Project a non-leaf expression by projecting its children. - else if let Some(children) = children - .into_iter() - .map(|child| self.project_expr(mapping, &child)) - .collect::>>() - { - return Some(expr.clone().with_new_children(children).unwrap()); + let children = expr.children(); + if children.is_empty() { + // A leaf expression should be inside the given mapping. + None + } else { + children + .into_iter() + .map(|child| self.project_expr(mapping, &child)) + .collect::>>() + .map(|p| expr.clone().with_new_children(p).unwrap()) } - // Arriving here implies the expression was invalid after projection. - None } /// Projects `ordering` according to the given projection mapping. @@ -662,7 +772,7 @@ impl OrderingEquivalenceClass { fn remove_redundant_entries(&mut self) { let mut idx = 0; while idx < self.orderings.len() { - let mut removal = false; + let mut removal = self.orderings[idx].is_empty(); for (ordering_idx, ordering) in self.orderings[0..idx].iter().enumerate() { if let Some(right_finer) = finer_side(ordering, &self.orderings[idx]) { if right_finer { @@ -982,39 +1092,48 @@ impl EquivalenceProperties { self.ordering_satisfy_requirement(&sort_requirements) } - /// Checks whether the given sort requirements are satisfied by any of the - /// existing orderings. + /// Checks whether the given sort requirements are satisfied by any of the existing orderings. + /// This function applies an implicit projection to itself before calling `ordering_satisfy_requirement_helper` + /// to define the orderings of complex [`PhysicalExpr`]'s during analysis. pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool { - // First, standardize the given requirement: - let normalized_reqs = self.normalize_sort_requirements(reqs); - if normalized_reqs.is_empty() { - // Requirements are tautologically satisfied if empty. - return true; - } - let mut indices = HashSet::new(); - for ordering in self.normalized_oeq_class().iter() { - let match_indices = ordering - .iter() - .map(|sort_expr| { - normalized_reqs - .iter() - .position(|sort_req| sort_expr.satisfy(sort_req, &self.schema)) - }) - .collect::>(); - // Find the largest contiguous increasing sequence starting from the first index: - if let Some(&Some(first)) = match_indices.first() { - indices.insert(first); - let mut iter = match_indices.windows(2); - while let Some([Some(current), Some(next)]) = iter.next() { - if next > current { - indices.insert(*next); - } else { - break; + let exprs = reqs + .iter() + .map(|sort_req| sort_req.expr.clone()) + .collect::>(); + let mapping = self.implicit_projection_mapping(&exprs); + let projected_eqs = self.project(&mapping, mapping.output_schema()); + mapping.project_lex_reqs(reqs).map_or(false, |reqs| { + // First, standardize the given requirement: + let normalized_reqs = projected_eqs.normalize_sort_requirements(&reqs); + if normalized_reqs.is_empty() { + // Requirements are tautologically satisfied if empty. + return true; + } + let mut indices = HashSet::new(); + for ordering in projected_eqs.normalized_oeq_class().iter() { + let match_indices = ordering + .iter() + .map(|sort_expr| { + normalized_reqs.iter().position(|sort_req| { + sort_expr.satisfy(sort_req, &projected_eqs.schema) + }) + }) + .collect::>(); + // Find the largest contiguous increasing sequence starting from the first index: + if let Some(&Some(first)) = match_indices.first() { + indices.insert(first); + let mut iter = match_indices.windows(2); + while let Some([Some(current), Some(next)]) = iter.next() { + if next > current { + indices.insert(*next); + } else { + break; + } } } } - } - indices.len() == normalized_reqs.len() + indices.len() == normalized_reqs.len() + }) } /// Checks whether the `given`` sort requirements are equal or more specific @@ -1117,6 +1236,32 @@ impl EquivalenceProperties { (!meet.is_empty()).then_some(meet) } + /// Creates a projection mapping to support complex expressions (e.g. a + b, + /// DATE_BIN(c), ...). Using this implicit projection, we can determine the + /// ordering properties of complex expressions without actually evaluating them. + fn implicit_projection_mapping( + &self, + exprs: &[Arc], + ) -> ProjectionMapping { + // First project existing fields as is, then project complex expressions: + let proj_exprs = self + .schema + .fields + .iter() + .enumerate() + .map(|(idx, field)| { + let name = field.name(); + (Arc::new(Column::new(name, idx)) as _, name.to_string()) + }) + .chain(exprs.iter().flat_map(|expr| { + // Do not project column expressions: + let is_column = expr.as_any().is::(); + (!is_column).then(|| (expr.clone(), expr.to_string())) + })) + .collect::>(); + ProjectionMapping::try_new(&proj_exprs, self.schema()).unwrap() + } + /// Projects argument `expr` according to `projection_mapping`, taking /// equivalences into account. /// @@ -1162,10 +1307,15 @@ impl EquivalenceProperties { }]); } } + let target_constants = self + .constants + .iter() + .flat_map(|constant| projection_mapping.target_expr(constant)) + .collect(); Self { eq_group: self.eq_group.project(projection_mapping), oeq_class: OrderingEquivalenceClass::new(projected_orderings), - constants: vec![], + constants: target_constants, schema: output_schema, } } @@ -1180,14 +1330,21 @@ impl EquivalenceProperties { /// definition of "partial permutation", see: /// /// + /// + /// This function applies an implicit projection to itself before calling `find_longest_permutation_helper`. + /// This enables us to consider complex expressions during analysis. pub fn find_longest_permutation( &self, exprs: &[Arc], ) -> (LexOrdering, Vec) { - let normalized_exprs = self.eq_group.normalize_exprs(exprs.to_vec()); + let projection_mapping = self.implicit_projection_mapping(exprs); + let projected = + self.project(&projection_mapping, projection_mapping.output_schema()); + let exprs = projection_mapping.target_exprs(exprs).unwrap_or_default(); + let normalized_exprs = projected.eq_group.normalize_exprs(exprs.clone()); // Use a map to associate expression indices with sort options: let mut ordered_exprs = IndexMap::::new(); - for ordering in self.normalized_oeq_class().iter() { + for ordering in projected.normalized_oeq_class().iter() { for sort_expr in ordering { if let Some(idx) = normalized_exprs .iter() @@ -1594,27 +1751,21 @@ mod tests { let input_properties = EquivalenceProperties::new(input_schema.clone()); let col_a = col("a", &input_schema)?; - - let out_schema = Arc::new(Schema::new(vec![ - Field::new("a1", DataType::Int64, true), - Field::new("a2", DataType::Int64, true), - Field::new("a3", DataType::Int64, true), - Field::new("a4", DataType::Int64, true), - ])); + // a as a1, a as a2, a as a3, a as a3 + let proj_exprs = vec![ + (col_a.clone(), "a1".to_string()), + (col_a.clone(), "a2".to_string()), + (col_a.clone(), "a3".to_string()), + (col_a.clone(), "a4".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; + let out_schema = projection_mapping.output_schema(); // a as a1, a as a2, a as a3, a as a3 let col_a1 = &col("a1", &out_schema)?; let col_a2 = &col("a2", &out_schema)?; let col_a3 = &col("a3", &out_schema)?; let col_a4 = &col("a4", &out_schema)?; - let projection_mapping = ProjectionMapping { - inner: vec![ - (col_a.clone(), col_a1.clone()), - (col_a.clone(), col_a2.clone()), - (col_a.clone(), col_a3.clone()), - (col_a.clone(), col_a4.clone()), - ], - }; let out_properties = input_properties.project(&projection_mapping, out_schema); // At the output a1=a2=a3=a4 @@ -1631,6 +1782,10 @@ mod tests { #[test] fn test_ordering_satisfy() -> Result<()> { + let input_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + ])); let crude = vec![PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -1646,13 +1801,12 @@ mod tests { }, ]; // finer ordering satisfies, crude ordering should return true - let empty_schema = &Arc::new(Schema::empty()); - let mut eq_properties_finer = EquivalenceProperties::new(empty_schema.clone()); + let mut eq_properties_finer = EquivalenceProperties::new(input_schema.clone()); eq_properties_finer.oeq_class.push(finer.clone()); assert!(eq_properties_finer.ordering_satisfy(&crude)); // Crude ordering doesn't satisfy finer ordering. should return false - let mut eq_properties_crude = EquivalenceProperties::new(empty_schema.clone()); + let mut eq_properties_crude = EquivalenceProperties::new(input_schema.clone()); eq_properties_crude.oeq_class.push(crude.clone()); assert!(!eq_properties_crude.ordering_satisfy(&finer)); Ok(()) @@ -2111,6 +2265,13 @@ mod tests { ], ], ), + // ------- TEST CASE 5 --------- + // Empty ordering + ( + vec![vec![]], + // No ordering in the state (empty ordering is ignored). + vec![], + ), ]; for (orderings, expected) in test_cases { let orderings = convert_to_orderings(&orderings); @@ -2922,35 +3083,26 @@ mod tests { #[test] fn project_empty_output_ordering() -> Result<()> { - let schema = Schema::new(vec![ + let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, true), Field::new("c", DataType::Int32, true), - ]); - let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); + ])); + let mut eq_properties = EquivalenceProperties::new(schema.clone()); let ordering = vec![PhysicalSortExpr { - expr: Arc::new(Column::new("b", 1)), + expr: col("b", &schema)?, options: SortOptions::default(), }]; eq_properties.add_new_orderings([ordering]); - let projection_mapping = ProjectionMapping { - inner: vec![ - ( - Arc::new(Column::new("b", 1)) as _, - Arc::new(Column::new("b_new", 0)) as _, - ), - ( - Arc::new(Column::new("a", 0)) as _, - Arc::new(Column::new("a_new", 1)) as _, - ), - ], - }; - let projection_schema = Arc::new(Schema::new(vec![ - Field::new("b_new", DataType::Int32, true), - Field::new("a_new", DataType::Int32, true), - ])); + // b as b_new, a as a_new + let proj_exprs = vec![ + (col("b", &schema)?, "b_new".to_string()), + (col("a", &schema)?, "a_new".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?; + let orderings = eq_properties - .project(&projection_mapping, projection_schema) + .project(&projection_mapping, projection_mapping.output_schema()) .oeq_class() .output_ordering() .unwrap_or_default(); @@ -2963,29 +3115,20 @@ mod tests { orderings ); - let schema = Schema::new(vec![ + let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, true), Field::new("c", DataType::Int32, true), - ]); - let eq_properties = EquivalenceProperties::new(Arc::new(schema)); - let projection_mapping = ProjectionMapping { - inner: vec![ - ( - Arc::new(Column::new("c", 2)) as _, - Arc::new(Column::new("c_new", 0)) as _, - ), - ( - Arc::new(Column::new("b", 1)) as _, - Arc::new(Column::new("b_new", 1)) as _, - ), - ], - }; - let projection_schema = Arc::new(Schema::new(vec![ - Field::new("c_new", DataType::Int32, true), - Field::new("b_new", DataType::Int32, true), ])); - let projected = eq_properties.project(&projection_mapping, projection_schema); + let eq_properties = EquivalenceProperties::new(schema.clone()); + // c as c_new, b as b_new + let proj_exprs = vec![ + (col("c", &schema)?, "c_new".to_string()), + (col("b", &schema)?, "b_new".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &schema)?; + let projected = eq_properties + .project(&projection_mapping, projection_mapping.output_schema()); // After projection there is no ordering. assert!(projected.oeq_class().output_ordering().is_none()); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index b8e2d0e425d4..9ce49dfe9f95 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -21,7 +21,6 @@ //! projection expressions. `SELECT` without `FROM` will only evaluate expressions. use std::any::Any; -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -33,7 +32,7 @@ use crate::{ ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, }; -use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::datatypes::SchemaRef; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::stats::Precision; use datafusion_common::Result; @@ -71,37 +70,17 @@ impl ProjectionExec { ) -> Result { let input_schema = input.schema(); - let fields: Result> = expr - .iter() - .map(|(e, name)| { - let mut field = Field::new( - name, - e.data_type(&input_schema)?, - e.nullable(&input_schema)?, - ); - field.set_metadata( - get_field_metadata(e, &input_schema).unwrap_or_default(), - ); - - Ok(field) - }) - .collect(); - - let schema = Arc::new(Schema::new_with_metadata( - fields?, - input_schema.metadata().clone(), - )); - // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?; let input_eqs = input.equivalence_properties(); - let project_eqs = input_eqs.project(&projection_mapping, schema.clone()); + let project_eqs = + input_eqs.project(&projection_mapping, projection_mapping.output_schema()); let output_ordering = project_eqs.oeq_class().output_ordering(); Ok(Self { expr, - schema, + schema: projection_mapping.output_schema(), input, output_ordering, projection_mapping, @@ -251,24 +230,6 @@ impl ExecutionPlan for ProjectionExec { } } -/// If e is a direct column reference, returns the field level -/// metadata for that field, if any. Otherwise returns None -fn get_field_metadata( - e: &Arc, - input_schema: &Schema, -) -> Option> { - let name = if let Some(column) = e.as_any().downcast_ref::() { - column.name() - } else { - return None; - }; - - input_schema - .field_with_name(name) - .ok() - .map(|f| f.metadata().clone()) -} - fn stats_projection( mut stats: Statistics, exprs: impl Iterator>, @@ -370,7 +331,7 @@ mod tests { use crate::expressions; use crate::test; - use arrow_schema::DataType; + use arrow_schema::{DataType, Field, Schema}; use datafusion_common::ScalarValue; #[tokio::test] diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 4438d69af306..683c330c1461 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3842,6 +3842,51 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX( --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] ----------------------MemoryExec: partitions=1, partition_sizes=[1] +# create an unbounded table that contains ordered timestamp. +statement ok +CREATE UNBOUNDED EXTERNAL TABLE csv_with_timestamps ( + name VARCHAR, + ts TIMESTAMP +) +STORED AS CSV +WITH ORDER (ts DESC) +LOCATION '../core/tests/data/timestamps.csv' + +# below query should work in streaming mode. +query TT +EXPLAIN SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY date_bin('15 minutes', ts) + ORDER BY time_chunks DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: time_chunks DESC NULLS FIRST, fetch=5 +----Projection: date_bin(Utf8("15 minutes"),csv_with_timestamps.ts) AS time_chunks +------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)]], aggr=[[]] +--------TableScan: csv_with_timestamps projection=[ts] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5 +----ProjectionExec: expr=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as time_chunks] +------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------CoalesceBatchesExec: target_batch_size=2 +----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8, sort_exprs=date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 DESC +------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] + +query P +SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY date_bin('15 minutes', ts) + ORDER BY time_chunks DESC + LIMIT 5 +---- +2018-12-13T12:00:00 +2018-11-13T17:00:00 + statement ok drop table t1