diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index eb5ae8b0b0c3..8965199e0c2c 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -24,7 +24,7 @@ use std::collections::BinaryHeap; use std::fmt::Debug; use std::sync::Arc; -use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; +use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields}; use crate::expressions::format_state_name; use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr}; @@ -192,14 +192,30 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if values.is_empty() { return Ok(()); } + let value_array_ref = &values[0]; + let ordering_array_refs = &values[1..]; - let n_row = values[0].len(); - for index in 0..n_row { - let row = get_row_at_idx(values, index)?; - self.values.push(row[0].clone()); - self.ordering_values.push(row[1..].to_vec()); - } + let num_rows = value_array_ref.len(); + // Convert &[ArrayRef] to Vec> + let new_ordering_values = (0..num_rows) + .map(|idx| get_row_at_idx(ordering_array_refs, idx)) + .collect::>>()?; + + // Convert ArrayRef to Vec + let new_scalar_values = (0..num_rows) + .map(|idx| ScalarValue::try_from_array(value_array_ref, idx)) + .collect::>>()?; + + let sort_options = get_sort_options(&self.ordering_req); + // Merge new values and new orderings + let (merged_values, merged_ordering_values) = merge_ordered_arrays( + &[&self.values, &new_scalar_values], + &[&self.ordering_values, &new_ordering_values], + &sort_options, + )?; + self.values = merged_values; + self.ordering_values = merged_ordering_values; Ok(()) } @@ -215,45 +231,37 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { let agg_orderings = &states[1]; if let Some(agg_orderings) = agg_orderings.as_list_opt::() { - // Stores ARRAY_AGG results coming from each partition - let mut partition_values = vec![]; - // Stores ordering requirement expression results coming from each partition - let mut partition_ordering_values = vec![]; - - // Existing values should be merged also. - partition_values.push(self.values.clone()); - partition_ordering_values.push(self.ordering_values.clone()); + // Stores ARRAY_AGG results coming from each partition. Existing values should be merged also. + let mut partition_values = vec![self.values.as_slice()]; let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; - for v in array_agg_res.into_iter() { - partition_values.push(v); - } + partition_values.extend(array_agg_res.iter().map(|v| v.as_slice())); - let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; + // Stores ordering requirement expression results coming from each partition. Existing values should be merged also. + let mut partition_ordering_values = vec![self.ordering_values.as_slice()]; - for partition_ordering_rows in orderings.into_iter() { - // Extract value from struct to ordering_rows for each group/partition - let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| { - if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row { - Ok(ordering_columns_per_row) - } else { - exec_err!( + let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)? 
+ .into_iter() + .map(|partition_ordering_rows| { + partition_ordering_rows + .into_iter() + .map(|ordering_row| match ordering_row { + ScalarValue::Struct(Some(ordering_columns_per_row), _) => + Ok(ordering_columns_per_row), + _ => exec_err!( "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}", ordering_row.data_type() - ) - } - }).collect::>>()?; + ), + }) + .collect::>>() + }) + .collect::>>()?; - partition_ordering_values.push(ordering_value); - } + partition_ordering_values.extend(orderings.iter().map(|v| v.as_slice())); - let sort_options = self - .ordering_req - .iter() - .map(|sort_expr| sort_expr.options) - .collect::>(); + let sort_options = get_sort_options(&self.ordering_req); let (new_values, new_orderings) = merge_ordered_arrays( &partition_values, &partition_ordering_values, @@ -413,11 +421,11 @@ impl<'a> PartialOrd for CustomElement<'a> { /// Inner `Vec`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match) fn merge_ordered_arrays( // We will merge values into single `Vec`. - values: &[Vec], + values: &[&[ScalarValue]], // `values` will be merged according to `ordering_values`. // Inner `Vec` can be thought as ordering information for the // each `ScalarValue` in the values`. - ordering_values: &[Vec>], + ordering_values: &[&[Vec]], // Defines according to which ordering comparisons should be done. sort_options: &[SortOptions], ) -> Result<(Vec, Vec>)> { @@ -554,8 +562,8 @@ mod tests { ]; let (merged_vals, merged_ts) = merge_ordered_arrays( - &[lhs_vals, rhs_vals], - &[lhs_orderings, rhs_orderings], + &[&lhs_vals, &rhs_vals], + &[&lhs_orderings, &rhs_orderings], &sort_options, )?; let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; @@ -621,8 +629,8 @@ mod tests { Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef, ]; let (merged_vals, merged_ts) = merge_ordered_arrays( - &[lhs_vals, rhs_vals], - &[lhs_orderings, rhs_orderings], + &[&lhs_vals, &rhs_vals], + &[&lhs_orderings, &rhs_orderings], &sort_options, )?; let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index 5e2012bdbb67..9a1261f69572 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -20,16 +20,16 @@ use std::any::Any; use std::sync::Arc; -use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; +use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields}; use crate::expressions::format_state_name; use crate::{ - reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, + reverse_order_bys, AggregateExpr, LexOrdering, LexOrderingRef, PhysicalExpr, + PhysicalSortExpr, }; use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; use arrow::compute::{self, lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field}; -use arrow_schema::SortOptions; use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; @@ -211,10 +211,25 @@ impl FirstValueAccumulator { } // Updates state with the values in the given row. 
- fn update_with_new_row(&mut self, row: &[ScalarValue]) { - self.first = row[0].clone(); - self.orderings = row[1..].to_vec(); - self.is_set = true; + fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { + let value = &row[0]; + let orderings = &row[1..]; + // Update when + // - no entry in the state + // - There is an earlier entry in according to requirements + if !self.is_set + || compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_gt() + { + self.first = value.clone(); + self.orderings = orderings.to_vec(); + self.is_set = true; + } + Ok(()) } } @@ -227,11 +242,11 @@ impl Accumulator for FirstValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // If we have seen first value, we shouldn't update it - if !values[0].is_empty() && !self.is_set { - let row = get_row_at_idx(values, 0)?; - // Update with first value in the array. - self.update_with_new_row(&row); + if let Some(first_idx) = + get_value_idx::(values, &self.ordering_req, self.is_set)? + { + let row = get_row_at_idx(values, first_idx)?; + self.update_with_new_row(&row)?; } Ok(()) } @@ -265,7 +280,7 @@ impl Accumulator for FirstValueAccumulator { // Update with first value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&first_row[0..is_set_idx]); + self.update_with_new_row(&first_row[0..is_set_idx])?; } } Ok(()) @@ -459,10 +474,27 @@ impl LastValueAccumulator { } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) { - self.last = row[0].clone(); - self.orderings = row[1..].to_vec(); - self.is_set = true; + fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { + let value = &row[0]; + let orderings = &row[1..]; + // Update when + // - no value in the state + // - There is no specific requirement, but a new value (most recent entry in terms of execution) + // - There is a more recent entry in terms of requirement + if !self.is_set + || self.orderings.is_empty() + || compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_lt() + { + self.last = value.clone(); + self.orderings = orderings.to_vec(); + self.is_set = true; + } + Ok(()) } } @@ -475,10 +507,11 @@ impl Accumulator for LastValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if !values[0].is_empty() { - let row = get_row_at_idx(values, values[0].len() - 1)?; - // Update with last value in the array. - self.update_with_new_row(&row); + if let Some(last_idx) = + get_value_idx::(values, &self.ordering_req, self.is_set)? + { + let row = get_row_at_idx(values, last_idx)?; + self.update_with_new_row(&row)?; } Ok(()) } @@ -515,7 +548,7 @@ impl Accumulator for LastValueAccumulator { // Update with last value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&last_row[0..is_set_idx]); + self.update_with_new_row(&last_row[0..is_set_idx])?; } } Ok(()) @@ -559,12 +592,35 @@ fn convert_to_sort_cols( .collect::>() } -/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. 
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
+/// Gets either the first or the last value index inside the `values` batch, according to the ordering requirements.
+/// Assumes the `values` batch is already ordered according to `ordering_req`.
+///
+/// # Parameters
+///
+/// - `values`: A slice of `ArrayRef` representing the values to be processed (the columns of the record batch).
+/// - `ordering_req`: A lexical ordering reference specifying the required ordering of values.
+/// - `is_set`: Whether any value is stored in the state for `first value` or `last value` (at the beginning this is false).
+///
+/// # Returns
+///
+/// A `Result` containing an `Option`. If successful, the `Option` holds the index of the
+/// desired value. Returns `None` to indicate that the existing state does not need to be updated.
+fn get_value_idx<const FIRST: bool>(
+    values: &[ArrayRef],
+    ordering_req: LexOrderingRef,
+    is_set: bool,
+) -> Result<Option<usize>> {
+    let value_array_ref = &values[0];
+    // Return None for empty batches, or when no ordering is specified and is_set is true.
+    if value_array_ref.is_empty() || (is_set && FIRST && ordering_req.is_empty()) {
+        return Ok(None);
+    }
+    Ok(Some(if FIRST {
+        0
+    } else {
+        // LAST
+        value_array_ref.len() - 1
+    }))
 }

 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index e5421ef5ab7e..7ba7e9d01567 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -26,7 +26,7 @@ use arrow_array::types::{
 };
 use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use std::any::Any;
@@ -205,3 +205,11 @@ pub(crate) fn ordering_fields(
         })
         .collect()
 }
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { + ordering_req + .iter() + .map(|item| item.options) + .collect::>() +} diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 921de96252f0..cccf86517119 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -27,7 +27,7 @@ use crate::aggregates::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::windows::{get_ordered_partition_by_indices, get_window_mode}; +use crate::windows::get_ordered_partition_by_indices; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning, SendableRecordBatchStream, Statistics, @@ -38,18 +38,18 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::DataType; use datafusion_common::stats::Precision; -use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; +use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Accumulator; use datafusion_physical_expr::{ aggregate::is_order_sensitive, equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, Max, Min, UnKnownColumn}, - physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties, - LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, + LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; -use itertools::{izip, Itertools}; +use itertools::Itertools; mod group_values; mod no_grouping; @@ -267,6 +267,17 @@ impl From for SendableRecordBatchStream { } } +/// A structure representing a group of aggregate expressions where each group has different +/// ordering requirements. Aggregate groups are calculated using `get_aggregate_expr_groups` function. +/// Indices refers to the position of each aggregate expressions among all aggregate expressions (prior to grouping). +#[derive(Clone, Debug, PartialEq)] +pub struct AggregateExprGroup { + /// Aggregate expressions indices + indices: Vec, + /// Requirement + requirement: LexOrdering, +} + /// Hash aggregate execution plan #[derive(Debug)] pub struct AggregateExec { @@ -278,6 +289,8 @@ pub struct AggregateExec { aggr_expr: Vec>, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, + /// Stores aggregate groups where each group has different ordering requirement. + aggregate_groups: Vec, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause limit: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -305,159 +318,6 @@ pub struct AggregateExec { output_ordering: Option, } -/// This function returns the ordering requirement of the first non-reversible -/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves -/// as the initial requirement while calculating the finest requirement among all -/// aggregate functions. If this function returns `None`, it means there is no -/// hard ordering requirement for the aggregate functions (in terms of direction). -/// Then, we can generate two alternative requirements with opposite directions. 
-fn get_init_req( - aggr_expr: &[Arc], - order_by_expr: &[Option], -) -> Option { - for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) { - // If the aggregation function is a non-reversible order-sensitive function - // and there is a hard requirement, choose first such requirement: - if is_order_sensitive(aggr_expr) - && aggr_expr.reverse_expr().is_none() - && fn_reqs.is_some() - { - return fn_reqs.clone(); - } - } - None -} - -/// This function gets the finest ordering requirement among all the aggregation -/// functions. If requirements are conflicting, (i.e. we can not compute the -/// aggregations in a single [`AggregateExec`]), the function returns an error. -fn get_finest_requirement( - aggr_expr: &mut [Arc], - order_by_expr: &mut [Option], - eq_properties: &EquivalenceProperties, -) -> Result> { - // First, we check if all the requirements are satisfied by the existing - // ordering. If so, we return `None` to indicate this. - let mut all_satisfied = true; - for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) { - if eq_properties.ordering_satisfy(fn_req.as_deref().unwrap_or(&[])) { - continue; - } - if let Some(reverse) = aggr_expr.reverse_expr() { - let reverse_req = fn_req.as_ref().map(|item| reverse_order_bys(item)); - if eq_properties.ordering_satisfy(reverse_req.as_deref().unwrap_or(&[])) { - // We need to update `aggr_expr` with its reverse since only its - // reverse requirement is compatible with the existing requirements: - *aggr_expr = reverse; - *fn_req = reverse_req; - continue; - } - } - // Requirement is not satisfied: - all_satisfied = false; - } - if all_satisfied { - // All of the requirements are already satisfied. - return Ok(None); - } - let mut finest_req = get_init_req(aggr_expr, order_by_expr); - for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) { - let Some(fn_req) = fn_req else { - continue; - }; - - if let Some(finest_req) = &mut finest_req { - if let Some(finer) = eq_properties.get_finer_ordering(finest_req, fn_req) { - *finest_req = finer; - continue; - } - // If an aggregate function is reversible, analyze whether its reverse - // direction is compatible with existing requirements: - if let Some(reverse) = aggr_expr.reverse_expr() { - let fn_req_reverse = reverse_order_bys(fn_req); - if let Some(finer) = - eq_properties.get_finer_ordering(finest_req, &fn_req_reverse) - { - // We need to update `aggr_expr` with its reverse, since only its - // reverse requirement is compatible with existing requirements: - *aggr_expr = reverse; - *finest_req = finer; - *fn_req = fn_req_reverse; - continue; - } - } - // If neither of the requirements satisfy the other, this means - // requirements are conflicting. Currently, we do not support - // conflicting requirements. 
- return not_impl_err!( - "Conflicting ordering requirements in aggregate functions is not supported" - ); - } else { - finest_req = Some(fn_req.clone()); - } - } - Ok(finest_req) -} - -/// Calculates search_mode for the aggregation -fn get_aggregate_search_mode( - group_by: &PhysicalGroupBy, - input: &Arc, - aggr_expr: &mut [Arc], - order_by_expr: &mut [Option], - ordering_req: &mut Vec, -) -> InputOrderMode { - let groupby_exprs = group_by - .expr - .iter() - .map(|(item, _)| item.clone()) - .collect::>(); - let mut input_order_mode = InputOrderMode::Linear; - if !group_by.is_single() || groupby_exprs.is_empty() { - return input_order_mode; - } - - if let Some((should_reverse, mode)) = - get_window_mode(&groupby_exprs, ordering_req, input) - { - let all_reversible = aggr_expr - .iter() - .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some()); - if should_reverse && all_reversible { - izip!(aggr_expr.iter_mut(), order_by_expr.iter_mut()).for_each( - |(aggr, order_by)| { - if let Some(reverse) = aggr.reverse_expr() { - *aggr = reverse; - } else { - unreachable!(); - } - *order_by = order_by.as_ref().map(|ob| reverse_order_bys(ob)); - }, - ); - *ordering_req = reverse_order_bys(ordering_req); - } - input_order_mode = mode; - } - input_order_mode -} - -/// Check whether group by expression contains all of the expression inside `requirement` -// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC) -fn group_by_contains_all_requirements( - group_by: &PhysicalGroupBy, - requirement: &LexOrdering, -) -> bool { - let physical_exprs = group_by.input_exprs(); - // When we have multiple groups (grouping set) - // since group by may be calculated on the subset of the group_by.expr() - // it is not guaranteed to have all of the requirements among group by expressions. - // Hence do the analysis: whether group by contains all requirements in the single group case. - group_by.is_single() - && requirement - .iter() - .all(|req| physical_exprs_contains(&physical_exprs, &req.expr)) -} - impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( @@ -512,61 +372,39 @@ impl AggregateExec { schema: SchemaRef, original_schema: SchemaRef, ) -> Result { - // Reset ordering requirement to `None` if aggregator is not order-sensitive - let mut order_by_expr = aggr_expr - .iter() - .map(|aggr_expr| { - let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec()); - // If - // - aggregation function is order-sensitive and - // - aggregation is performing a "first stage" calculation, and - // - at least one of the aggregate function requirement is not inside group by expression - // keep the ordering requirement as is; otherwise ignore the ordering requirement. - // In non-first stage modes, we accumulate data (using `merge_batch`) - // from different partitions (i.e. merge partial results). During - // this merge, we consider the ordering of each partial result. - // Hence, we do not need to use the ordering requirement in such - // modes as long as partial results are generated with the - // correct ordering. 
- fn_reqs.filter(|req| { - is_order_sensitive(aggr_expr) - && mode.is_first_stage() - && !group_by_contains_all_requirements(&group_by, req) - }) - }) - .collect::>(); - let requirement = get_finest_requirement( + let input_eq_properties = input.equivalence_properties(); + let aggregate_groups = get_aggregate_expr_groups( &mut aggr_expr, - &mut order_by_expr, - &input.equivalence_properties(), - )?; - let mut ordering_req = requirement.unwrap_or(vec![]); - let input_order_mode = get_aggregate_search_mode( &group_by, - &input, - &mut aggr_expr, - &mut order_by_expr, - &mut ordering_req, - ); + &input_eq_properties, + &mode, + )?; // Get GROUP BY expressions: let groupby_exprs = group_by.input_exprs(); // If existing ordering satisfies a prefix of the GROUP BY expressions, // prefix requirements with this section. In this case, aggregation will // work more efficiently. - let indices = get_ordered_partition_by_indices(&groupby_exprs, &input); + let indices = + get_ordered_partition_by_indices(&groupby_exprs, &input_eq_properties); let mut new_requirement = indices - .into_iter() - .map(|idx| PhysicalSortRequirement { + .iter() + .map(|&idx| PhysicalSortRequirement { expr: groupby_exprs[idx].clone(), options: None, }) .collect::>(); - // Postfix ordering requirement of the aggregation to the requirement. - let req = PhysicalSortRequirement::from_sort_exprs(&ordering_req); - new_requirement.extend(req); new_requirement = collapse_lex_req(new_requirement); + let input_order_mode = + if indices.len() == groupby_exprs.len() && !indices.is_empty() { + InputOrderMode::Sorted + } else if !indices.is_empty() { + InputOrderMode::PartiallySorted(indices) + } else { + InputOrderMode::Linear + }; + // construct a map from the input expression to the output expression of the Aggregation group by let projection_mapping = ProjectionMapping::try_new(&group_by.expr, &input.schema())?; @@ -574,9 +412,8 @@ impl AggregateExec { let required_input_ordering = (!new_requirement.is_empty()).then_some(new_requirement); - let aggregate_eqs = input - .equivalence_properties() - .project(&projection_mapping, schema.clone()); + let aggregate_eqs = + input_eq_properties.project(&projection_mapping, schema.clone()); let output_ordering = aggregate_eqs.oeq_class().output_ordering(); Ok(AggregateExec { @@ -584,6 +421,7 @@ impl AggregateExec { group_by, aggr_expr, filter_expr, + aggregate_groups, input, original_schema, schema, @@ -1026,6 +864,132 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { Arc::new(Schema::new(group_fields)) } +/// Determines the lexical ordering requirement for an aggregate expression. +/// +/// # Parameters +/// +/// - `aggr_expr`: A reference to an `Arc` representing the aggregate expression. +/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression. +/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation. +/// +/// # Returns +/// +/// A `LexOrdering` instance indicating the lexical ordering requirement for the aggregate expression. +fn get_aggregate_expr_req( + aggr_expr: &Arc, + group_by: &PhysicalGroupBy, + agg_mode: &AggregateMode, +) -> LexOrdering { + // If + // - aggregation function is not order-sensitive and + // - aggregation is performing a "second stage" calculation, and + // - all aggregate function requirement is inside group by expression + // ignore the ordering requirement. 
+    if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
+        return vec![];
+    }
+
+    let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec();
+
+    // In non-first stage modes, we accumulate data (using `merge_batch`)
+    // from different partitions (i.e. merge partial results). During
+    // this merge, we consider the ordering of each partial result.
+    // Hence, we do not need to use the ordering requirement in such
+    // modes as long as partial results are generated with the
+    // correct ordering.
+    if group_by.is_single() {
+        // Remove all orderings that occur in the GROUP BY. These requirements are
+        // definitely satisfied: within each group, every GROUP BY expression has a
+        // single distinct value, so any ordering on it holds trivially.
+        let physical_exprs = group_by.input_exprs();
+        req.retain(|sort_expr| {
+            !physical_exprs_contains(&physical_exprs, &sort_expr.expr)
+        });
+    }
+    req
+}
+
+/// Computes the finer ordering between the given existing ordering requirement and the
+/// requirement of the aggregate expression.
+///
+/// # Parameters
+///
+/// * `existing_req` - The existing lexical ordering that needs refinement.
+/// * `aggr_expr` - A reference to an aggregate expression trait object.
+/// * `group_by` - Information about the physical grouping (e.g. the group-by expression).
+/// * `eq_properties` - Equivalence properties relevant to the computation.
+/// * `agg_mode` - The mode of aggregation (e.g., Partial, Final, etc.).
+///
+/// # Returns
+///
+/// An `Option` representing the computed finer lexical ordering, or `None` if there is
+/// no finer ordering; e.g. when the existing requirement and the requirement of the
+/// aggregator are incompatible.
+fn finer_ordering(
+    existing_req: &LexOrdering,
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    group_by: &PhysicalGroupBy,
+    eq_properties: &EquivalenceProperties,
+    agg_mode: &AggregateMode,
+) -> Option<LexOrdering> {
+    let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
+    eq_properties.get_finer_ordering(existing_req, &aggr_req)
+}
+
+/// Groups aggregate expressions based on their ordering requirements.
+///
+/// # Parameters
+///
+/// - `aggr_exprs`: A mutable slice of `Arc<dyn AggregateExpr>` representing the aggregate expressions to be grouped.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the physical group-by expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the mode of aggregation.
+///
+/// # Returns
+///
+/// A vector of `AggregateExprGroup` instances, each containing indices and ordering requirements for a group of
+/// related aggregate expressions.
+fn get_aggregate_expr_groups( + aggr_exprs: &mut [Arc], + group_by: &PhysicalGroupBy, + eq_properties: &EquivalenceProperties, + agg_mode: &AggregateMode, +) -> Result> { + let mut groups: Vec<(LexOrdering, Vec)> = vec![]; + for (idx, aggr_expr) in aggr_exprs.iter_mut().enumerate() { + let req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode); + let mut group_match = false; + for (key, value) in groups.iter_mut() { + if let Some(finer_ordering) = + finer_ordering(key, aggr_expr, group_by, eq_properties, agg_mode) + { + value.push(idx); + *key = finer_ordering; + group_match = true; + } else if let Some(reversed_expr) = aggr_expr.reverse_expr() { + if let Some(finer_ordering) = + finer_ordering(key, &reversed_expr, group_by, eq_properties, agg_mode) + { + *aggr_expr = reversed_expr; + value.push(idx); + *key = finer_ordering; + group_match = true; + } + } + } + if !group_match { + // there is no existing group that matches. Insert new group + groups.push((req, vec![idx])); + } + } + + Ok(groups + .into_iter() + .map(|(requirement, indices)| AggregateExprGroup { + indices, + requirement, + }) + .collect()) +} + /// returns physical expressions for arguments to evaluate against a batch /// The expressions are different depending on `mode`: /// * Partial: AggregateExpr::expressions @@ -1042,17 +1006,15 @@ fn aggregate_expressions( .iter() .map(|agg| { let mut result = agg.expressions().clone(); - // In partial mode, append ordering requirements to expressions' results. - // Ordering requirements are used by subsequent executors to satisfy the required - // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes. - if matches!(mode, AggregateMode::Partial) { - if let Some(ordering_req) = agg.order_bys() { - let ordering_exprs = ordering_req - .iter() - .map(|item| item.expr.clone()) - .collect::>(); - result.extend(ordering_exprs); - } + // Append ordering requirements to expressions' results. + // This way order sensitive aggregators can satisfy requirement + // themselves. + if let Some(ordering_req) = agg.order_bys() { + let ordering_exprs = ordering_req + .iter() + .map(|item| item.expr.clone()) + .collect::>(); + result.extend(ordering_exprs); } result }) @@ -1104,13 +1066,13 @@ fn create_accumulators( /// returns a vector of ArrayRefs, where each entry corresponds to either the /// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) fn finalize_aggregation( - accumulators: &[AccumulatorItem], + accumulators: &[&AccumulatorItem], mode: &AggregateMode, -) -> Result> { +) -> Result>> { match mode { AggregateMode::Partial => { // build the vector of states - let a = accumulators + accumulators .iter() .map(|accumulator| { accumulator.state().and_then(|e| { @@ -1119,8 +1081,7 @@ fn finalize_aggregation( .collect::>>() }) }) - .collect::>>()?; - Ok(a.iter().flatten().cloned().collect::>()) + .collect::>>() } AggregateMode::Final | AggregateMode::FinalPartitioned @@ -1129,8 +1090,10 @@ fn finalize_aggregation( // merge the state to the final value accumulators .iter() - .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array())) - .collect::>>() + .map(|accumulator| { + accumulator.evaluate().and_then(|v| Ok(vec![v.to_array()?])) + }) + .collect::>>>() } } } @@ -1225,6 +1188,35 @@ pub(crate) fn evaluate_group_by( .collect()) } +/// Inserts each aggregate expression result inside aggregate groups +/// to correct indices in the output. 
+fn reorder_aggregate_expr_results( + aggregate_group_results: Vec>>, + aggregate_group_indices: Vec>, +) -> Vec { + // Calculate the total number of aggregate results. + let n_aggregate = aggregate_group_indices.iter().flatten().count(); + + // Initialize a result vector with empty vectors, one for each aggregate result. + let mut result = vec![vec![]; n_aggregate]; + // Process each aggregate group result and its corresponding indices. + // We combine the results and indices, flattening them for processing. + aggregate_group_results + .into_iter() + .zip(aggregate_group_indices.iter()) + .flat_map(|(group_result, group_indices)| { + // Pair each group's result with its target index. + group_indices.iter().zip(group_result) + }) + .for_each(|(&idx, aggr_state)| { + // Place each aggregate state in its correct position in the result vector. + result[idx] = aggr_state; + }); + + // Flatten the result vectors into a single vector and return. + result.into_iter().flatten().collect() +} + #[cfg(test)] mod tests { use std::any::Any; @@ -1232,9 +1224,7 @@ mod tests { use std::task::{Context, Poll}; use super::*; - use crate::aggregates::{ - get_finest_requirement, AggregateExec, AggregateMode, PhysicalGroupBy, - }; + use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common; @@ -1261,7 +1251,8 @@ mod tests { lit, ApproxDistinct, Count, FirstValue, LastValue, Median, }; use datafusion_physical_expr::{ - AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr, + PhysicalSortExpr, }; use datafusion_execution::memory_pool::FairSpillPool; @@ -1279,6 +1270,19 @@ mod tests { Ok(schema) } + // Convert each tuple to PhysicalSortExpr + fn convert_to_sort_exprs( + in_data: &[(&Arc, SortOptions)], + ) -> Vec { + in_data + .iter() + .map(|(expr, options)| PhysicalSortExpr { + expr: (*expr).clone(), + options: *options, + }) + .collect::>() + } + /// some mock data to aggregates fn some_data() -> (Arc, Vec) { // define a schema. 
@@ -2134,7 +2138,7 @@ mod tests { eq_properties.add_equal_conditions(col_a, col_b); // Aggregate requirements are // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively - let mut order_by_exprs = vec![ + let order_by_exprs = vec![ None, Some(vec![PhysicalSortExpr { expr: col_a.clone(), @@ -2171,7 +2175,7 @@ mod tests { options: options2, }]), ]; - let common_requirement = Some(vec![ + let common_requirement = vec![ PhysicalSortExpr { expr: col_a.clone(), options: options1, @@ -2180,21 +2184,145 @@ mod tests { expr: col_c.clone(), options: options1, }, - ]); - let aggr_expr = Arc::new(FirstValue::new( - col_a.clone(), - "first1", - DataType::Int32, - vec![], - vec![], - )) as _; - let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()]; - let res = - get_finest_requirement(&mut aggr_exprs, &mut order_by_exprs, &eq_properties)?; + ]; + let mut aggr_exprs = order_by_exprs + .into_iter() + .map(|order_by_expr| { + Arc::new(FirstValue::new( + col_a.clone(), + "first1", + DataType::Int32, + order_by_expr.unwrap_or_default(), + vec![], + )) as _ + }) + .collect::>(); + let group_by = PhysicalGroupBy::new_single(vec![]); + let res = get_aggregate_expr_groups( + &mut aggr_exprs, + &group_by, + &eq_properties, + &AggregateMode::Partial, + )?; + let res = res[0].requirement.clone(); assert_eq!(res, common_requirement); Ok(()) } + #[tokio::test] + async fn test_calc_aggregate_groups() -> Result<()> { + let test_schema = create_test_schema()?; + // Assume column a and b are aliases + // Assume also that a ASC and c DESC describe the same global ordering for the table. (Since they are ordering equivalent). + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + // This is the reverse requirement of options1 + let option_desc = SortOptions { + descending: true, + nulls_first: true, + }; + let col_a = &col("a", &test_schema)?; + let col_b = &col("b", &test_schema)?; + let col_c = &col("c", &test_schema)?; + + let test_cases = vec![ + // ------- TEST CASE 1 ----------- + ( + // Ordering requirements + vec![vec![(col_a, option_asc)]], + // expected + vec![(vec![0], vec![(col_a, option_asc)])], + ), + // ------- TEST CASE 2 ----------- + ( + // Ordering requirements + vec![vec![(col_a, option_asc)], vec![(col_a, option_asc)]], + // expected + vec![(vec![0, 1], vec![(col_a, option_asc)])], + ), + // ------- TEST CASE 3 ----------- + ( + // Ordering requirements + vec![ + vec![(col_a, option_asc), (col_b, option_asc)], + vec![(col_a, option_asc)], + ], + // expected + vec![(vec![0, 1], vec![(col_a, option_asc), (col_b, option_asc)])], + ), + // ------- TEST CASE 4 ----------- + ( + // Ordering requirements + vec![vec![(col_a, option_asc)], vec![(col_a, option_desc)]], + // expected + vec![(vec![0, 1], vec![(col_a, option_asc)])], + ), + // ------- TEST CASE 5 ----------- + ( + // Ordering requirements + vec![vec![(col_a, option_asc)], vec![(col_c, option_asc)]], + // expected + vec![ + (vec![0], vec![(col_a, option_asc)]), + (vec![1], vec![(col_c, option_asc)]), + ], + ), + // ------- TEST CASE 6 ----------- + ( + // Ordering requirements + vec![ + vec![(col_a, option_asc)], + vec![(col_c, option_asc)], + vec![(col_a, option_desc)], + ], + // expected + vec![ + (vec![0, 2], vec![(col_a, option_asc)]), + (vec![1], vec![(col_c, option_asc)]), + ], + ), + ]; + for (ordering_reqs, expected) in test_cases { + let mut aggr_exprs = ordering_reqs + .into_iter() + .map(|req| { + let req = convert_to_sort_exprs(&req); + Arc::new(FirstValue::new( + col_a.clone(), + "first1", 
+ DataType::Int32, + req, + vec![], + )) as _ + }) + .collect::>(); + + let group_by = PhysicalGroupBy::new_single(vec![]); + + // Empty equivalence Properties + let eq_properties = EquivalenceProperties::new(test_schema.clone()); + + let res = get_aggregate_expr_groups( + &mut aggr_exprs, + &group_by, + &eq_properties, + &AggregateMode::Partial, + )?; + + let expected = expected + .into_iter() + .map(|(indices, req)| AggregateExprGroup { + indices, + requirement: convert_to_sort_exprs(&req), + }) + .collect::>(); + assert_eq!(res, expected); + } + Ok(()) + } + #[test] fn test_agg_exec_same_schema() -> Result<()> { let schema = Arc::new(Schema::new(vec![ diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 90eb488a2ead..be379a6fa374 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -18,27 +18,62 @@ //! Aggregate without grouping columns use crate::aggregates::{ - aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem, - AggregateMode, + aggregate_expressions, create_accumulators, finalize_aggregation, + reorder_aggregate_expr_results, AccumulatorItem, AggregateExprGroup, AggregateMode, }; use crate::metrics::{BaselineMetrics, RecordOutput}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use arrow_array::ArrayRef; use datafusion_common::Result; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; use futures::stream::BoxStream; use std::borrow::Cow; use std::sync::Arc; use std::task::{Context, Poll}; use crate::filter::batch_filter; +use crate::sorts::sort::sort_batch; +use datafusion_common::utils::get_at_indices; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use futures::stream::{Stream, StreamExt}; +use itertools::izip; use super::AggregateExec; +/// A structure storing necessary data for aggregate expr evaluation. +/// +/// # Fields +/// +/// - `expressions`: A vector expressions that aggregate expression refers e.g for CORR(a, b) this will be a, b. +/// - `filter_expression`: A vector of optional filter expression associated with aggregate expression. +/// - `accumulator`: The accumulator used to calculate aggregate expression result. +pub struct AggregateExprData { + expressions: Vec>, + filter_expression: Option>, + accumulator: AccumulatorItem, +} + +/// A structure representing an aggregate group. +/// +/// The `AggregateGroup` struct is all aggregate expressions +/// where ordering requirement is satisfied by `requirement`. +/// This struct divides aggregate expressions according to their requirements. +/// Aggregate groups are constructed using `get_aggregate_expr_groups` function. +/// +/// # Fields +/// +/// - `aggregates`: A vector of `AggregateExprData` which stores necessary fields for successful evaluation of the each aggregate expression. +/// - `requirement`: A `LexOrdering` instance specifying the lexical ordering requirement of the group. +/// - `group_indices`: A vector of indices indicating position of each aggregation in the original aggregate expression. 
+pub struct AggregateGroup { + aggregates: Vec, + requirement: LexOrdering, + group_indices: Vec, +} + /// stream struct for aggregation without grouping columns pub(crate) struct AggregateStream { stream: BoxStream<'static, Result>, @@ -57,9 +92,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - aggregate_expressions: Vec>>, - filter_expressions: Vec>>, - accumulators: Vec, + aggregate_groups: Vec, reservation: MemoryReservation, finished: bool, } @@ -67,103 +100,170 @@ struct AggregateStreamInner { impl AggregateStream { /// Create a new AggregateStream pub fn new( - agg: &AggregateExec, + aggregate_exec: &AggregateExec, context: Arc, partition: usize, ) -> Result { - let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_schema = Arc::clone(&aggregate_exec.schema); + let agg_filter_expr = aggregate_exec.filter_expr.clone(); - let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); - let input = agg.input.execute(partition, Arc::clone(&context))?; + let baseline_metrics = BaselineMetrics::new(&aggregate_exec.metrics, partition); + let input = aggregate_exec + .input + .execute(partition, Arc::clone(&context))?; - let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; - let filter_expressions = match agg.mode { + let aggregate_expressions = + aggregate_expressions(&aggregate_exec.aggr_expr, &aggregate_exec.mode, 0)?; + let filter_expressions = match aggregate_exec.mode { AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; aggregate_exec.aggr_expr.len()] } }; - let accumulators = create_accumulators(&agg.aggr_expr)?; let reservation = MemoryConsumer::new(format!("AggregateStream[{partition}]")) .register(context.memory_pool()); + let aggregate_groups = aggregate_exec + .aggregate_groups + .iter() + .map( + |AggregateExprGroup { + indices, + requirement, + }| { + let aggr_exprs = get_at_indices(&aggregate_exec.aggr_expr, indices)?; + let aggregate_expressions = + get_at_indices(&aggregate_expressions, indices)?; + let filter_expressions = + get_at_indices(&filter_expressions, indices)?; + let accumulators = create_accumulators(&aggr_exprs)?; + let aggregates = izip!( + aggregate_expressions.into_iter(), + filter_expressions.into_iter(), + accumulators.into_iter() + ) + .map(|(expressions, filter_expression, accumulator)| { + AggregateExprData { + expressions, + filter_expression, + accumulator, + } + }) + .collect::>(); + Ok(AggregateGroup { + aggregates, + requirement: requirement.to_vec(), + group_indices: indices.to_vec(), + }) + }, + ) + .collect::>>()?; - let inner = AggregateStreamInner { - schema: Arc::clone(&agg.schema), - mode: agg.mode, + let stream = create_aggregate_stream( + aggregate_exec, input, baseline_metrics, - aggregate_expressions, - filter_expressions, - accumulators, + aggregate_groups, reservation, - finished: false, - }; - let stream = futures::stream::unfold(inner, |mut this| async move { - if this.finished { - return None; - } + )?; - let elapsed_compute = this.baseline_metrics.elapsed_compute(); - - loop { - let result = match this.input.next().await { - Some(Ok(batch)) => { - let timer = elapsed_compute.timer(); - let result = aggregate_batch( - &this.mode, - batch, - &mut this.accumulators, - &this.aggregate_expressions, - 
&this.filter_expressions, - ); - - timer.done(); - - // allocate memory - // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with - // overshooting a bit. Also this means we either store the whole record batch or not. - match result - .and_then(|allocated| this.reservation.try_grow(allocated)) - { - Ok(_) => continue, - Err(e) => Err(e), - } + Ok(Self { + schema: agg_schema, + stream, + }) + } +} + +/// Creates a stream for processing aggregate expressions. +/// +/// This function constructs a stream that processes batches from `input`, aggregates +/// them according to `aggregate_groups`, and finally yields the aggregated results. +/// It handles the aggregation logic depending on the aggregate mode defined in `agg`. +/// The function also accounts for memory consumption during aggregation using `reservation`. +/// +/// # Parameters +/// - `aggregate_exec`: Reference to the `AggregateExec` struct which contains the aggregate execution plan. +/// - `input`: Stream of `RecordBatch` items representing the input data. +/// - `baseline_metrics`: Metrics for tracking the performance and resource usage. +/// - `aggregate_groups`: A vector of `AggregateGroup` structs, each representing a group of +/// aggregate expressions along with their corresponding indices and ordering requirements. +/// - `reservation`: Memory reservation handle for managing memory consumption during aggregation. +/// +/// # Returns +/// A `Result` containing the constructed stream if successful, or an error if the stream +/// creation fails. The stream yields `RecordBatch` items, each representing a batch of aggregated results. +fn create_aggregate_stream( + aggregate_exec: &AggregateExec, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + aggregate_groups: Vec, + reservation: MemoryReservation, +) -> Result>> { + let inner = AggregateStreamInner { + schema: Arc::clone(&aggregate_exec.schema), + mode: aggregate_exec.mode, + input, + baseline_metrics, + aggregate_groups, + reservation, + finished: false, + }; + let stream = futures::stream::unfold(inner, |mut this| async move { + if this.finished { + return None; + } + + let elapsed_compute = this.baseline_metrics.elapsed_compute(); + + loop { + let result = match this.input.next().await { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = aggregate_batch_groups( + &this.mode, + batch, + &mut this.aggregate_groups, + ); + + timer.done(); + + // allocate memory + // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with + // overshooting a bit. Also this means we either store the whole record batch or not. 
+ match result + .and_then(|allocated| this.reservation.try_grow(allocated)) + { + Ok(_) => continue, + Err(e) => Err(e), } - Some(Err(e)) => Err(e), - None => { - this.finished = true; - let timer = this.baseline_metrics.elapsed_compute().timer(); - let result = finalize_aggregation(&this.accumulators, &this.mode) + } + Some(Err(e)) => Err(e), + None => { + this.finished = true; + let timer = this.baseline_metrics.elapsed_compute().timer(); + let result = + finalize_aggregation_groups(&this.aggregate_groups, &this.mode) .and_then(|columns| { RecordBatch::try_new(this.schema.clone(), columns) .map_err(Into::into) }) .record_output(&this.baseline_metrics); - timer.done(); - - result - } - }; + timer.done(); - this.finished = true; - return Some((result, this)); - } - }); + result + } + }; - // seems like some consumers call this stream even after it returned `None`, so let's fuse the stream. - let stream = stream.fuse(); - let stream = Box::pin(stream); + this.finished = true; + return Some((result, this)); + } + }); - Ok(Self { - schema: agg_schema, - stream, - }) - } + // seems like some consumers call this stream even after it returned `None`, so let's fuse the stream. + Ok(Box::pin(stream.fuse())) } impl Stream for AggregateStream { @@ -184,17 +284,30 @@ impl RecordBatchStream for AggregateStream { } } -/// Perform group-by aggregation for the given [`RecordBatch`]. +/// Perform group-by aggregation for the given [`RecordBatch`] on all aggregate groups. +/// +/// If successful, this returns the additional number of bytes that were allocated during this process. +fn aggregate_batch_groups( + mode: &AggregateMode, + batch: RecordBatch, + aggregate_groups: &mut [AggregateGroup], +) -> Result { + let allocated = aggregate_groups + .iter_mut() + .map(|aggregate_group| aggregate_batch(mode, &batch, aggregate_group)) + .collect::>>()?; + Ok(allocated.into_iter().sum()) +} + +/// Perform group-by aggregation for the given [`RecordBatch`] on the aggregate group. /// /// If successful, this returns the additional number of bytes that were allocated during this process. /// /// TODO: Make this a member function fn aggregate_batch( mode: &AggregateMode, - batch: RecordBatch, - accumulators: &mut [AccumulatorItem], - expressions: &[Vec>], - filters: &[Option>], + batch: &RecordBatch, + aggregate_group: &mut AggregateGroup, ) -> Result { let mut allocated = 0usize; @@ -203,40 +316,78 @@ fn aggregate_batch( // 1.3 evaluate expressions // 1.4 update / merge accumulators with the expressions' values + let requirement = &aggregate_group.requirement; + let sorted_or_original_batch = if requirement.is_empty() { + Cow::Borrowed(batch) + } else { + Cow::Owned(sort_batch(batch, requirement, None)?) + }; // 1.1 - accumulators - .iter_mut() - .zip(expressions) - .zip(filters) - .try_for_each(|((accum, expr), filter)| { + aggregate_group.aggregates.iter_mut().try_for_each( + |AggregateExprData { + expressions, + filter_expression, + accumulator, + }| { // 1.2 - let batch = match filter { - Some(filter) => Cow::Owned(batch_filter(&batch, filter)?), - None => Cow::Borrowed(&batch), + let filtered_or_original_batch = match filter_expression { + Some(filter) => { + Cow::Owned(batch_filter(&sorted_or_original_batch, filter)?) 
+ } + None => Cow::Borrowed(&*sorted_or_original_batch), }; // 1.3 - let values = &expr + let values = &expressions .iter() .map(|e| { - e.evaluate(&batch) - .and_then(|v| v.into_array(batch.num_rows())) + e.evaluate(&filtered_or_original_batch) + .and_then(|v| v.into_array(filtered_or_original_batch.num_rows())) }) .collect::>>()?; // 1.4 - let size_pre = accum.size(); + let size_pre = accumulator.size(); let res = match mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => accum.update_batch(values), + | AggregateMode::SinglePartitioned => accumulator.update_batch(values), AggregateMode::Final | AggregateMode::FinalPartitioned => { - accum.merge_batch(values) + accumulator.merge_batch(values) } }; - let size_post = accum.size(); + let size_post = accumulator.size(); allocated += size_post.saturating_sub(size_pre); res - })?; + }, + )?; Ok(allocated) } + +/// returns a vector of ArrayRefs, where each entry corresponds to either the +/// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) +fn finalize_aggregation_groups( + aggregate_groups: &[AggregateGroup], + mode: &AggregateMode, +) -> Result> { + let aggregate_group_results = aggregate_groups + .iter() + .map(|aggregate_group| { + let accumulators = aggregate_group + .aggregates + .iter() + .map(|elem| &elem.accumulator) + .collect::>(); + finalize_aggregation(&accumulators, mode) + }) + .collect::>>()?; + let aggregate_group_indices = aggregate_groups + .iter() + .map(|aggregate_group| aggregate_group.group_indices.to_vec()) + .collect::>(); + + Ok(reorder_aggregate_expr_results( + aggregate_group_results, + aggregate_group_indices, + )) +} diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 89614fd3020c..0ba1fa83316e 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -24,8 +24,8 @@ use std::vec; use crate::aggregates::group_values::{new_group_values, GroupValues}; use crate::aggregates::order::GroupOrderingFull; use crate::aggregates::{ - evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode, - PhysicalGroupBy, + evaluate_group_by, evaluate_many, evaluate_optional, group_schema, + reorder_aggregate_expr_results, AggregateExprGroup, AggregateMode, PhysicalGroupBy, }; use crate::common::IPCWriter; use crate::metrics::{BaselineMetrics, RecordOutput}; @@ -46,9 +46,11 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ - AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, PhysicalSortExpr, + AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, LexOrdering, + PhysicalSortExpr, }; +use datafusion_common::utils::get_at_indices; use futures::ready; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -81,13 +83,58 @@ struct SpillState { /// true when streaming merge is in progress is_stream_merging: bool, - /// aggregate_arguments for merging spilled data - merging_aggregate_arguments: Vec>>, - /// GROUP BY expressions for merging spilled data merging_group_by: PhysicalGroupBy, } +pub struct HashAggregateGroup { + /// Accumulators, one for each `AggregateExpr` in the query + /// + /// For example, if the query has aggregates, `SUM(x)`, + /// `COUNT(y)`, there will be two accumulators, each one + /// specialized for that particular aggregate 
and its input types + accumulators: Vec>, + + /// Arguments to pass to each accumulator. + /// + /// The arguments in `accumulator[i]` is passed `aggregate_arguments[i]` + /// + /// The argument to each accumulator is itself a `Vec` because + /// some aggregates such as `CORR` can accept more than one + /// argument. + aggregate_arguments: Vec>>, + + /// aggregate_arguments at the input of the merging (may have additional fields) + merging_aggregate_arguments: Vec>>, + + /// Optional filter expression to evaluate, one for each for + /// accumulator. If present, only those rows for which the filter + /// evaluate to true should be included in the aggregate results. + /// + /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, + /// the filter expression is `x > 100`. + filter_expressions: Vec>>, + /// Ordering Requirement for the aggregate group + requirement: LexOrdering, + /// Indices stores the position of the each aggregate expression + /// among all aggregate expressions (prior to grouping). + group_indices: Vec, +} + +impl HashAggregateGroup { + /// Initialize an empty group. + fn empty() -> Self { + HashAggregateGroup { + accumulators: vec![], + aggregate_arguments: vec![], + merging_aggregate_arguments: vec![], + filter_expressions: vec![], + requirement: vec![], + group_indices: vec![], + } + } +} + /// HashTable based Grouping Aggregator /// /// # Design Goals @@ -208,30 +255,9 @@ pub(crate) struct GroupedHashAggregateStream { input: SendableRecordBatchStream, mode: AggregateMode, - /// Accumulators, one for each `AggregateExpr` in the query - /// - /// For example, if the query has aggregates, `SUM(x)`, - /// `COUNT(y)`, there will be two accumulators, each one - /// specialized for that particular aggregate and its input types - accumulators: Vec>, - - /// Arguments to pass to each accumulator. - /// - /// The arguments in `accumulator[i]` is passed `aggregate_arguments[i]` - /// - /// The argument to each accumulator is itself a `Vec` because - /// some aggregates such as `CORR` can accept more than one - /// argument. - aggregate_arguments: Vec>>, - - /// Optional filter expression to evaluate, one for each for - /// accumulator. If present, only those rows for which the filter - /// evaluate to true should be included in the aggregate results. - /// - /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, - /// the filter expression is `x > 100`. - filter_expressions: Vec>>, - + /// Stores aggregate groups that have different ordering requirements + /// Aggregate groups are calculated using `get_aggregate_expr_groups` function. 
+ aggregate_groups: Vec, /// GROUP BY expressions group_by: PhysicalGroupBy, @@ -293,8 +319,6 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); - // arguments for each aggregate, one vec of expressions per // aggregate let aggregate_arguments = aggregates::aggregate_expressions( @@ -318,11 +342,47 @@ impl GroupedHashAggregateStream { } }; - // Instantiate the accumulators - let accumulators: Vec<_> = aggregate_exprs + let mut aggregate_groups = agg + .aggregate_groups .iter() - .map(create_group_accumulator) - .collect::>()?; + .map( + |AggregateExprGroup { + indices, + requirement, + }| { + let aggregate_arguments = + get_at_indices(&aggregate_arguments, indices)?; + let merging_aggregate_arguments = + get_at_indices(&merging_aggregate_arguments, indices)?; + let filter_expressions = + get_at_indices(&filter_expressions, indices)?; + let aggr_exprs = get_at_indices(&agg.aggr_expr, indices)?; + // Instantiate the accumulators + let accumulators: Vec<_> = aggr_exprs + .iter() + .map(create_group_accumulator) + .collect::>()?; + + Ok(HashAggregateGroup { + accumulators, + aggregate_arguments, + merging_aggregate_arguments, + filter_expressions, + requirement: requirement.to_vec(), + group_indices: indices.to_vec(), + }) + }, + ) + .collect::>>()?; + if aggregate_groups.is_empty() { + // Make sure there is at least one empty group. + // This situations can arise in queries in following form + // SELECT a, b + // FROM table + // GROUP BY a,b + // where there is no aggregate expression. + aggregate_groups = vec![HashAggregateGroup::empty()] + } // we need to use original schema so RowConverter in group_values below // will do the proper coversion of dictionaries into value types @@ -360,7 +420,6 @@ impl GroupedHashAggregateStream { spill_expr, spill_schema: agg_schema.clone(), is_stream_merging: false, - merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), }; @@ -368,9 +427,7 @@ impl GroupedHashAggregateStream { schema: agg_schema, input, mode: agg.mode, - accumulators, - aggregate_arguments, - filter_expressions, + aggregate_groups, group_by: agg_group_by, reservation, group_values, @@ -522,79 +579,87 @@ impl RecordBatchStream for GroupedHashAggregateStream { impl GroupedHashAggregateStream { /// Perform group-by aggregation for the given [`RecordBatch`]. fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { - // Evaluate the grouping expressions - let group_by_values = if self.spill_state.is_stream_merging { - evaluate_group_by(&self.spill_state.merging_group_by, &batch)? - } else { - evaluate_group_by(&self.group_by, &batch)? - }; - - // Evaluate the aggregation expressions. - let input_values = if self.spill_state.is_stream_merging { - evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)? - } else { - evaluate_many(&self.aggregate_arguments, &batch)? - }; - - // Evaluate the filter expressions, if any, against the inputs - let filter_values = if self.spill_state.is_stream_merging { - let filter_expressions = vec![None; self.accumulators.len()]; - evaluate_optional(&filter_expressions, &batch)? - } else { - evaluate_optional(&self.filter_expressions, &batch)? 
-        };
-
-        for group_values in &group_by_values {
-            // calculate the group indices for each input row
-            let starting_num_groups = self.group_values.len();
-            self.group_values
-                .intern(group_values, &mut self.current_group_indices)?;
-            let group_indices = &self.current_group_indices;
-
-            // Update ordering information if necessary
-            let total_num_groups = self.group_values.len();
-            if total_num_groups > starting_num_groups {
-                self.group_ordering.new_groups(
-                    group_values,
-                    group_indices,
-                    total_num_groups,
-                )?;
-            }
+        for aggregate_group in &mut self.aggregate_groups {
+            let batch = if aggregate_group.requirement.is_empty() {
+                batch.clone()
+            } else {
+                sort_batch(&batch, &aggregate_group.requirement, None)?
+            };
+            // Evaluate the grouping expressions
+            let group_by_values = if self.spill_state.is_stream_merging {
+                evaluate_group_by(&self.spill_state.merging_group_by, &batch)?
+            } else {
+                evaluate_group_by(&self.group_by, &batch)?
+            };
+
+            // Evaluate the aggregation expressions.
+            let input_values = if self.spill_state.is_stream_merging {
+                evaluate_many(&aggregate_group.merging_aggregate_arguments, &batch)?
+            } else {
+                evaluate_many(&aggregate_group.aggregate_arguments, &batch)?
+            };
+
+            // Evaluate the filter expressions, if any, against the inputs
+            let filter_values = if self.spill_state.is_stream_merging {
+                let filter_expressions = vec![None; aggregate_group.accumulators.len()];
+                evaluate_optional(&filter_expressions, &batch)?
+            } else {
+                evaluate_optional(&aggregate_group.filter_expressions, &batch)?
+            };
+
+            for group_values in &group_by_values {
+                // calculate the group indices for each input row
+                let starting_num_groups = self.group_values.len();
+                self.group_values
+                    .intern(group_values, &mut self.current_group_indices)?;
+                let group_indices = &self.current_group_indices;
+
+                // Update ordering information if necessary
+                let total_num_groups = self.group_values.len();
+                if total_num_groups > starting_num_groups {
+                    self.group_ordering.new_groups(
+                        group_values,
+                        group_indices,
+                        total_num_groups,
+                    )?;
+                }
 
-            // Gather the inputs to call the actual accumulator
-            let t = self
-                .accumulators
-                .iter_mut()
-                .zip(input_values.iter())
-                .zip(filter_values.iter());
-
-            for ((acc, values), opt_filter) in t {
-                let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
-
-                // Call the appropriate method on each aggregator with
-                // the entire input row and the relevant group indexes
-                match self.mode {
-                    AggregateMode::Partial
-                    | AggregateMode::Single
-                    | AggregateMode::SinglePartitioned
-                        if !self.spill_state.is_stream_merging =>
-                    {
-                        acc.update_batch(
-                            values,
-                            group_indices,
-                            opt_filter,
-                            total_num_groups,
-                        )?;
-                    }
-                    _ => {
-                        // if aggregation is over intermediate states,
-                        // use merge
-                        acc.merge_batch(
-                            values,
-                            group_indices,
-                            opt_filter,
-                            total_num_groups,
-                        )?;
+                // Gather the inputs to call the actual accumulator
+                let t = aggregate_group
+                    .accumulators
+                    .iter_mut()
+                    .zip(input_values.iter())
+                    .zip(filter_values.iter());
+
+                for ((acc, values), opt_filter) in t {
+                    let opt_filter =
+                        opt_filter.as_ref().map(|filter| filter.as_boolean());
+
+                    // Call the appropriate method on each aggregator with
+                    // the entire input row and the relevant group indexes
+                    match self.mode {
+                        AggregateMode::Partial
+                        | AggregateMode::Single
+                        | AggregateMode::SinglePartitioned
+                            if !self.spill_state.is_stream_merging =>
+                        {
+                            acc.update_batch(
+                                values,
+                                group_indices,
+                                opt_filter,
+                                total_num_groups,
+                            )?;
+                        }
+                        _ => {
+                            // if aggregation is over intermediate states,
+                            // use merge
+                            acc.merge_batch(
+                                values,
+                                group_indices,
+                                opt_filter,
+                                total_num_groups,
+                            )?;
+                        }
                     }
                 }
             }
@@ -613,12 +678,19 @@ impl GroupedHashAggregateStream {
     }
 
     fn update_memory_reservation(&mut self) -> Result<()> {
-        let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
-        self.reservation.try_resize(
-            acc + self.group_values.size()
-                + self.group_ordering.size()
-                + self.current_group_indices.allocated_size(),
-        )
+        for aggregate_group in &self.aggregate_groups {
+            let acc = aggregate_group
+                .accumulators
+                .iter()
+                .map(|x| x.size())
+                .sum::<usize>();
+            self.reservation.try_resize(
+                acc + self.group_values.size()
+                    + self.group_ordering.size()
+                    + self.current_group_indices.allocated_size(),
+            )?;
+        }
+        Ok(())
     }
 
     /// Create an output RecordBatch with the group keys and
@@ -639,21 +711,39 @@ impl GroupedHashAggregateStream {
         }
 
         // Next output each aggregate value
-        for acc in self.accumulators.iter_mut() {
-            match self.mode {
-                AggregateMode::Partial => output.extend(acc.state(emit_to)?),
-                _ if spilling => {
-                    // If spilling, output partial state because the spilled data will be
-                    // merged and re-evaluated later.
-                    output.extend(acc.state(emit_to)?)
+        let outputs = self
+            .aggregate_groups
+            .iter_mut()
+            .map(|aggregate_group| {
+                let mut aggregate_group_output = vec![];
+                for acc in aggregate_group.accumulators.iter_mut() {
+                    match self.mode {
+                        AggregateMode::Partial => {
+                            aggregate_group_output.push(acc.state(emit_to)?)
+                        }
+                        _ if spilling => {
+                            // If spilling, output partial state because the spilled data will be
+                            // merged and re-evaluated later.
+                            aggregate_group_output.push(acc.state(emit_to)?)
+                        }
+                        AggregateMode::Final
+                        | AggregateMode::FinalPartitioned
+                        | AggregateMode::Single
+                        | AggregateMode::SinglePartitioned => {
+                            aggregate_group_output.push(vec![acc.evaluate(emit_to)?])
+                        }
+                    }
                 }
-                AggregateMode::Final
-                | AggregateMode::FinalPartitioned
-                | AggregateMode::Single
-                | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?),
-            }
-        }
-
+                Ok(aggregate_group_output)
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let group_indices = self
+            .aggregate_groups
+            .iter()
+            .map(|aggregate_group| aggregate_group.group_indices.to_vec())
+            .collect::<Vec<_>>();
+        let aggregate_outputs = reorder_aggregate_expr_results(outputs, group_indices);
+        output.extend(aggregate_outputs);
         // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is
         // over the target memory size after emission, we can emit again rather than returning Err.
         let _ = self.update_memory_reservation();
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 431a43bc6055..ef4ea92f7e46 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -108,7 +108,7 @@ impl BoundedWindowAggExec {
             InputOrderMode::Sorted => {
                 let indices = get_ordered_partition_by_indices(
                     window_expr[0].partition_by(),
-                    &input,
+                    &input.equivalence_properties(),
                 );
                 if indices.len() == partition_by_exprs.len() {
                     indices
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 3187e6b0fbd3..38bce539a8f7 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -319,11 +319,9 @@ pub(crate) fn calc_requirements<
 // resulting vector (a, b) is a preset of the existing ordering (a, b, c).
 pub(crate) fn get_ordered_partition_by_indices(
     partition_by_exprs: &[Arc<dyn PhysicalExpr>],
-    input: &Arc<dyn ExecutionPlan>,
+    input_eq_properties: &EquivalenceProperties,
 ) -> Vec<usize> {
-    let (_, indices) = input
-        .equivalence_properties()
-        .find_longest_permutation(partition_by_exprs);
+    let (_, indices) = input_eq_properties.find_longest_permutation(partition_by_exprs);
     indices
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 6c245f65ba4f..a9efe4a3c0c8 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -80,8 +80,10 @@ impl WindowAggExec {
         let schema = create_schema(&input.schema(), &window_expr)?;
         let schema = Arc::new(schema);
 
-        let ordered_partition_by_indices =
-            get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
+        let ordered_partition_by_indices = get_ordered_partition_by_indices(
+            window_expr[0].partition_by(),
+            &input.equivalence_properties(),
+        );
         Ok(Self {
             input,
             window_expr,
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 78575c9dffc5..4a5a61147ee8 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -132,9 +132,8 @@ physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
-------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
---------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
 
 statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt
index 9a7117b69b99..e33b3efe3a52 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -46,7 +46,6 @@ c 2
 d 1
 e 3
 
-# Basic example + reverse order of the selected column
 query TI
 SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3 DESC, c9;
 ----
@@ -100,10 +99,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
 ------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+------------AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)] +--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true # ON expressions are not a sub-set of the ORDER BY expressions query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index f1b6a57287b5..82f0d50da561 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2019,17 +2019,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0]) ---------------SortExec: expr=[col0@3 ASC NULLS LAST] -----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] -------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] +------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] +--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] +----------------CoalesceBatchesExec: target_batch_size=8192 +------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by # a,b,c column. Column a has cardinality 2, column b has cardinality 4. @@ -2297,8 +2296,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] -----SortExec: expr=[amount@1 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query T? 
@@ -2327,8 +2325,7 @@ Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)] -----SortExec: expr=[amount@1 DESC] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query T?R rowsort SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts, @@ -2340,15 +2337,37 @@ FRA [200.0, 50.0] 250 GRC [80.0, 30.0] 110 TUR [100.0, 75.0] 175 -# test_ordering_sensitive_aggregation3 -# When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation. -# test below should raise Plan Error. -statement error DataFusion error: This feature is not implemented: Conflicting ordering requirements in aggregate functions is not supported -SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts, +query TT +EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts, + ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2, + ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3 + FROM sales_global AS s + GROUP BY s.country + ORDER BY s.country +---- +logical_plan +Sort: s.country ASC NULLS LAST +--Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC NULLS LAST] AS amounts2, ARRAY_AGG(s.amount) ORDER BY [s.sn ASC NULLS LAST] AS amounts3 +----Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], ARRAY_AGG(s.amount) ORDER BY [s.amount ASC NULLS LAST], ARRAY_AGG(s.amount) ORDER BY [s.sn ASC NULLS LAST]]] +------SubqueryAlias: s +--------TableScan: sales_global projection=[country, sn, amount] +physical_plan +SortExec: expr=[country@0 ASC NULLS LAST] +--ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC NULLS LAST]@2 as amounts2, ARRAY_AGG(s.amount) ORDER BY [s.sn ASC NULLS LAST]@3 as amounts3] +----AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), ARRAY_AGG(s.amount), ARRAY_AGG(s.amount)] +------MemoryExec: partitions=1, partition_sizes=[1] + +query T??? 
rowsort +SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts, ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2, ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3 FROM sales_global AS s GROUP BY s.country + ORDER BY s.country +---- +FRA [200.0, 50.0] [50.0, 200.0] [50.0, 200.0] +GRC [80.0, 30.0] [30.0, 80.0] [30.0, 80.0] +TUR [100.0, 75.0] [75.0, 100.0] [75.0, 100.0] # test_ordering_sensitive_aggregation4 # If aggregators can work with bounded memory (Sorted or PartiallySorted mode), we should append requirement to @@ -2371,7 +2390,7 @@ Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted -----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC] +----SortExec: expr=[country@0 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] query T?R rowsort @@ -2407,7 +2426,7 @@ Projection: s.country, s.zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC N physical_plan ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, SUM(s.amount)@3 as sum1] --AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=PartiallySorted([0]) -----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC] +----SortExec: expr=[country@1 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] query TI?R rowsort @@ -2478,7 +2497,7 @@ Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted -----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC] +----SortExec: expr=[country@0 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] query T?R rowsort @@ -2510,8 +2529,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] -----SortExec: expr=[amount@1 DESC] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query T?RR rowsort SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, @@ -2541,8 +2559,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as 
fv2] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] -----SortExec: expr=[amount@1 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query T?RR SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts, @@ -2573,8 +2590,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] -----SortExec: expr=[amount@1 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query TRR? SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1, @@ -2603,8 +2619,7 @@ Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_globa physical_plan ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)] -----SortExec: expr=[amount@2 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query TR? 
SELECT country, SUM(amount ORDER BY ts DESC) AS sum1, @@ -2637,9 +2652,8 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal ------TableScan: sales_global projection=[country, ts, amount] physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] -----SortExec: expr=[ts@1 ASC NULLS LAST] -------MemoryExec: partitions=1, partition_sizes=[1] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, @@ -2672,8 +2686,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] -----SortExec: expr=[ts@1 DESC] -------MemoryExec: partitions=1, partition_sizes=[1] +----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1, @@ -2709,12 +2722,11 @@ physical_plan SortExec: expr=[sn@2 ASC NULLS LAST] --ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate] ----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)] -------SortExec: expr=[sn@5 ASC NULLS LAST] ---------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount] -----------CoalesceBatchesExec: target_batch_size=8192 -------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1 ---------------MemoryExec: partitions=1, partition_sizes=[1] ---------------MemoryExec: partitions=1, partition_sizes=[1] +------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount] +--------CoalesceBatchesExec: target_batch_size=8192 +----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1 +------------MemoryExec: partitions=1, partition_sizes=[1] +------------MemoryExec: partitions=1, partition_sizes=[1] query ITIPTR rowsort SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate @@ -2759,8 +2771,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1 --------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] -----------------SortExec: expr=[ts@1 ASC NULLS LAST] -------------------MemoryExec: partitions=1, partition_sizes=[1] +----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2796,8 +2807,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 --------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] -----------------SortExec: expr=[ts@1 ASC NULLS LAST] -------------------MemoryExec: partitions=1, partition_sizes=[1] +----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2831,9 +2841,8 @@ ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts --AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ---------SortExec: expr=[ts@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query RR SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2858,9 +2867,8 @@ ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts --AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] ---------SortExec: expr=[ts@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query RR SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -2883,9 +2891,8 @@ ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts A --AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ---------SortExec: expr=[ts@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query ? 
SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS array_agg1 @@ -2907,9 +2914,8 @@ ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts D --AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ---------SortExec: expr=[ts@0 DESC] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query ? SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS array_agg1 @@ -2931,9 +2937,8 @@ ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amou --AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ----CoalescePartitionsExec ------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] ---------SortExec: expr=[amount@0 ASC NULLS LAST] -----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query ? SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1 @@ -2961,9 +2966,8 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] --------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] ---------------SortExec: expr=[amount@1 ASC NULLS LAST] -----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------------MemoryExec: partitions=1, partition_sizes=[1] +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------MemoryExec: partitions=1, partition_sizes=[1] query T? 
SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1 @@ -2997,9 +3001,8 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] --------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ---------------SortExec: expr=[amount@1 DESC] -----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -------------------MemoryExec: partitions=1, partition_sizes=[1] +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------MemoryExec: partitions=1, partition_sizes=[1] query T?RR SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, @@ -3639,10 +3642,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab ----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] ---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] ----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 ---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true @@ -4250,7 +4253,6 @@ set datafusion.sql_parser.dialect = 'Generic'; statement ok drop table aggregate_test_100; - # Create an unbounded external table with primary key # column c statement ok @@ -4281,6 +4283,236 @@ LIMIT 5 3 0 0 4 0 1 +statement ok +set datafusion.optimizer.prefer_existing_sort = true; + +query TT +EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Aggregate: groupBy=[[multiple_ordered_table.a, multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.d DESC NULLS FIRST]]] +----TableScan: multiple_ordered_table projection=[a, b, c, d] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS 
LAST,b@1 ASC NULLS LAST +--------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true + +statement ok +set datafusion.execution.target_partitions = 1; + +query II?? rowsort +SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 [4, 0, 0, 3, 0, 4, 1, 2, 3, 3, 2, 1, 2, 2, 4, 4, 1, 2, 0, 1, 1, 0, 0, 2, 0] [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 [3, 1, 1, 0, 0, 3, 2, 3, 3, 1, 1, 0, 4, 1, 0, 4, 2, 2, 4, 3, 1, 1, 0, 2, 0] [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 [4, 1, 1, 2, 0, 1, 1, 4, 2, 0, 4, 4, 1, 3, 4, 2, 1, 1, 1, 2, 4, 1, 1, 3, 0] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 [3, 4, 2, 4, 0, 2, 0, 0, 2, 1, 4, 3, 4, 1, 1, 1, 2, 4, 0, 2, 4, 1, 2, 0, 2] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +statement ok +set datafusion.execution.target_partitions = 8; + +query II?? rowsort +SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 [4, 0, 0, 3, 0, 4, 1, 2, 3, 3, 2, 1, 2, 2, 4, 4, 1, 2, 0, 1, 1, 0, 0, 2, 0] [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 [3, 1, 1, 0, 0, 3, 2, 3, 3, 1, 1, 0, 4, 1, 0, 4, 2, 2, 4, 3, 1, 1, 0, 2, 0] [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 [4, 1, 1, 2, 0, 1, 1, 4, 2, 0, 4, 4, 1, 3, 4, 2, 1, 1, 1, 2, 4, 1, 1, 3, 0] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 [3, 4, 2, 4, 0, 2, 0, 0, 2, 1, 4, 3, 4, 1, 1, 1, 2, 4, 0, 2, 4, 1, 2, 0, 2] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +query TT +EXPLAIN SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Aggregate: groupBy=[[multiple_ordered_table.a, multiple_ordered_table.b]], aggr=[[FIRST_VALUE(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], LAST_VALUE(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.d DESC NULLS FIRST]]] +----TableScan: multiple_ordered_table projection=[a, b, c, d] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(multiple_ordered_table.d), LAST_VALUE(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST +--------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(multiple_ordered_table.d), 
LAST_VALUE(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true + +statement ok +set datafusion.execution.target_partitions = 1; + +query IIII? rowsort +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 0 [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 3 0 [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 4 0 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 3 2 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +statement ok +set datafusion.execution.target_partitions = 8; + +query IIII? +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 0 [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 3 0 [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 4 0 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 3 2 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +# Test same queries where there is sort inserted (source ordering is not preserved) +statement ok +set datafusion.optimizer.prefer_existing_sort = false; + +query TT +EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Aggregate: groupBy=[[multiple_ordered_table.a, multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.d DESC NULLS FIRST]]] +----TableScan: multiple_ordered_table projection=[a, b, c, d] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8 +----------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true + +statement ok +set datafusion.execution.target_partitions = 1; + +query II?? 
rowsort +SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 [4, 0, 0, 3, 0, 4, 1, 2, 3, 3, 2, 1, 2, 2, 4, 4, 1, 2, 0, 1, 1, 0, 0, 2, 0] [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 [3, 1, 1, 0, 0, 3, 2, 3, 3, 1, 1, 0, 4, 1, 0, 4, 2, 2, 4, 3, 1, 1, 0, 2, 0] [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 [4, 1, 1, 2, 0, 1, 1, 4, 2, 0, 4, 4, 1, 3, 4, 2, 1, 1, 1, 2, 4, 1, 1, 3, 0] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 [3, 4, 2, 4, 0, 2, 0, 0, 2, 1, 4, 3, 4, 1, 1, 1, 2, 4, 0, 2, 4, 1, 2, 0, 2] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +statement ok +set datafusion.execution.target_partitions = 8; + +query II?? +SELECT a, b, ARRAY_AGG(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 [4, 0, 0, 3, 0, 4, 1, 2, 3, 3, 2, 1, 2, 2, 4, 4, 1, 2, 0, 1, 1, 0, 0, 2, 0] [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 [3, 1, 1, 0, 0, 3, 2, 3, 3, 1, 1, 0, 4, 1, 0, 4, 2, 2, 4, 3, 1, 1, 0, 2, 0] [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 [4, 1, 1, 2, 0, 1, 1, 4, 2, 0, 4, 4, 1, 3, 4, 2, 1, 1, 1, 2, 4, 1, 1, 3, 0] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 [3, 4, 2, 4, 0, 2, 0, 0, 2, 1, 4, 3, 4, 1, 1, 1, 2, 4, 0, 2, 4, 1, 2, 0, 2] [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +query TT +EXPLAIN SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Aggregate: groupBy=[[multiple_ordered_table.a, multiple_ordered_table.b]], aggr=[[FIRST_VALUE(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], LAST_VALUE(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST], ARRAY_AGG(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.d DESC NULLS FIRST]]] +----TableScan: multiple_ordered_table projection=[a, b, c, d] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(multiple_ordered_table.d), LAST_VALUE(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +----SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([a@0, b@1], 8), input_partitions=8 +----------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(multiple_ordered_table.d), LAST_VALUE(multiple_ordered_table.d), ARRAY_AGG(multiple_ordered_table.d)], ordering_mode=Sorted +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true + +statement ok +set datafusion.execution.target_partitions = 1; + +query IIII? 
rowsort +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 0 [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 3 0 [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 4 0 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 3 2 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +# Test conflicting ordering requirement in single partition +query IIII rowsort +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 0 +0 1 3 0 +1 2 4 0 +1 3 3 0 + +statement ok +set datafusion.execution.target_partitions = 8; + +# Test conflicting ordering requirement in multiple partitions +query IIII? +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), LAST_VALUE(d ORDER BY c DESC), ARRAY_AGG(d ORDER BY d DESC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 0 [4, 4, 4, 4, 3, 3, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0] +0 1 3 0 [4, 4, 4, 3, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0] +1 2 4 0 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0] +1 3 3 2 [4, 4, 4, 4, 4, 4, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + +# Test a query with conflicting ordering requirements (where aggregate groups are not consecutive. +# For query below aggregate groups are at indices [0, 2] and [1]). Please note that there is no group by +query III rowsort +SELECT FIRST_VALUE(d ORDER BY c DESC), FIRST_VALUE(d ORDER BY d DESC, c DESC), FIRST_VALUE(d ORDER BY c ASC) + FROM multiple_ordered_table +---- +3 4 0 + +# Test a query with conflicting ordering requirements (where aggregate groups are not consecutive. +# For query below aggregate groups are at indices [0, 2] and [1]) with group by. . 
Please note that there is group by +query IIIII rowsort +SELECT a, b, FIRST_VALUE(d ORDER BY c DESC), FIRST_VALUE(d ORDER BY d DESC, c DESC), FIRST_VALUE(d ORDER BY c ASC) + FROM multiple_ordered_table + GROUP BY a, b + ORDER BY a, b +---- +0 0 4 4 0 +0 1 3 4 0 +1 2 4 4 0 +1 3 3 4 2 query ITIPTR rowsort SELECT r.* diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 1ad17fbb8c91..9e6680ded42f 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3454,7 +3454,7 @@ SortPreservingMergeExec: [a@0 ASC] ------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] --------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2 -------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0]) +------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] --------------CoalesceBatchesExec: target_batch_size=2 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] ------------------CoalesceBatchesExec: target_batch_size=2 @@ -3462,7 +3462,7 @@ SortPreservingMergeExec: [a@0 ASC] ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true ------------------CoalesceBatchesExec: target_batch_size=2 ---------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC NULLS LAST +--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true
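Note on the mechanism exercised by the tests above: in the `row_hash.rs` changes, aggregate expressions are split into groups that share an ordering requirement, each group sorts its input batch (when its requirement is non-empty) and drives its own accumulators, and the per-group outputs are finally stitched back into the original aggregate order via `reorder_aggregate_expr_results`. The following is a minimal, self-contained Rust sketch of that reorder step only; it uses `String` in place of Arrow's `ArrayRef` so it compiles without dependencies, and it is an illustration of the idea, not the DataFusion implementation.

// Illustration only: reorder per-group accumulator outputs back into the
// original aggregate-expression order. `String` stands in for Arrow's
// `ArrayRef` so the example runs without any external crates.
fn reorder_aggregate_expr_results(
    per_group_outputs: Vec<Vec<Vec<String>>>, // [group][aggregate within group][output column]
    group_indices: Vec<Vec<usize>>,           // original index of each aggregate in each group
) -> Vec<String> {
    // One slot per aggregate expression, addressed by its original index.
    let n_aggr: usize = group_indices.iter().map(|indices| indices.len()).sum();
    let mut slots: Vec<Vec<String>> = vec![vec![]; n_aggr];
    for (outputs, indices) in per_group_outputs.into_iter().zip(group_indices) {
        for (output, original_index) in outputs.into_iter().zip(indices) {
            slots[original_index] = output;
        }
    }
    // Flatten the slots so the columns appear in the original aggregate order.
    slots.into_iter().flatten().collect()
}

fn main() {
    // Two ordering groups covering aggregates at original indices [0, 2] and [1],
    // the same layout as the "conflicting ordering requirements" tests above.
    let per_group_outputs = vec![
        vec![vec!["agg0".to_string()], vec!["agg2".to_string()]],
        vec![vec!["agg1".to_string()]],
    ];
    let group_indices = vec![vec![0, 2], vec![1]];
    let reordered = reorder_aggregate_expr_results(per_group_outputs, group_indices);
    assert_eq!(reordered, vec!["agg0", "agg1", "agg2"]);
}

With groups at original indices [0, 2] and [1], the flattened result comes back in the order 0, 1, 2, which is why the query output columns keep their SELECT-list order even when the aggregates are evaluated in separate ordering groups.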