Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 37 additions & 44 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,23 @@ impl PhysicalGroupBy {
pub fn is_single(&self) -> bool {
self.null_expr.is_empty()
}

/// Calculate GROUP BY expressions according to input schema.
pub fn input_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.expr
.iter()
.map(|(expr, _alias)| expr.clone())
.collect()
}

/// Return grouping expressions as they occur in the output schema.
fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.expr
.iter()
.enumerate()
.map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
.collect()
}
}

impl PartialEq for PhysicalGroupBy {
Expand Down Expand Up @@ -319,11 +336,7 @@ fn get_working_mode(
// Since direction of the ordering is not important for GROUP BY columns,
// we convert PhysicalSortExpr to PhysicalExpr in the existing ordering.
let ordering_exprs = convert_to_expr(output_ordering);
let groupby_exprs = group_by
.expr
.iter()
.map(|(item, _)| item.clone())
.collect::<Vec<_>>();
let groupby_exprs = group_by.input_exprs();
// Find where each expression of the GROUP BY clause occurs in the existing
// ordering (if it occurs):
let mut ordered_indices =
Expand Down Expand Up @@ -363,7 +376,7 @@ fn calc_aggregation_ordering(
) -> Option<AggregationOrdering> {
get_working_mode(input, group_by).map(|(mode, order_indices)| {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
let out_group_expr = output_group_expr_helper(group_by);
let out_group_expr = group_by.output_exprs();
// Calculate output ordering information for the operator:
let out_ordering = order_indices
.iter()
Expand All @@ -381,18 +394,6 @@ fn calc_aggregation_ordering(
})
}

/// This function returns grouping expressions as they occur in the output schema.
fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalExpr>> {
// Update column indices. Since the group by columns come first in the output schema, their
// indices are simply 0..self.group_expr(len).
group_by
.expr()
.iter()
.enumerate()
.map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
.collect()
}

/// This function returns the ordering requirement of the first non-reversible
/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
/// as the initial requirement while calculating the finest requirement among all
Expand Down Expand Up @@ -591,11 +592,7 @@ fn group_by_contains_all_requirements(
group_by: &PhysicalGroupBy,
requirement: &LexOrdering,
) -> bool {
let physical_exprs = group_by
.expr()
.iter()
.map(|(expr, _alias)| expr.clone())
.collect::<Vec<_>>();
let physical_exprs = group_by.input_exprs();
// When we have multiple groups (grouping set)
// since group by may be calculated on the subset of the group_by.expr()
// it is not guaranteed to have all of the requirements among group by expressions.
Expand Down Expand Up @@ -735,7 +732,7 @@ impl AggregateExec {

/// Grouping expressions as they occur in the output schema
pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
output_group_expr_helper(&self.group_by)
self.group_by.output_exprs()
}

/// Aggregate expressions
Expand Down Expand Up @@ -894,28 +891,24 @@ impl ExecutionPlan for AggregateExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
match &self.mode {
AggregateMode::Partial | AggregateMode::Single => {
// Partial and Single Aggregation will not change the output partitioning but need to respect the Alias
let input_partition = self.input.output_partitioning();
match input_partition {
Partitioning::Hash(exprs, part) => {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
normalize_out_expr_with_columns_map(
expr,
&self.columns_map,
)
})
.collect::<Vec<_>>();
Partitioning::Hash(normalized_exprs, part)
}
_ => input_partition,
}
let input_partition = self.input.output_partitioning();
if self.mode.is_first_stage() {
// First stage Aggregation will not change the output partitioning but need to respect the Alias
let input_partition = self.input.output_partitioning();
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
normalize_out_expr_with_columns_map(expr, &self.columns_map)
})
.collect::<Vec<_>>();
Partitioning::Hash(normalized_exprs, part)
} else {
input_partition
}
} else {
// Final Aggregation's output partitioning is the same as its real input
_ => self.input.output_partitioning(),
input_partition
}
}

Expand Down
20 changes: 8 additions & 12 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,14 @@ impl ExecutionPlan for ProjectionExec {
fn output_partitioning(&self) -> Partitioning {
// Output partition need to respect the alias
let input_partition = self.input.output_partitioning();
match input_partition {
Partitioning::Hash(exprs, part) => {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
normalize_out_expr_with_columns_map(expr, &self.columns_map)
})
.collect::<Vec<_>>();

Partitioning::Hash(normalized_exprs, part)
}
_ => input_partition,
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| normalize_out_expr_with_columns_map(expr, &self.columns_map))
.collect::<Vec<_>>();
Partitioning::Hash(normalized_exprs, part)
} else {
input_partition
}
}

Expand Down