From edcef77667acff819bf00d13605036abc95fd32d Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 14:35:23 +0300
Subject: [PATCH 01/10] Initial commit

---
 .../physical-expr/src/aggregate/first_last.rs | 125 ++++-
 datafusion/physical-expr/src/aggregate/mod.rs |   9 +-
 .../physical-expr/src/aggregate/utils.rs      |  10 +-
 .../physical-plan/src/aggregates/mod.rs       | 513 +++++++-----------
 4 files changed, 317 insertions(+), 340 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index c009881d8918..bfd6143695cf 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -20,7 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
 use crate::expressions::format_state_name;
 use crate::{
     reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
@@ -29,7 +29,6 @@ use crate::{
 use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
@@ -211,10 +210,47 @@ impl FirstValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.first = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when
+        // - there is no entry in the state
+        // - there is an earlier entry according to the requirements
+        if !self.is_set
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_gt()
+        {
+            self.first = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let value = &values[0];
+        let ordering_values = &values[1..];
+        assert_eq!(ordering_values.len(), self.ordering_req.len());
+        if self.ordering_req.is_empty() {
+            return Ok((!value.is_empty()).then_some(0));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| SortColumn {
+                values: values.clone(),
+                options: Some(req.options),
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        if !indices.is_empty() {
+            return Ok(Some(indices.value(0) as usize));
+        }
+        Ok(None)
     }
 }
 
@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        // If we have seen first value, we shouldn't update it
-        if !values[0].is_empty() && !self.is_set {
-            let row = get_row_at_idx(values, 0)?;
-            // Update with first value in the array.
-            self.update_with_new_row(&row);
+        if let Some(first_idx) = self.get_first_idx(values)? {
+            let row = get_row_at_idx(values, first_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
             // Update with first value in the state. Note that we should exclude the
             // is_set flag from the state. Otherwise, we will end up with a state
            // containing two is_set flags.
-            self.update_with_new_row(&first_row[0..is_set_idx]);
+            self.update_with_new_row(&first_row[0..is_set_idx])?;
            }
        }
        Ok(())
    }
@@ -459,10 +493,52 @@ impl LastValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when
+        // - there is no value in the state
+        // - there is no specific requirement, but a new value (the most recent entry in execution order)
+        // - there is a more recent entry in terms of the requirement
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+        {
+            self.last = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let value = &values[0];
+        let ordering_values = &values[1..];
+        assert_eq!(ordering_values.len(), self.ordering_req.len());
+        if self.ordering_req.is_empty() {
+            return Ok((!value.is_empty()).then_some(value.len() - 1));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| {
+                // Take reverse ordering requirement this would enable us to use fetch=1 for last value.
+                SortColumn {
+                    values: values.clone(),
+                    options: Some(!req.options),
+                }
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        if !indices.is_empty() {
+            return Ok(Some(indices.value(0) as usize));
+        }
+        Ok(None)
     }
 }
 
@@ -475,10 +551,9 @@ impl Accumulator for LastValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if !values[0].is_empty() {
-            let row = get_row_at_idx(values, values[0].len() - 1)?;
-            // Update with last value in the array.
-            self.update_with_new_row(&row);
+        if let Some(last_idx) = self.get_last_idx(values)? {
+            let row = get_row_at_idx(values, last_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -515,7 +590,7 @@ impl Accumulator for LastValueAccumulator {
            // Update with last value in the state. Note that we should exclude the
            // is_set flag from the state. Otherwise, we will end up with a state
            // containing two is_set flags.
-            self.update_with_new_row(&last_row[0..is_set_idx]);
+            self.update_with_new_row(&last_row[0..is_set_idx])?;
            }
        }
        Ok(())
    }
@@ -559,14 +634,6 @@ fn convert_to_sort_cols(
         .collect::<Vec<_>>()
 }
 
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
-}
-
 #[cfg(test)]
 mod tests {
     use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
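Both new helpers hinge on the same arrow trick: instead of fully sorting each batch, they lexsort only the ordering columns with fetch=1, and express "last" as "first" by negating the sort options. A minimal, self-contained sketch of that idea follows; it is illustrative only (hypothetical data, not part of this patch), using the same arrow kernels the patch imports:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::error::ArrowError;
use arrow_schema::SortOptions;

fn main() -> Result<(), ArrowError> {
    // Ordering column of a hypothetical batch: ts = [3, 1, 2]
    let ts: ArrayRef = Arc::new(Int64Array::from(vec![3, 1, 2]));
    let asc = SortOptions { descending: false, nulls_first: false };

    // "First" row w.r.t. ts ASC: top-1 of an ascending lexsort.
    let first = lexsort_to_indices(
        &[SortColumn { values: ts.clone(), options: Some(asc) }],
        Some(1), // fetch = 1: a top-1 selection, not a full sort
    )?;
    assert_eq!(first.value(0), 1); // row where ts == 1

    // "Last" row w.r.t. ts ASC: reverse the requirement (!asc), still fetch = 1.
    let last = lexsort_to_indices(
        &[SortColumn { values: ts, options: Some(!asc) }],
        Some(1),
    )?;
    assert_eq!(last.value(0), 0); // row where ts == 3
    Ok(())
}

Fetching a single index keeps the per-batch cost closer to a linear scan than to a full O(n log n) sort, which is why the accumulators no longer need pre-sorted input.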
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 329bb1e6415e..695f08dfc1b3 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
+use crate::expressions::OrderSensitiveArrayAgg;
 use crate::{PhysicalExpr, PhysicalSortExpr};
 use arrow::datatypes::Field;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
@@ -134,10 +134,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
 
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<FirstValue>()
-        || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+    aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
 }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index e5421ef5ab7e..7ba7e9d01567 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -26,7 +26,7 @@ use arrow_array::types::{
 };
 use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use std::any::Any;
@@ -205,3 +205,11 @@ pub(crate) fn ordering_fields(
         })
         .collect()
 }
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+    ordering_req
+        .iter()
+        .map(|item| item.options)
+        .collect::<Vec<_>>()
+}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index f779322456ca..e3daa1ffbc25 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -27,7 +27,7 @@ use crate::aggregates::{
 };
 
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
-use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
+use crate::windows::get_ordered_partition_by_indices;
 use crate::{
     DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
     SendableRecordBatchStream, Statistics,
@@ -38,18 +38,18 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::DataType;
 use datafusion_common::stats::Precision;
-use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     aggregate::is_order_sensitive,
     equivalence::{collapse_lex_req, ProjectionMapping},
     expressions::{Column, Max, Min, UnKnownColumn},
-    physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties,
-    LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+    physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
+    LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
-use itertools::{izip, Itertools};
+use itertools::Itertools;
 
 mod group_values;
 mod no_grouping;
@@ -101,6 +101,34 @@ impl AggregateMode {
     }
 }
 
+/// Group By expression modes
+///
+/// `PartiallyOrdered` and `FullyOrdered` are used to reason about
+/// when certain group by keys will never again be seen (and thus can
+/// be emitted by the grouping operator).
+///
+/// Specifically, each distinct combination of the relevant columns
+/// are contiguous in the input, and once a new combination is seen
+/// previous combinations are guaranteed never to appear again
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// The input is known to be ordered by a preset (prefix but
+    /// possibly reordered) of the expressions in the `GROUP BY` clause.
+    ///
+    /// For example, if the input is ordered by `a, b, c` and we group
+    /// by `b, a, d`, `PartiallyOrdered` means a subset of group `b,
+    /// a, d` defines a preset for the existing ordering, in this case
+    /// `a, b`.
+    PartiallyOrdered,
+    /// The input is known to be ordered by *all* the expressions in the
+    /// `GROUP BY` clause.
+    ///
+    /// For example, if the input is ordered by `a, b, c, d` and we group by b, a,
+    /// `Ordered` means that all of the group by expressions appear
+    /// as a preset for the existing ordering, in this case `a, b`.
+    FullyOrdered,
+}
+
 /// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)
 /// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b]
 /// and a single group [false, false].
@@ -239,6 +267,17 @@ impl From<StreamType> for SendableRecordBatchStream {
     }
 }
 
+/// A structure representing a group of aggregate expressions where each group has different
+/// ordering requirements. Aggregate groups are calculated using `get_aggregate_expr_groups` function.
+/// Indices refers to the position of each aggregate expressions among all aggregate expressions (prior to grouping).
+#[derive(Clone, Debug, PartialEq)]
+pub struct AggregateExprGroup {
+    /// Aggregate expressions indices
+    indices: Vec<usize>,
+    /// Requirement
+    requirement: LexOrdering,
+}
+
 /// Hash aggregate execution plan
 #[derive(Debug)]
 pub struct AggregateExec {
@@ -277,159 +316,6 @@ pub struct AggregateExec {
     output_ordering: Option<LexOrdering>,
 }
 
-/// This function returns the ordering requirement of the first non-reversible
-/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
-/// as the initial requirement while calculating the finest requirement among all
-/// aggregate functions. If this function returns `None`, it means there is no
-/// hard ordering requirement for the aggregate functions (in terms of direction).
-/// Then, we can generate two alternative requirements with opposite directions.
-fn get_init_req(
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-    order_by_expr: &[Option<LexOrdering>],
-) -> Option<LexOrdering> {
-    for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
-        // If the aggregation function is a non-reversible order-sensitive function
-        // and there is a hard requirement, choose first such requirement:
-        if is_order_sensitive(aggr_expr)
-            && aggr_expr.reverse_expr().is_none()
-            && fn_reqs.is_some()
-        {
-            return fn_reqs.clone();
-        }
-    }
-    None
-}
-
-/// This function gets the finest ordering requirement among all the aggregation
-/// functions. If requirements are conflicting, (i.e. we can not compute the
-/// aggregations in a single [`AggregateExec`]), the function returns an error.
-fn get_finest_requirement(
-    aggr_expr: &mut [Arc<dyn AggregateExpr>],
-    order_by_expr: &mut [Option<LexOrdering>],
-    eq_properties: &EquivalenceProperties,
-) -> Result<Option<LexOrdering>> {
-    // First, we check if all the requirements are satisfied by the existing
-    // ordering. If so, we return `None` to indicate this.
-    let mut all_satisfied = true;
-    for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
-        if eq_properties.ordering_satisfy(fn_req.as_deref().unwrap_or(&[])) {
-            continue;
-        }
-        if let Some(reverse) = aggr_expr.reverse_expr() {
-            let reverse_req = fn_req.as_ref().map(|item| reverse_order_bys(item));
-            if eq_properties.ordering_satisfy(reverse_req.as_deref().unwrap_or(&[])) {
-                // We need to update `aggr_expr` with its reverse since only its
-                // reverse requirement is compatible with the existing requirements:
-                *aggr_expr = reverse;
-                *fn_req = reverse_req;
-                continue;
-            }
-        }
-        // Requirement is not satisfied:
-        all_satisfied = false;
-    }
-    if all_satisfied {
-        // All of the requirements are already satisfied.
-        return Ok(None);
-    }
-    let mut finest_req = get_init_req(aggr_expr, order_by_expr);
-    for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
-        let Some(fn_req) = fn_req else {
-            continue;
-        };
-
-        if let Some(finest_req) = &mut finest_req {
-            if let Some(finer) = eq_properties.get_finer_ordering(finest_req, fn_req) {
-                *finest_req = finer;
-                continue;
-            }
-            // If an aggregate function is reversible, analyze whether its reverse
-            // direction is compatible with existing requirements:
-            if let Some(reverse) = aggr_expr.reverse_expr() {
-                let fn_req_reverse = reverse_order_bys(fn_req);
-                if let Some(finer) =
-                    eq_properties.get_finer_ordering(finest_req, &fn_req_reverse)
-                {
-                    // We need to update `aggr_expr` with its reverse, since only its
-                    // reverse requirement is compatible with existing requirements:
-                    *aggr_expr = reverse;
-                    *finest_req = finer;
-                    *fn_req = fn_req_reverse;
-                    continue;
-                }
-            }
-            // If neither of the requirements satisfy the other, this means
-            // requirements are conflicting. Currently, we do not support
-            // conflicting requirements.
-            return not_impl_err!(
-                "Conflicting ordering requirements in aggregate functions is not supported"
-            );
-        } else {
-            finest_req = Some(fn_req.clone());
-        }
-    }
-    Ok(finest_req)
-}
-
-/// Calculates search_mode for the aggregation
-fn get_aggregate_search_mode(
-    group_by: &PhysicalGroupBy,
-    input: &Arc<dyn ExecutionPlan>,
-    aggr_expr: &mut [Arc<dyn AggregateExpr>],
-    order_by_expr: &mut [Option<LexOrdering>],
-    ordering_req: &mut Vec<PhysicalSortExpr>,
-) -> InputOrderMode {
-    let groupby_exprs = group_by
-        .expr
-        .iter()
-        .map(|(item, _)| item.clone())
-        .collect::<Vec<_>>();
-    let mut input_order_mode = InputOrderMode::Linear;
-    if !group_by.is_single() || groupby_exprs.is_empty() {
-        return input_order_mode;
-    }
-
-    if let Some((should_reverse, mode)) =
-        get_window_mode(&groupby_exprs, ordering_req, input)
-    {
-        let all_reversible = aggr_expr
-            .iter()
-            .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some());
-        if should_reverse && all_reversible {
-            izip!(aggr_expr.iter_mut(), order_by_expr.iter_mut()).for_each(
-                |(aggr, order_by)| {
-                    if let Some(reverse) = aggr.reverse_expr() {
-                        *aggr = reverse;
-                    } else {
-                        unreachable!();
-                    }
-                    *order_by = order_by.as_ref().map(|ob| reverse_order_bys(ob));
-                },
-            );
-            *ordering_req = reverse_order_bys(ordering_req);
-        }
-        input_order_mode = mode;
-    }
-    input_order_mode
-}
-
-/// Check whether group by expression contains all of the expression inside `requirement`
-// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC)
-fn group_by_contains_all_requirements(
-    group_by: &PhysicalGroupBy,
-    requirement: &LexOrdering,
-) -> bool {
-    let physical_exprs = group_by.input_exprs();
-    // When we have multiple groups (grouping set)
-    // since group by may be calculated on the subset of the group_by.expr()
-    // it is not guaranteed to have all of the requirements among group by expressions.
-    // Hence do the analysis: whether group by contains all requirements in the single group case.
-    group_by.is_single()
-        && requirement
-            .iter()
-            .all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
-}
-
 impl AggregateExec {
     /// Create a new hash aggregate execution plan
     pub fn try_new(
@@ -447,12 +333,12 @@ impl AggregateExec {
             group_by.contains_null(),
             mode,
         )?;
-
         let schema = Arc::new(materialize_dict_group_keys(
             &original_schema,
             group_by.expr.len(),
         ));
         let original_schema = Arc::new(original_schema);
+
         AggregateExec::try_new_with_schema(
             mode,
             group_by,
@@ -465,62 +351,19 @@ impl AggregateExec {
         )
     }
 
-    /// Create a new hash aggregate execution plan with the given schema.
-    /// This constructor isn't part of the public API, it is used internally
-    /// by Datafusion to enforce schema consistency during when re-creating
-    /// `AggregateExec`s inside optimization rules. Schema field names of an
-    /// `AggregateExec` depends on the names of aggregate expressions. Since
-    /// a rule may re-write aggregate expressions (e.g. reverse them) during
-    /// initialization, field names may change inadvertently if one re-creates
-    /// the schema in such cases.
+    /// Create a new hash aggregate execution plan
    #[allow(clippy::too_many_arguments)]
-    fn try_new_with_schema(
+    pub fn try_new_with_schema(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         schema: SchemaRef,
         original_schema: SchemaRef,
     ) -> Result<Self> {
-        // Reset ordering requirement to `None` if aggregator is not order-sensitive
-        let mut order_by_expr = aggr_expr
-            .iter()
-            .map(|aggr_expr| {
-                let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec());
-                // If
-                // - aggregation function is order-sensitive and
-                // - aggregation is performing a "first stage" calculation, and
-                // - at least one of the aggregate function requirement is not inside group by expression
-                // keep the ordering requirement as is; otherwise ignore the ordering requirement.
-                // In non-first stage modes, we accumulate data (using `merge_batch`)
-                // from different partitions (i.e. merge partial results). During
-                // this merge, we consider the ordering of each partial result.
-                // Hence, we do not need to use the ordering requirement in such
-                // modes as long as partial results are generated with the
-                // correct ordering.
-                fn_reqs.filter(|req| {
-                    is_order_sensitive(aggr_expr)
-                        && mode.is_first_stage()
-                        && !group_by_contains_all_requirements(&group_by, req)
-                })
-            })
-            .collect::<Vec<_>>();
-        let requirement = get_finest_requirement(
-            &mut aggr_expr,
-            &mut order_by_expr,
-            &input.equivalence_properties(),
-        )?;
-        let mut ordering_req = requirement.unwrap_or(vec![]);
-        let input_order_mode = get_aggregate_search_mode(
-            &group_by,
-            &input,
-            &mut aggr_expr,
-            &mut order_by_expr,
-            &mut ordering_req,
-        );
-
+        let input_eq_properties = input.equivalence_properties();
         // Get GROUP BY expressions:
         let groupby_exprs = group_by.input_exprs();
         // If existing ordering satisfies a prefix of the GROUP BY expressions,
@@ -528,17 +371,31 @@ impl AggregateExec {
         // work more efficiently.
         let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
         let mut new_requirement = indices
-            .into_iter()
-            .map(|idx| PhysicalSortRequirement {
+            .iter()
+            .map(|&idx| PhysicalSortRequirement {
                 expr: groupby_exprs[idx].clone(),
                 options: None,
             })
            .collect::<Vec<_>>();
-        // Postfix ordering requirement of the aggregation to the requirement.
-        let req = PhysicalSortRequirement::from_sort_exprs(&ordering_req);
+
+        let req = get_aggregate_exprs_requirement(
+            &aggr_expr,
+            &group_by,
+            &input_eq_properties,
+            &mode,
+        )?;
         new_requirement.extend(req);
         new_requirement = collapse_lex_req(new_requirement);
 
+        let input_order_mode =
+            if indices.len() == groupby_exprs.len() && !indices.is_empty() {
+                InputOrderMode::Sorted
+            } else if !indices.is_empty() {
+                InputOrderMode::PartiallySorted(indices)
+            } else {
+                InputOrderMode::Linear
+            };
+
         // construct a map from the input expression to the output expression of the Aggregation group by
         let projection_mapping =
             ProjectionMapping::try_new(&group_by.expr, &input.schema())?;
@@ -546,9 +403,8 @@ impl AggregateExec {
         let required_input_ordering =
             (!new_requirement.is_empty()).then_some(new_requirement);
 
-        let aggregate_eqs = input
-            .equivalence_properties()
-            .project(&projection_mapping, schema.clone());
+        let aggregate_eqs =
+            input_eq_properties.project(&projection_mapping, schema.clone());
         let output_ordering = aggregate_eqs.oeq_class().output_ordering();
 
         Ok(AggregateExec {
@@ -998,6 +854,111 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
     Arc::new(Schema::new(group_fields))
 }
 
+/// Determines the lexical ordering requirement for an aggregate expression.
+///
+/// # Parameters
+///
+/// - `aggr_expr`: A reference to an `Arc<dyn AggregateExpr>` representing the aggregate expression.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexOrdering` instance indicating the lexical ordering requirement for the aggregate expression.
+fn get_aggregate_expr_req(
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    group_by: &PhysicalGroupBy,
+    agg_mode: &AggregateMode,
+) -> LexOrdering {
+    // If
+    // - the aggregation function is not order-sensitive, or
+    // - the aggregation is performing a "second stage" calculation, or
+    // - all aggregate function requirements are inside the group by expression,
+    // ignore the ordering requirement.
+    if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
+        return vec![];
+    }
+
+    let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec();
+
+    // In non-first stage modes, we accumulate data (using `merge_batch`)
+    // from different partitions (i.e. merge partial results). During
+    // this merge, we consider the ordering of each partial result.
+    // Hence, we do not need to use the ordering requirement in such
+    // modes as long as partial results are generated with the
+    // correct ordering.
+    if group_by.is_single() {
+        // Remove all orderings that occur in the group by. These requirements
+        // will definitely be satisfied (per group, each group by expression
+        // will have distinct values; hence all requirements are satisfied).
+        let physical_exprs = group_by.input_exprs();
+        req.retain(|sort_expr| {
+            !physical_exprs_contains(&physical_exprs, &sort_expr.expr)
+        });
+    }
+    req
+}
+
+/// Computes the finer ordering between the existing ordering requirement and
+/// the requirement of the given aggregate expression.
+///
+/// # Parameters
+///
+/// * `existing_req` - The existing lexical ordering that needs refinement.
+/// * `aggr_expr` - A reference to an aggregate expression trait object.
+/// * `group_by` - Information about the physical grouping (e.g. group by expression).
+/// * `eq_properties` - Equivalence properties relevant to the computation.
+/// * `agg_mode` - The mode of aggregation (e.g., Partial, Final, etc.).
+///
+/// # Returns
+///
+/// An `Option<LexOrdering>` representing the computed finer lexical ordering,
+/// or `None` if there is no finer ordering; e.g. when the existing requirement
+/// and the requirement of the aggregator are incompatible.
+fn finer_ordering(
+    existing_req: &LexOrdering,
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    group_by: &PhysicalGroupBy,
+    eq_properties: &EquivalenceProperties,
+    agg_mode: &AggregateMode,
+) -> Option<LexOrdering> {
+    let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
+    eq_properties.get_finer_ordering(existing_req, &aggr_req)
+}
+
+/// Groups aggregate expressions based on their ordering requirements.
+///
+/// # Parameters
+///
+/// - `aggr_exprs`: A mutable slice of `Arc<dyn AggregateExpr>` representing the aggregate expressions to be grouped.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
+///
+/// # Returns
+///
+/// A vector of `AggregateExprGroup` instances, each containing indices and ordering requirements for a group of
+/// related aggregate expressions.
+/// TODO: Update docstring
+fn get_aggregate_exprs_requirement(
+    aggr_exprs: &[Arc<dyn AggregateExpr>],
+    group_by: &PhysicalGroupBy,
+    eq_properties: &EquivalenceProperties,
+    agg_mode: &AggregateMode,
+) -> Result<LexRequirement> {
+    let mut requirement = vec![];
+    for aggr_expr in aggr_exprs.iter() {
+        if let Some(finer_ordering) =
+            finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode)
+        {
+            requirement = finer_ordering;
+        } else {
+            return Err(plan_datafusion_err!(
+                "Conflicting ordering requirement is not supported"
+            ));
+        }
+    }
+    Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))
+}
+
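The loop above effectively folds `finer_ordering` over all aggregate expressions, starting from the empty requirement. A hedged, standalone sketch of that fold (not part of this patch): orderings are modeled as plain column lists and "finer" as a prefix check, whereas the real implementation delegates to `EquivalenceProperties::get_finer_ordering`.

// Hypothetical stand-in for `get_finer_ordering`: one ordering is "finer"
// than another if the other is its prefix.
fn finer(a: &[&'static str], b: &[&'static str]) -> Option<Vec<&'static str>> {
    if b.starts_with(a) {
        Some(b.to_vec()) // `b` refines `a`
    } else if a.starts_with(b) {
        Some(a.to_vec()) // `a` already refines `b`
    } else {
        None // incompatible requirements
    }
}

fn main() {
    // Requirements of three aggregates: [], [a], [a, b]
    let reqs = [vec![], vec!["a"], vec!["a", "b"]];
    let mut common: Vec<&'static str> = vec![];
    for req in &reqs {
        match finer(&common, req) {
            Some(f) => common = f,
            // With e.g. [a] and [b] this arm is reached -- the
            // "conflicting ordering requirement" error in the code above.
            None => panic!("conflicting ordering requirements"),
        }
    }
    assert_eq!(common, ["a", "b"]); // one sort satisfies all three aggregates
}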
 /// returns physical expressions for arguments to evaluate against a batch
 /// The expressions are different depending on `mode`:
 /// * Partial: AggregateExpr::expressions
 /// * Final: columns of `AggregateExpr::state_fields()`
@@ -1014,17 +975,15 @@ fn aggregate_expressions(
         .iter()
         .map(|agg| {
             let mut result = agg.expressions().clone();
-            // In partial mode, append ordering requirements to expressions' results.
-            // Ordering requirements are used by subsequent executors to satisfy the required
-            // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
-            if matches!(mode, AggregateMode::Partial) {
-                if let Some(ordering_req) = agg.order_bys() {
-                    let ordering_exprs = ordering_req
-                        .iter()
-                        .map(|item| item.expr.clone())
-                        .collect::<Vec<_>>();
-                    result.extend(ordering_exprs);
-                }
+            // Append ordering requirements to expressions' results. This way,
+            // order-sensitive aggregators can satisfy the requirement themselves.
+            if let Some(ordering_req) = agg.order_bys() {
+                let ordering_exprs = ordering_req
+                    .iter()
+                    .map(|item| item.expr.clone())
+                    .collect::<Vec<_>>();
+                result.extend(ordering_exprs);
             }
             result
         })
@@ -1204,9 +1163,7 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::*;
-    use crate::aggregates::{
-        get_finest_requirement, AggregateExec, AggregateMode, PhysicalGroupBy,
-    };
+    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
     use crate::coalesce_batches::CoalesceBatchesExec;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::common;
@@ -1230,7 +1187,7 @@ mod tests {
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
-        lit, ApproxDistinct, Count, FirstValue, LastValue, Median,
+        lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg,
     };
     use datafusion_physical_expr::{
         AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
@@ -2093,11 +2050,6 @@ mod tests {
             descending: false,
             nulls_first: false,
         };
-        // This is the reverse requirement of options1
-        let options2 = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
         let col_a = &col("a", &test_schema)?;
         let col_b = &col("b", &test_schema)?;
         let col_c = &col("c", &test_schema)?;
@@ -2106,7 +2058,7 @@ mod tests {
         eq_properties.add_equal_conditions(col_a, col_b);
         // Aggregate requirements are
         // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively
-        let mut order_by_exprs = vec![
+        let order_by_exprs = vec![
             None,
             Some(vec![PhysicalSortExpr {
                 expr: col_a.clone(),
                 options: options1,
             }]),
             Some(vec![
                 PhysicalSortExpr {
                     expr: col_a.clone(),
                     options: options1,
                 },
                 PhysicalSortExpr {
                     expr: col_b.clone(),
                     options: options1,
                 },
                 PhysicalSortExpr {
                     expr: col_c.clone(),
                     options: options1,
                 },
             ]),
             Some(vec![
                 PhysicalSortExpr {
                     expr: col_a.clone(),
                     options: options1,
                 },
                 PhysicalSortExpr {
                     expr: col_b.clone(),
                     options: options1,
                 },
             ]),
-            // Since aggregate expression is reversible (FirstValue), we should be able to resolve below
-            // contradictory requirement by reversing it.
-            Some(vec![PhysicalSortExpr {
-                expr: col_b.clone(),
-                options: options2,
-            }]),
         ];
-        let common_requirement = Some(vec![
+        let common_requirement = vec![
             PhysicalSortExpr {
                 expr: col_a.clone(),
                 options: options1,
             },
             PhysicalSortExpr {
                 expr: col_c.clone(),
                 options: options1,
             },
-        ]);
-        let aggr_expr = Arc::new(FirstValue::new(
-            col_a.clone(),
-            "first1",
-            DataType::Int32,
-            vec![],
-            vec![],
-        )) as _;
-        let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
-        let res =
-            get_finest_requirement(&mut aggr_exprs, &mut order_by_exprs, &eq_properties)?;
-        assert_eq!(res, common_requirement);
-        Ok(())
-    }
-
-    #[test]
-    fn test_agg_exec_same_schema() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Float32, true),
-            Field::new("b", DataType::Float32, true),
-        ]));
-
-        let col_a = col("a", &schema)?;
-        let col_b = col("b", &schema)?;
-        let option_desc = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        let sort_expr = vec![PhysicalSortExpr {
-            expr: col_b.clone(),
-            options: option_desc,
-        }];
-        let sort_expr_reverse = reverse_order_bys(&sort_expr);
-        let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]);
-
-        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![
-            Arc::new(FirstValue::new(
-                col_b.clone(),
-                "FIRST_VALUE(b)".to_string(),
-                DataType::Float64,
-                sort_expr_reverse.clone(),
-                vec![DataType::Float64],
-            )),
-            Arc::new(LastValue::new(
-                col_b.clone(),
-                "LAST_VALUE(b)".to_string(),
-                DataType::Float64,
-                sort_expr.clone(),
-                vec![DataType::Float64],
-            )),
-        ];
-        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
-        let aggregate_exec = Arc::new(AggregateExec::try_new(
-            AggregateMode::Partial,
-            groups,
-            aggregates.clone(),
-            vec![None, None],
-            blocking_exec.clone(),
-            schema,
-        )?);
-        let new_agg = aggregate_exec
-            .clone()
-            .with_new_children(vec![blocking_exec])?;
-        assert_eq!(new_agg.schema(), aggregate_exec.schema());
+        let aggr_exprs = order_by_exprs
+            .into_iter()
+            .map(|order_by_expr| {
+                Arc::new(OrderSensitiveArrayAgg::new(
+                    col_a.clone(),
+                    "array_agg",
+                    DataType::Int32,
+                    false,
+                    vec![],
+                    order_by_expr.unwrap_or_default(),
+                )) as _
+            })
+            .collect::<Vec<_>>();
+        let group_by = PhysicalGroupBy::new_single(vec![]);
+        let res = get_aggregate_exprs_requirement(
+            &aggr_exprs,
+            &group_by,
+            &eq_properties,
+            &AggregateMode::Partial,
+        )?;
+        let res = PhysicalSortRequirement::to_sort_exprs(res);
+        assert_eq!(res, common_requirement);
         Ok(())
     }
 }

From da1cf7112d1f5f1a56c7635aa3d490dac50f12e0 Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 14:37:02 +0300
Subject: [PATCH 02/10] Update tests in distinct_on

---
 datafusion/sqllogictest/test_files/distinct_on.slt | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt
index 9a7117b69b99..3f609e254839 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -78,7 +78,7 @@ c 4
 query I
 SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
 ----
-5
+4
 4
 2
 1
@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
 ------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
 
 # ON expressions are not a sub-set of the ORDER BY expressions
 query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions

From 202936d6ad35eea30b38c0d732a94f65dc125d08 Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 14:38:49 +0300
Subject: [PATCH 03/10] Update group by joins slt

---
 .../sqllogictest/test_files/groupby.slt       | 90 +++++++++----------
 datafusion/sqllogictest/test_files/joins.slt  |  4 +-
 2 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index f1b6a57287b5..c4805c777cf3 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2019,17 +2019,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
 ------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
---------------SortExec: expr=[col0@3 ASC NULLS LAST]
-----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
-------------------CoalesceBatchesExec: target_batch_size=8192
---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
-----------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
+----------------CoalesceBatchesExec: target_batch_size=8192
+------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]

 # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2213,10 +2212,10 @@
 query III
 SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
 FROM annotated_data_infinite2
 GROUP BY a, b
 ----
+0 0 0
+0 1 25
+1 2 50
+1 3 75

 # when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
 # queries should still work, However, result depends on the scanning order and

 # test_ordering_sensitive_aggregation3
 # When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation.
 # test below should raise Plan Error.
-statement error DataFusion error: This feature is not implemented: Conflicting ordering requirements in aggregate functions is not supported
+statement error DataFusion error: Error during planning: Conflicting ordering requirement is not supported
 SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
 ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2,
 ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3

@@ -2509
 Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 DESC]
 ------MemoryExec: partitions=1, partition_sizes=[1]

@@ -2540
 Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]

@@ -2572
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]

@@ -2636
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 ASC NULLS LAST]
-------MemoryExec: partitions=1, partition_sizes=[1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----MemoryExec: partitions=1, partition_sizes=[1]

 query TRRR rowsort
 SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,

@@ -2672
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
 --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 DESC]
-------MemoryExec: partitions=1, partition_sizes=[1]
+----MemoryExec: partitions=1, partition_sizes=[1]

 query TRRR rowsort
 SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,

@@ -2709
 physical_plan
 SortExec: expr=[sn@2 ASC NULLS LAST]
 --ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
 ----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
-------SortExec: expr=[sn@5 ASC NULLS LAST]
---------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------------MemoryExec: partitions=1, partition_sizes=[1]
+------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]

 query ITIPTR rowsort
 SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate

@@ -2755
 SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 --------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------------MemoryExec: partitions=1, partition_sizes=[1]

 query TRR
 SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,

@@ -2786
 physical_plan
 SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
---------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----------------MemoryExec: partitions=1, partition_sizes=[1]

 query TRR
 SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,

@@ -2825
 ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts
 --AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----CoalescePartitionsExec
 ------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]

 query RR
 SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
 LAST_VALUE(amount ORDER BY ts ASC) AS fv2
 FROM sales_global
 ----
-30 80
+30 100

 # Conversion in between FIRST_VALUE and LAST_VALUE to resolve
 # contradictory requirements should work in multi partitions.

@@ -2848
 Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
 ----TableScan: sales_global projection=[ts, amount]
 physical_plan
 ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
---AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----CoalescePartitionsExec
-------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]

 query RR
 SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,

@@ -2985
 physical_plan
 SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------------SortExec: expr=[amount@1 DESC]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------------MemoryExec: partitions=1, partition_sizes=[1]

@@ -3631
 Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab
 ----TableScan: multiple_ordered_table projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
 ----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true

diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 9a349f600091..a7146a5a91c4 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3454,7 +3454,7 @@ SortPreservingMergeExec: [a@0 ASC]
 ------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
 --------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2
-------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0])
+------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
 --------------CoalesceBatchesExec: target_batch_size=2
 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
 ------------------CoalesceBatchesExec: target_batch_size=2
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 ------------------CoalesceBatchesExec: target_batch_size=2
---------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC NULLS LAST
+--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true
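The plan rewrites in these test diffs all rest on one identity: FIRST_VALUE(x ORDER BY ts DESC) equals LAST_VALUE(x ORDER BY ts ASC), so an input that is already sorted one way can serve either formulation without an extra SortExec. A standalone sketch of the equivalence over a plain vector (hypothetical data, not part of these patches):

fn main() {
    // (ts, amount) rows, already ordered by ts ASC.
    let rows = vec![(1, 30.0_f64), (2, 75.0), (3, 100.0)];

    // FIRST_VALUE(amount ORDER BY ts DESC): sort descending, take the first row.
    let mut by_ts_desc = rows.clone();
    by_ts_desc.sort_by(|a, b| b.0.cmp(&a.0));
    let first_desc = by_ts_desc.first().map(|r| r.1);

    // LAST_VALUE(amount ORDER BY ts ASC): just take the last row of the input.
    let last_asc = rows.last().map(|r| r.1);

    assert_eq!(first_desc, last_asc); // both are 100.0
}
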
-/// -/// Specifically, each distinct combination of the relevant columns -/// are contiguous in the input, and once a new combination is seen -/// previous combinations are guaranteed never to appear again -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum GroupByOrderMode { - /// The input is known to be ordered by a preset (prefix but - /// possibly reordered) of the expressions in the `GROUP BY` clause. - /// - /// For example, if the input is ordered by `a, b, c` and we group - /// by `b, a, d`, `PartiallyOrdered` means a subset of group `b, - /// a, d` defines a preset for the existing ordering, in this case - /// `a, b`. - PartiallyOrdered, - /// The input is known to be ordered by *all* the expressions in the - /// `GROUP BY` clause. - /// - /// For example, if the input is ordered by `a, b, c, d` and we group by b, a, - /// `Ordered` means that all of the of group by expressions appear - /// as a preset for the existing ordering, in this case `a, b`. - FullyOrdered, -} - /// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET) /// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b] /// and a single group [false, false]. @@ -267,17 +239,6 @@ impl From for SendableRecordBatchStream { } } -/// A structure representing a group of aggregate expressions where each group has different -/// ordering requirements. Aggregate groups are calculated using `get_aggregate_expr_groups` function. -/// Indices refers to the position of each aggregate expressions among all aggregate expressions (prior to grouping). -#[derive(Clone, Debug, PartialEq)] -pub struct AggregateExprGroup { - /// Aggregate expressions indices - indices: Vec, - /// Requirement - requirement: LexOrdering, -} - /// Hash aggregate execution plan #[derive(Debug)] pub struct AggregateExec { From 416ac3a8ae05d6f5e829a0a0aaff7a29118b200e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Dec 2023 14:58:33 +0300 Subject: [PATCH 05/10] Minor changes --- .../physical-plan/src/aggregates/mod.rs | 68 +++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index fb6a988b1cc6..c95ba7c2492d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -298,8 +298,8 @@ impl AggregateExec { &original_schema, group_by.expr.len(), )); - let original_schema = Arc::new(original_schema); + let original_schema = Arc::new(original_schema); AggregateExec::try_new_with_schema( mode, group_by, @@ -312,9 +312,16 @@ impl AggregateExec { ) } - /// Create a new hash aggregate execution plan + /// Create a new hash aggregate execution plan with the given schema. + /// This constructor isn't part of the public API, it is used internally + /// by Datafusion to enforce schema consistency during when re-creating + /// `AggregateExec`s inside optimization rules. Schema field names of an + /// `AggregateExec` depends on the names of aggregate expressions. Since + /// a rule may re-write aggregate expressions (e.g. reverse them) during + /// initialization, field names may change inadvertently if one re-creates + /// the schema in such cases. 
#[allow(clippy::too_many_arguments)] - pub fn try_new_with_schema( + fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, aggr_expr: Vec>, @@ -1151,7 +1158,8 @@ mod tests { lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr, + PhysicalSortExpr, }; use datafusion_execution::memory_pool::FairSpillPool; @@ -2084,4 +2092,56 @@ mod tests { assert_eq!(res, common_requirement); Ok(()) } + + #[test] + fn test_agg_exec_same_schema() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float32, true), + Field::new("b", DataType::Float32, true), + ])); + + let col_a = col("a", &schema)?; + let col_b = col("b", &schema)?; + let option_desc = SortOptions { + descending: true, + nulls_first: true, + }; + let sort_expr = vec![PhysicalSortExpr { + expr: col_b.clone(), + options: option_desc, + }]; + let sort_expr_reverse = reverse_order_bys(&sort_expr); + let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]); + + let aggregates: Vec> = vec![ + Arc::new(FirstValue::new( + col_b.clone(), + "FIRST_VALUE(b)".to_string(), + DataType::Float64, + sort_expr_reverse.clone(), + vec![DataType::Float64], + )), + Arc::new(LastValue::new( + col_b.clone(), + "LAST_VALUE(b)".to_string(), + DataType::Float64, + sort_expr.clone(), + vec![DataType::Float64], + )), + ]; + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups, + aggregates.clone(), + vec![None, None], + blocking_exec.clone(), + schema, + )?); + let new_agg = aggregate_exec + .clone() + .with_new_children(vec![blocking_exec])?; + assert_eq!(new_agg.schema(), aggregate_exec.schema()); + Ok(()) + } } From 06adf250c1fd9b3ea49e2c3a62b82c3726d65d96 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Dec 2023 15:04:49 +0300 Subject: [PATCH 06/10] Minor changes --- datafusion/physical-plan/src/aggregates/mod.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c95ba7c2492d..5f8d85b06fb6 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -294,11 +294,11 @@ impl AggregateExec { group_by.contains_null(), mode, )?; + let schema = Arc::new(materialize_dict_group_keys( &original_schema, group_by.expr.len(), )); - let original_schema = Arc::new(original_schema); AggregateExec::try_new_with_schema( mode, @@ -892,20 +892,19 @@ fn finer_ordering( eq_properties.get_finer_ordering(existing_req, &aggr_req) } -/// Groups aggregate expressions based on their ordering requirements. +/// Get common requirement that satisfies all of the aggregate expressions /// /// # Parameters /// -/// - `aggr_exprs`: A mutable slice of `Arc` representing the aggregate expressions to be grouped. +/// - `aggr_exprs`: A slice of `Arc` containing all of the aggregate expressions. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression. /// - `eq_properties`: A reference to an `EquivalenceProperties` instance representing equivalence properties for ordering. /// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation. 
From 06adf250c1fd9b3ea49e2c3a62b82c3726d65d96 Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 15:04:49 +0300
Subject: [PATCH 06/10] Minor changes

---
 datafusion/physical-plan/src/aggregates/mod.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index c95ba7c2492d..5f8d85b06fb6 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -294,11 +294,11 @@ impl AggregateExec {
             group_by.contains_null(),
             mode,
         )?;
+
         let schema = Arc::new(materialize_dict_group_keys(
             &original_schema,
             group_by.expr.len(),
         ));
-
         let original_schema = Arc::new(original_schema);
 
         AggregateExec::try_new_with_schema(
             mode,
@@ -892,20 +892,19 @@ fn finer_ordering(
     eq_properties.get_finer_ordering(existing_req, &aggr_req)
 }
 
-/// Groups aggregate expressions based on their ordering requirements.
+/// Get common requirement that satisfies all of the aggregate expressions
 ///
 /// # Parameters
 ///
-/// - `aggr_exprs`: A mutable slice of `Arc<dyn AggregateExpr>` representing the aggregate expressions to be grouped.
+/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all of the aggregate expressions.
 /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
 /// - `eq_properties`: A reference to an `EquivalenceProperties` instance representing equivalence properties for ordering.
 /// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
 ///
 /// # Returns
 ///
-/// A vector of `AggregateExprGroup` instances, each containing indices and ordering requirements for a group of
-/// related aggregate expressions.
-/// TODO: Update docstring
+/// A `LexRequirement` instance, which is the requirement that satisfies all of the aggregate requirements. If there are
+/// conflicting e.g un-compatible requirements returns Error.
 fn get_aggregate_exprs_requirement(
     aggr_exprs: &[Arc<dyn AggregateExpr>],
     group_by: &PhysicalGroupBy,

From e208ebff30586da6d7093a4961ee4cc1e1a5e194 Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 15:28:56 +0300
Subject: [PATCH 07/10] Simplifications

---
 datafusion/physical-plan/src/aggregates/mod.rs | 11 +++++++----
 datafusion/sqllogictest/test_files/groupby.slt | 12 ++++++------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 5f8d85b06fb6..6a15fc33be35 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -38,7 +38,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::DataType;
 use datafusion_common::stats::Precision;
-use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
@@ -918,9 +918,12 @@ fn get_aggregate_exprs_requirement(
         {
             requirement = finer_ordering;
         } else {
-            return Err(plan_datafusion_err!(
-                "Conflicting ordering requirement is not supported"
-            ));
+            // If neither of the requirements satisfy the other, this means
+            // requirements are conflicting. Currently, we do not support
+            // conflicting requirements.
+            return not_impl_err!(
+                "Conflicting ordering requirements in aggregate functions is not supported"
+            );
         }
     }
     Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))

diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index c4805c777cf3..bbf21e135fe4 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2208,14 +2208,14 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)
 ----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 query III
-SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
+SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c
 FROM annotated_data_infinite2
 GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
+0 1 49
+1 2 74
+1 3 99
 
 # when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
 # queries should still work, However, result depends on the scanning order and
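The error path introduced above fires when two aggregates demand orderings that cannot be merged into one "finer" requirement, which the next test exercises. A standalone sketch of the compatibility check (heavily simplified; DataFusion's real check goes through `EquivalenceProperties::get_finer_ordering`, and all names here are illustrative): one requirement refines another only if the shorter one is a prefix of the longer one.

    // Simplified sketch of when two ordering requirements admit a finer
    // common one: one must be a prefix of the other. ASC and DESC on the
    // same column are not, so planning must reject the combination.
    type Req<'a> = &'a [(&'a str, bool)]; // (column, descending)

    fn finer<'a>(lhs: Req<'a>, rhs: Req<'a>) -> Option<Req<'a>> {
        let (short, long) = if lhs.len() <= rhs.len() { (lhs, rhs) } else { (rhs, lhs) };
        long.starts_with(short).then_some(long)
    }

    fn main() {
        // ORDER BY amount DESC vs ORDER BY amount ASC: no common refinement.
        assert!(finer(&[("amount", true)], &[("amount", false)]).is_none());
        // ORDER BY sn ASC vs ORDER BY sn ASC, amount DESC: the longer works.
        assert_eq!(
            finer(&[("sn", false)], &[("sn", false), ("amount", true)]),
            Some(&[("sn", false), ("amount", true)][..])
        );
    }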
@@ -2342,7 +2342,7 @@ TUR [100.0, 75.0] 175
 # test_ordering_sensitive_aggregation3
 # When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation.
 # test below should raise Plan Error.
-statement error DataFusion error: Error during planning: Conflicting ordering requirement is not supported
+statement error DataFusion error: This feature is not implemented: Conflicting ordering requirements in aggregate functions is not supported
 SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
        ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2,
        ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3

From 0ece59386620629e03a9b0a4f6df51a27a64f7f7 Mon Sep 17 00:00:00 2001
From: Mustafa Akur
Date: Wed, 27 Dec 2023 15:34:32 +0300
Subject: [PATCH 08/10] Update comments

---
 datafusion/physical-expr/src/aggregate/first_last.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index bfd6143695cf..b95fd09e9528 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -236,6 +236,7 @@ impl FirstValueAccumulator {
         let ordering_values = &values[1..];
         assert_eq!(ordering_values.len(), self.ordering_req.len());
         if self.ordering_req.is_empty() {
+            // Get first entry according to receive order (0th index)
             return Ok((!value.is_empty()).then_some(0));
         }
         let sort_columns = ordering_values
@@ -521,13 +522,14 @@ impl LastValueAccumulator {
         let ordering_values = &values[1..];
         assert_eq!(ordering_values.len(), self.ordering_req.len());
         if self.ordering_req.is_empty() {
+            // Get last entry according to receive order (last index)
             return Ok((!value.is_empty()).then_some(value.len() - 1));
         }
         let sort_columns = ordering_values
             .iter()
             .zip(self.ordering_req.iter())
             .map(|(values, req)| {
-                // Take reverse ordering requirement this would enable us to use fetch=1 for last value.
+                // Take reverse ordering requirement this enables us to use fetch=1 for last value.
                 SortColumn {
                     values: values.clone(),
                     options: Some(!req.options),

From 298fcf0b9be894d00eb65705f707e1c96bd45315 Mon Sep 17 00:00:00 2001
From: Mehmet Ozan Kabak
Date: Thu, 28 Dec 2023 21:16:59 +0300
Subject: [PATCH 09/10] Review

---
 .../physical-expr/src/aggregate/first_last.rs |  58 ++++-----
 datafusion/physical-expr/src/aggregate/mod.rs |  23 ++--
 .../physical-expr/src/aggregate/utils.rs      |  16 ++-
 .../physical-plan/src/aggregates/mod.rs       | 117 +++++++++---------
 4 files changed, 106 insertions(+), 108 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index b95fd09e9528..c7032e601cf8 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -30,7 +30,9 @@ use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::Accumulator;
 
 /// FIRST_VALUE aggregate expression
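The hunks that follow combine the newly imported `internal_err!` with `let ... else` slice patterns to split a row into its value and its ordering columns while rejecting empty input. A tiny self-contained sketch of the pattern (the names and error type here are illustrative):

    // Sketch of the `let ... else` slice-pattern idiom used below: bind the
    // first element and the remainder of a slice, diverging when it is empty.
    fn split_row(row: &[i64]) -> Result<(i64, &[i64]), String> {
        let [value, orderings @ ..] = row else {
            return Err("empty row".to_string());
        };
        Ok((*value, orderings))
    }

    fn main() {
        assert_eq!(split_row(&[7, 1, 2]), Ok((7, &[1_i64, 2][..])));
        assert!(split_row(&[]).is_err());
    }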
@@ -211,10 +213,11 @@ impl FirstValueAccumulator {
 
     // Updates state with the values in the given row.
     fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
-        let value = &row[0];
-        let orderings = &row[1..];
-        // Update when
-        // - no entry in the state
-        // - There is an earlier entry in according to requirements
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have an "earlier"
+        // entry according to sort requirements.
         if !self.is_set
             || compare_rows(
                 &self.orderings,
                 orderings,
@@ -232,9 +234,9 @@ impl FirstValueAccumulator {
     }
 
     fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
-        let value = &values[0];
-        let ordering_values = &values[1..];
-        assert_eq!(ordering_values.len(), self.ordering_req.len());
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
         if self.ordering_req.is_empty() {
             // Get first entry according to receive order (0th index)
             return Ok((!value.is_empty()).then_some(0));
@@ -248,10 +250,7 @@ impl FirstValueAccumulator {
             })
             .collect::<Vec<_>>();
         let indices = lexsort_to_indices(&sort_columns, Some(1))?;
-        if !indices.is_empty() {
-            return Ok(Some(indices.value(0) as usize));
-        }
-        Ok(None)
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
 }
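What `lexsort_to_indices(&sort_columns, Some(1))` computes here is just the index of the row that sorts first under the requested orderings; the `Some(1)` fetch limit avoids a full sort. A standalone sketch of the equivalent selection in plain Rust (no arrow types; the function below is illustrative):

    // Sketch of the top-1 selection `lexsort_to_indices(.., Some(1))`
    // performs: the index of the row that sorts first under a lexicographic,
    // per-column ASC/DESC ordering.
    use std::cmp::Ordering;

    fn first_idx(ordering_cols: &[Vec<i64>], descending: &[bool]) -> Option<usize> {
        let n = ordering_cols.first()?.len();
        (0..n).min_by(|&i, &j| {
            for (col, desc) in ordering_cols.iter().zip(descending) {
                let mut ord = col[i].cmp(&col[j]);
                if *desc {
                    ord = ord.reverse();
                }
                if ord != Ordering::Equal {
                    return ord;
                }
            }
            Ordering::Equal
        })
    }

    fn main() {
        // Ordered by (a ASC, b DESC): rows 2 and 3 tie on `a`, row 2 wins on `b`.
        let a = vec![3, 2, 1, 1];
        let b = vec![5, 8, 9, 4];
        assert_eq!(first_idx(&[a, b], &[false, true]), Some(2));
    }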
@@ -495,12 +494,11 @@ impl LastValueAccumulator {
 
     // Updates state with the values in the given row.
     fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
-        let value = &row[0];
-        let orderings = &row[1..];
-        // Update when
-        // - no value in the state
-        // - There is no specific requirement, but a new value (most recent entry in terms of execution)
-        // - There is a more recent entry in terms of requirement
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have a "later"
+        // entry (either according to sort requirements or the order of execution).
         if !self.is_set
             || self.orderings.is_empty()
             || compare_rows(
                 &self.orderings,
                 orderings,
@@ -518,18 +516,19 @@ impl LastValueAccumulator {
     }
 
     fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
-        let value = &values[0];
-        let ordering_values = &values[1..];
-        assert_eq!(ordering_values.len(), self.ordering_req.len());
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
         if self.ordering_req.is_empty() {
-            // Get last entry according to receive order (last index)
+            // Get last entry according to the order of data:
             return Ok((!value.is_empty()).then_some(value.len() - 1));
         }
         let sort_columns = ordering_values
             .iter()
             .zip(self.ordering_req.iter())
             .map(|(values, req)| {
-                // Take reverse ordering requirement this enables us to use fetch=1 for last value.
+                // Take the reverse ordering requirement. This enables us to
+                // use "fetch = 1" to get the last value.
                 SortColumn {
                     values: values.clone(),
                     options: Some(!req.options),
                 }
             })
             .collect::<Vec<_>>();
         let indices = lexsort_to_indices(&sort_columns, Some(1))?;
-        if !indices.is_empty() {
-            return Ok(Some(indices.value(0) as usize));
-        }
-        Ok(None)
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
+
+    use arrow::compute::concat;
     use arrow_array::{ArrayRef, Int64Array};
     use arrow_schema::DataType;
     use datafusion_common::{Result, ScalarValue};
     use datafusion_expr::Accumulator;
 
-    use arrow::compute::concat;
-    use std::sync::Arc;
-
     #[test]
     fn test_first_last_value_value() -> Result<()> {
         let mut first_accumulator =

diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 695f08dfc1b3..5bd1fca385b1 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use self::groups_accumulator::GroupsAccumulator;
 use crate::expressions::OrderSensitiveArrayAgg;
 use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::datatypes::Field;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
-use std::any::Any;
-use std::fmt::Debug;
-use std::sync::Arc;
 
-use self::groups_accumulator::GroupsAccumulator;
+mod hyperloglog;
+mod tdigest;
 
 pub(crate) mod approx_distinct;
 pub(crate) mod approx_median;
@@ -46,19 +50,18 @@ pub(crate) mod median;
 pub(crate) mod string_agg;
 #[macro_use]
 pub(crate) mod min_max;
-pub mod build_in;
 pub(crate) mod groups_accumulator;
-mod hyperloglog;
-pub mod moving_min_max;
 pub(crate) mod regr;
 pub(crate) mod stats;
 pub(crate) mod stddev;
 pub(crate) mod sum;
 pub(crate) mod sum_distinct;
-mod tdigest;
-pub mod utils;
 pub(crate) mod variance;
 
+pub mod build_in;
+pub mod moving_min_max;
+pub mod utils;
+
 /// An aggregate expression that:
 /// * knows its resulting field
 /// * knows how to create its accumulator
@@ -134,7 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
 
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
     aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
 }
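`is_order_sensitive` boils down to a trait-object downcast test. A self-contained sketch of the `as_any().is::<T>()` pattern it uses (the trait here is drastically simplified; the real `AggregateExpr` carries many more methods):

    // Sketch of the `as_any().is::<T>()` check: each implementor exposes
    // itself as `&dyn Any`, letting callers test for one concrete type
    // without knowing about the others.
    use std::any::Any;

    trait AggExpr {
        fn as_any(&self) -> &dyn Any;
    }

    struct OrderSensitiveArrayAgg;
    struct Sum;

    impl AggExpr for OrderSensitiveArrayAgg {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    impl AggExpr for Sum {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    fn is_order_sensitive(expr: &dyn AggExpr) -> bool {
        expr.as_any().is::<OrderSensitiveArrayAgg>()
    }

    fn main() {
        assert!(is_order_sensitive(&OrderSensitiveArrayAgg));
        assert!(!is_order_sensitive(&Sum));
    }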
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index 7ba7e9d01567..9777158da133 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,20 +17,21 @@
 
 //! Utilities used in aggregates
 
+use std::any::Any;
+use std::sync::Arc;
+
 use crate::{AggregateExpr, PhysicalSortExpr};
-use arrow::array::ArrayRef;
+
+use arrow::array::{ArrayRef, ArrowNativeTypeOp};
 use arrow_array::cast::AsArray;
 use arrow_array::types::{
     Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
     TimestampNanosecondType, TimestampSecondType,
 };
-use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
 use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
-use std::any::Any;
-use std::sync::Arc;
 
 /// Convert scalar values from an accumulator into arrays.
 pub fn get_accum_scalar_values_as_arrays(
@@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
         .state()?
         .iter()
         .map(|s| s.to_array_of_size(1))
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
@@ -208,8 +209,5 @@ pub(crate) fn ordering_fields(
 
 /// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
 pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
+    ordering_req.iter().map(|item| item.options).collect()
 }
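Several hunks in this commit replace `collect::<Result<Vec<_>>>()` and `collect::<Vec<_>>()` with a bare `collect()`. This works because the function's return type already pins down the target collection, and `Result` implements `FromIterator`, so an iterator of `Result`s collects straight into a `Result` of a collection, short-circuiting on the first error. A minimal sketch (illustrative function, not from the codebase):

    // Sketch of the turbofish-removal pattern: the return type drives
    // inference, and collecting Result items into Result<Vec<_>, _>
    // stops at the first Err.
    fn lengths(items: &[&str]) -> Result<Vec<usize>, String> {
        items
            .iter()
            .map(|s| {
                if s.is_empty() {
                    Err("empty item".to_string())
                } else {
                    Ok(s.len())
                }
            })
            .collect() // inferred as Result<Vec<usize>, String>
    }

    fn main() {
        assert_eq!(lengths(&["ab", "c"]), Ok(vec![2, 1]));
        assert!(lengths(&["ab", ""]).is_err());
    }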
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 6a15fc33be35..f5bb4fe59b5d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -826,38 +826,41 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
 ///
 /// # Parameters
 ///
-/// - `aggr_expr`: A reference to an `Arc<dyn AggregateExpr>` representing the aggregate expression.
-/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
-/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
+/// - `aggr_expr`: A reference to an `Arc<dyn AggregateExpr>` representing the
+///   aggregate expression.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+///   physical GROUP BY expression.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+///   mode of aggregation.
 ///
 /// # Returns
 ///
-/// A `LexOrdering` instance indicating the lexical ordering requirement for the aggregate expression.
+/// A `LexOrdering` instance indicating the lexical ordering requirement for
+/// the aggregate expression.
 fn get_aggregate_expr_req(
     aggr_expr: &Arc<dyn AggregateExpr>,
     group_by: &PhysicalGroupBy,
     agg_mode: &AggregateMode,
 ) -> LexOrdering {
-    // If
-    // - aggregation function is not order-sensitive and
-    // - aggregation is performing a "second stage" calculation, and
-    // - all aggregate function requirement is inside group by expression
-    // ignore the ordering requirement.
+    // If the aggregation function is not order sensitive, or the aggregation
+    // is performing a "second stage" calculation, or all aggregate function
+    // requirements are inside the GROUP BY expression, then ignore the ordering
+    // requirement.
     if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
         return vec![];
    }
 
     let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec();
 
-    // In non-first stage modes, we accumulate data (using `merge_batch`)
-    // from different partitions (i.e. merge partial results). During
-    // this merge, we consider the ordering of each partial result.
-    // Hence, we do not need to use the ordering requirement in such
-    // modes as long as partial results are generated with the
-    // correct ordering.
+    // In non-first stage modes, we accumulate data (using `merge_batch`) from
+    // different partitions (i.e. merge partial results). During this merge, we
+    // consider the ordering of each partial result. Hence, we do not need to
+    // use the ordering requirement in such modes as long as partial results are
+    // generated with the correct ordering.
     if group_by.is_single() {
-        // Remove all orderings that occur in the group by. These requirements will be satisfied definitely
-        // (Per group each group by expression will have distinct values. Hence all requirements are satisfied).
+        // Remove all orderings that occur in the group by. These requirements
+        // will definitely be satisfied -- Each group by expression will have
+        // distinct values per group, hence all requirements are satisfied.
         let physical_exprs = group_by.input_exprs();
         req.retain(|sort_expr| {
             !physical_exprs_contains(&physical_exprs, &sort_expr.expr)
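The `retain` above drops any required sort expression that is also a GROUP BY key, since such a column is constant within each group. A simplified standalone sketch of that pruning step (using strings in place of physical expressions):

    // Sketch of the requirement pruning performed by the `retain`: sort
    // expressions that also appear in the GROUP BY list are trivially
    // satisfied within each group and can be dropped.
    fn prune_requirement<'a>(req: &[&'a str], group_by: &[&'a str]) -> Vec<&'a str> {
        req.iter()
            .copied()
            .filter(|expr| !group_by.contains(expr))
            .collect()
    }

    fn main() {
        // ARRAY_AGG(x ORDER BY a, ts) grouped by (a, b): the ordering on `a`
        // is implied by grouping, so only `ts` remains a real requirement.
        assert_eq!(prune_requirement(&["a", "ts"], &["a", "b"]), vec!["ts"]);
    }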
@@ -866,7 +869,8 @@ fn get_aggregate_expr_req(
     req
 }
 
-/// Computes the finer ordering for between given existing ordering requirement of aggregate expression.
+/// Computes the finer ordering between the given existing ordering requirement
+/// and the ordering requirement of the given aggregate expression.
 ///
 /// # Parameters
 ///
@@ -879,8 +883,8 @@ fn finer_ordering(
 /// # Returns
 ///
 /// An `Option<LexOrdering>` representing the computed finer lexical ordering,
-/// or `None` if there is no finer ordering, e.g existing requirement and requirement of the
-/// aggregator is incompatible.
+/// or `None` if there is no finer ordering; e.g. the existing requirement and
+/// the aggregator requirement are incompatible.
 fn finer_ordering(
     existing_req: &LexOrdering,
     aggr_expr: &Arc<dyn AggregateExpr>,
@@ -892,19 +896,23 @@ fn finer_ordering(
     eq_properties.get_finer_ordering(existing_req, &aggr_req)
 }
 
-/// Get common requirement that satisfies all of the aggregate expressions
+/// Get the common requirement that satisfies all the aggregate expressions.
 ///
 /// # Parameters
 ///
-/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all of the aggregate expressions.
-/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
-/// - `eq_properties`: A reference to an `EquivalenceProperties` instance representing equivalence properties for ordering.
-/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
+/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all the
+///   aggregate expressions.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+///   physical GROUP BY expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance
+///   representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+///   mode of aggregation.
 ///
 /// # Returns
 ///
-/// A `LexRequirement` instance, which is the requirement that satisfies all of the aggregate requirements. If there are
-/// conflicting e.g un-compatible requirements returns Error.
+/// A `LexRequirement` instance, which is the requirement that satisfies all the
+/// aggregate requirements. Returns an error in case of conflicting requirements.
 fn get_aggregate_exprs_requirement(
     aggr_exprs: &[Arc<dyn AggregateExpr>],
     group_by: &PhysicalGroupBy,
@@ -944,31 +952,27 @@ fn aggregate_expressions(
         | AggregateMode::SinglePartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| {
-                let mut result = agg.expressions().clone();
-                // Append ordering requirements to expressions' results.
-                // This way order sensitive aggregators can satisfy requirement
-                // themselves.
+                let mut result = agg.expressions();
+                // Append ordering requirements to expressions' results. This
+                // way order sensitive aggregators can satisfy requirement
+                // themselves.
                 if let Some(ordering_req) = agg.order_bys() {
-                    let ordering_exprs = ordering_req
-                        .iter()
-                        .map(|item| item.expr.clone())
-                        .collect::<Vec<_>>();
-                    result.extend(ordering_exprs);
+                    result.extend(ordering_req.iter().map(|item| item.expr.clone()));
                 }
                 result
             })
            .collect()),
-        // in this mode, we build the merge expressions of the aggregation
+        // In this mode, we build the merge expressions of the aggregation.
         AggregateMode::Final | AggregateMode::FinalPartitioned => {
             let mut col_idx_base = col_idx_base;
-            Ok(aggr_expr
+            aggr_expr
                 .iter()
                 .map(|agg| {
                     let exprs = merge_expressions(col_idx_base, agg)?;
                     col_idx_base += exprs.len();
                     Ok(exprs)
                 })
-                .collect::<Result<Vec<_>>>()?)
+                .collect()
         }
     }
 }
@@ -981,14 +985,13 @@ fn merge_expressions(
     index_base: usize,
     expr: &Arc<dyn AggregateExpr>,
 ) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
-    Ok(expr
-        .state_fields()?
-        .iter()
-        .enumerate()
-        .map(|(idx, f)| {
-            Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn PhysicalExpr>
-        })
-        .collect::<Vec<_>>())
+    expr.state_fields().map(|fields| {
+        fields
+            .iter()
+            .enumerate()
+            .map(|(idx, f)| Arc::new(Column::new(f.name(), index_base + idx)) as _)
+            .collect()
+    })
 }
 
 pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
 
 fn create_accumulators(
     aggr_expr
         .iter()
         .map(|expr| expr.create_accumulator())
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// returns a vector of ArrayRefs, where each entry corresponds to either the
 fn finalize_aggregation(
 ) -> Result<Vec<ArrayRef>> {
     match mode {
         AggregateMode::Partial => {
-            // build the vector of states
-            let a = accumulators
+            // Build the vector of states
+            accumulators
                 .iter()
                 .map(|accumulator| {
                     accumulator.state().and_then(|e| {
                         e.iter()
                             .map(|v| v.to_array())
                             .collect::<Result<Vec<_>>>()
                     })
                 })
-                .collect::<Result<Vec<_>>>()?;
-            Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
+                .flatten_ok()
+                .collect()
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
         | AggregateMode::Single
         | AggregateMode::SinglePartitioned => {
-            // merge the state to the final value
+            // Merge the state to the final value
             accumulators
                 .iter()
                 .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
-                .collect::<Result<Vec<_>>>()
+                .collect()
         }
     }
 }
@@ -1054,9 +1057,7 @@ pub(crate) fn evaluate_many(
     expr: &[Vec<Arc<dyn PhysicalExpr>>],
     batch: &RecordBatch,
 ) -> Result<Vec<Vec<ArrayRef>>> {
-    expr.iter()
-        .map(|expr| evaluate(expr, batch))
-        .collect::<Result<Vec<_>>>()
+    expr.iter().map(|expr| evaluate(expr, batch)).collect()
 }
 
 fn evaluate_optional(
             })
             .transpose()
         })
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// Evaluate a group by expression against a `RecordBatch`
@@ -1155,6 +1156,7 @@ mod tests {
         Result, ScalarValue,
     };
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::memory_pool::FairSpillPool;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
         lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg,
     };
@@ -1164,7 +1166,6 @@ mod tests {
         PhysicalSortExpr,
     };
 
-    use datafusion_execution::memory_pool::FairSpillPool;
     use futures::{FutureExt, Stream};
 
     // Generate a schema which consists of 5 columns (a, b, c, d, e)
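One of the subtler rewrites in this commit is the `flatten_ok()` form of `finalize_aggregation`, which flattens an iterator of `Result<Vec<_>>` items without materializing an intermediate vector of vectors. A sketch of the behavior under the assumption that the file imports the itertools crate (which provides `flatten_ok`); the function below is illustrative:

    // Sketch of the `flatten_ok` + `collect` combination: Ok items are
    // flattened in order, and the first Err short-circuits the collection.
    use itertools::Itertools;

    fn flatten_states(per_acc: Vec<Result<Vec<i32>, String>>) -> Result<Vec<i32>, String> {
        per_acc.into_iter().flatten_ok().collect()
    }

    fn main() {
        let ok = vec![Ok(vec![1, 2]), Ok(vec![3])];
        assert_eq!(flatten_states(ok), Ok(vec![1, 2, 3]));

        let err = vec![Ok(vec![1]), Err("accumulator failed".to_string())];
        assert!(flatten_states(err).is_err());
    }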
From 64666df28818a4b384b1559303783b08133fa8b3 Mon Sep 17 00:00:00 2001
From: Mehmet Ozan Kabak
Date: Thu, 28 Dec 2023 22:24:16 +0300
Subject: [PATCH 10/10] Fix clippy

---
 datafusion-cli/src/functions.rs                               | 2 +-
 datafusion/common/src/error.rs                                | 1 -
 datafusion/core/src/physical_optimizer/projection_pushdown.rs | 4 ++++
 datafusion/optimizer/src/simplify_expressions/guarantees.rs   | 4 ++++
 datafusion/physical-expr/src/array_expressions.rs             | 2 +-
 datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs  | 1 -
 6 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index f8d9ed238be4..5390fa9f2271 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {}
 
 impl TableFunctionImpl for ParquetMetadataFunc {
     fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
-        let filename = match exprs.get(0) {
+        let filename = match exprs.first() {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
            _ => {

diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 515acc6d1c47..e58faaa15096 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -558,7 +558,6 @@ macro_rules! arrow_err {
 
 // To avoid compiler error when using macro in the same crate:
 // macros from the current crate cannot be referred to by absolute paths
-pub use exec_err as _exec_err;
 pub use internal_datafusion_err as _internal_datafusion_err;
 pub use internal_err as _internal_err;
 pub use not_impl_err as _not_impl_err;

diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 7e1312dad23e..d237a3e8607e 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -990,6 +990,10 @@ fn update_join_on(
     proj_right_exprs: &[(Column, String)],
     hash_join_on: &[(Column, Column)],
 ) -> Option<Vec<(Column, Column)>> {
+    // TODO: Clippy wants the "map" call removed, but doing so generates
+    //       a compilation error. Remove the clippy directive once this
+    //       issue is fixed.
+    #[allow(clippy::map_identity)]
     let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
         .iter()
         .map(|(left, right)| (left, right))
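The `map` that Clippy flags here is not actually an identity: its closure destructures `&(Column, Column)` items into `(&Column, &Column)`, and that type change is what the subsequent unzip-style collection needs. A standalone reproduction of the pattern, with plain `String`s standing in for `Column` (the same workaround appears again in guarantees.rs below):

    // Sketch of why `|(left, right)| (left, right)` is not an identity map:
    // iterating `&Vec<(T, T)>` yields `&(T, T)`, and the closure turns each
    // item into `(&T, &T)`, which unzipping into two Vecs of references needs.
    fn main() {
        let pairs: Vec<(String, String)> = vec![("l0".into(), "r0".into())];
        let (lefts, rights): (Vec<&String>, Vec<&String>) = pairs
            .iter()
            .map(|(left, right)| (left, right)) // &(String, String) -> (&String, &String)
            .unzip();
        assert_eq!(lefts, vec![&pairs[0].0]);
        assert_eq!(rights, vec![&pairs[0].1]);
    }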
diff --git a/datafusion/optimizer/src/simplify_expressions/guarantees.rs b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
index 860dc326b9b0..aa7bb4f78a93 100644
--- a/datafusion/optimizer/src/simplify_expressions/guarantees.rs
+++ b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
@@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> {
         guarantees: impl IntoIterator<Item = (&'a Expr, &'a NullableInterval)>,
     ) -> Self {
         Self {
+            // TODO: Clippy wants the "map" call removed, but doing so generates
+            //       a compilation error. Remove the clippy directive once this
+            //       issue is fixed.
+            #[allow(clippy::map_identity)]
             guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
         }
     }

diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 3ee99d7e8e55..483e330e08f8 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -2444,7 +2444,7 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
     let last_offset: OffsetSize = offsets.last().copied().unwrap();
     offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
     let arrays = converter.convert_rows(rows)?;
-    let array = match arrays.get(0) {
+    let array = match arrays.first() {
         Some(array) => array.clone(),
         None => {
             return internal_err!("array_distinct: failed to get array from rows")

diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
index 663bbdd5a3c7..8e2bbbfe4f69 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
@@ -21,5 +21,4 @@ mod normalize;
 mod runner;
 
 pub use error::*;
-pub use normalize::*;
 pub use runner::*;
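The remaining cleanups in this commit swap `get(0)` for `first()`, per Clippy's `get_first` lint; the two calls are equivalent, both returning `Option<&T>`:

    // Sketch of the get(0) -> first() cleanup: identical behavior, clearer intent.
    fn main() {
        let values = vec![10, 20, 30];
        assert_eq!(values.get(0), values.first());

        let empty: Vec<i32> = Vec::new();
        assert_eq!(empty.first(), None);
    }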