Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use ahash::RandomState;
use futures::ready;
use futures::stream::{Stream, StreamExt};
use hashbrown::raw::RawTable;
use itertools::izip;

use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
Expand All @@ -43,15 +42,14 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;

use crate::physical_plan::aggregates::utils::{
aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
aggr_state_schema, get_at_indices, get_optional_filters, read_as_batch,
slice_and_maybe_filter, ExecutionState, GroupState, GroupStateRowAccumulatorsUpdater,
};
use arrow::array::{new_null_array, ArrayRef, UInt32Builder};
use arrow::array::*;
use arrow::compute::{cast, SortColumn};
use arrow::datatypes::DataType;
use arrow::row::{OwnedRow, RowConverter, SortField};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -378,7 +376,7 @@ impl BoundedAggregateStream {
};
for row in range.start..range.end {
// remember this row
group_state.indices.push_accounted(row as u32, allocated);
group_state.indices.push_accounted(row, allocated);
}
}
// 1.2 Need to create new entry
Expand All @@ -400,8 +398,7 @@ impl BoundedAggregateStream {
self.row_aggr_layout.fixed_part_width()
],
accumulator_set,
indices: (range.start as u32..range.end as u32)
.collect::<Vec<_>>(), // 1.3
indices: (range.start..range.end).collect::<Vec<_>>(), // 1.3
};
let group_idx = row_group_states.len();

Expand Down Expand Up @@ -487,7 +484,7 @@ impl BoundedAggregateStream {
groups_with_rows.push(*group_idx);
};

group_state.indices.push_accounted(row as u32, allocated); // remember this row
group_state.indices.push_accounted(row, allocated); // remember this row
}
// 1.2 Need to create new entry
None => {
Expand All @@ -507,7 +504,7 @@ impl BoundedAggregateStream {
self.row_aggr_layout.fixed_part_width()
],
accumulator_set,
indices: vec![row as u32], // 1.3
indices: vec![row], // 1.3
};
let group_idx = group_states.len();

Expand All @@ -516,7 +513,7 @@ impl BoundedAggregateStream {
*allocated += std::mem::size_of_val(&group_state.group_by_values)
+ (std::mem::size_of::<u8>()
* group_state.aggregation_buffer.capacity())
+ (std::mem::size_of::<u32>() * group_state.indices.capacity());
+ (std::mem::size_of::<usize>() * group_state.indices.capacity());

// Allocation done by normal accumulators
*allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
Expand Down Expand Up @@ -589,6 +586,7 @@ impl BoundedAggregateStream {
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor
.point_to(0, group_state.aggregation_buffer.as_mut_slice());

match self.mode {
AggregateMode::Partial | AggregateMode::Single => {
accumulator.update_batch(&values, &mut state_accessor)
Expand Down Expand Up @@ -634,56 +632,6 @@ impl BoundedAggregateStream {
Ok(())
}

/// Updates the row accumulators one input row at a time, driven by the row
/// indices recorded in each group's state.
///
/// For every group listed in `groups_with_rows`, a `RowAccessor` is pointed
/// at that group's aggregation buffer. Each remembered row index is then
/// converted to scalar value(s) via `col_to_scalar` and fed to every row
/// accumulator. Once a group has been processed its `indices` are cleared.
///
/// * `groups_with_rows` - positions into `ordered_group_states` to update.
/// * `row_values` - one entry per row accumulator, holding its input columns.
/// * `row_filter_values` - one optional filter column per row accumulator.
///
/// Returns an error if a filter column cannot be viewed as a boolean array,
/// or if scalar extraction or an accumulator update fails.
fn update_accumulators_using_scalar(
    &mut self,
    groups_with_rows: &[usize],
    row_values: &[Vec<ArrayRef>],
    row_filter_values: &[Option<ArrayRef>],
) -> Result<()> {
    // Downcast every present filter column to a boolean array up front so the
    // per-row loop below only deals with already-typed filters.
    let filter_bool_array = row_filter_values
        .iter()
        .map(|filter_opt| {
            filter_opt
                .as_ref()
                .map(|f| as_boolean_array(f))
                .transpose()
        })
        .collect::<Result<Vec<_>>>()?;

    for &group_idx in groups_with_rows {
        let group_state =
            &mut self.aggr_state.ordered_group_states[group_idx].group_state;

        // The accessor reads and writes this group's aggregation buffer.
        let mut state_accessor =
            RowAccessor::new_from_layout(self.row_aggr_layout.clone());
        state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());

        for &idx in &group_state.indices {
            let row = idx as usize;
            for (accumulator, values_array, filter_array) in izip!(
                self.row_accumulators.iter_mut(),
                row_values.iter(),
                filter_bool_array.iter()
            ) {
                match values_array.as_slice() {
                    // Exactly one input column: use the single-scalar update.
                    [single] => {
                        let scalar_value = col_to_scalar(single, filter_array, row)?;
                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
                    }
                    // Any other number of input columns: gather one scalar per column.
                    columns => {
                        let scalar_values = columns
                            .iter()
                            .map(|array| col_to_scalar(array, filter_array, row))
                            .collect::<Result<Vec<_>>>()?;
                        accumulator
                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
                    }
                }
            }
        }

        // Clear the remembered row indices for this group.
        group_state.indices.clear();
    }

    Ok(())
}

/// Perform group-by aggregation for the given [`RecordBatch`].
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
Expand Down Expand Up @@ -754,22 +702,27 @@ impl BoundedAggregateStream {
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor
{
self.update_accumulators_using_scalar(
self.update_row_accumulators(
&groups_with_rows,
&row_aggr_input_values,
&row_filter_values,
self.row_aggr_layout.clone(),
)?;
} else {
// Collect all indices + offsets based on keys in this vec
let mut batch_indices = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
let indices = &self.aggr_state.ordered_group_states[group_idx]
let mut indices_u32 = vec![];
for idx in &self.aggr_state.ordered_group_states[group_idx]
.group_state
.indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
.indices
{
indices_u32.push(*idx as u32)
}
batch_indices.append_slice(indices_u32.as_slice());
offset_so_far += indices_u32.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
Expand Down Expand Up @@ -1042,3 +995,34 @@ impl BoundedAggregateStream {
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
}

/// Exposes the stream's row accumulators and per-group state through the
/// `GroupStateRowAccumulatorsUpdater` trait.
impl GroupStateRowAccumulatorsUpdater for BoundedAggregateStream {
    /// Returns a raw pointer to the row accumulator stored at `acc_idx`.
    ///
    /// Indexing panics if `acc_idx` is out of bounds.
    fn get_row_accumulator(&self, acc_idx: usize) -> *const RowAccumulatorItem {
        let accumulator: &RowAccumulatorItem = &self.row_accumulators[acc_idx];
        accumulator as *const RowAccumulatorItem
    }

    /// Returns all row accumulators as a slice.
    fn get_row_accumulators(&self) -> &[RowAccumulatorItem] {
        let accumulators = self.row_accumulators.as_slice();
        accumulators
    }

    /// Returns a shared reference to the group state at position `group_idx`
    /// in `ordered_group_states`.
    #[inline(always)]
    fn get_group_state(&self, group_idx: &usize) -> &GroupState {
        let ordered_state = &self.aggr_state.ordered_group_states[*group_idx];
        &ordered_state.group_state
    }

    /// Returns a mutable reference to the group state at position `group_idx`
    /// in `ordered_group_states`.
    #[inline(always)]
    fn get_mut_group_state(&mut self, group_idx: &usize) -> &mut GroupState {
        let ordered_state = &mut self.aggr_state.ordered_group_states[*group_idx];
        &mut ordered_state.group_state
    }

    /// Returns the mutable group state at `group_idx` together with the row
    /// accumulators, borrowing the two separate fields at the same time.
    #[inline(always)]
    fn get_mut_group_state_and_row_accumulators(
        &mut self,
        group_idx: &usize,
    ) -> (&mut GroupState, &[RowAccumulatorItem]) {
        let group_state =
            &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
        let accumulators = self.row_accumulators.as_slice();
        (group_state, accumulators)
    }
}
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::aggregate::row_agg_macros;
use datafusion_physical_expr::{
aggregate::row_accumulator::RowAccumulator,
equivalence::project_equivalence_properties,
expressions::{Avg, CastExpr, Column, Sum},
normalize_out_expr_with_columns_map, reverse_order_bys,
Expand All @@ -53,6 +53,7 @@ mod utils;

pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::is_order_sensitive;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulatorItem;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_physical_expr::utils::{
get_finer_ordering, ordering_satisfy_requirement_concrete,
Expand Down Expand Up @@ -1076,7 +1077,6 @@ fn merge_expressions(
}

pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
pub(crate) type RowAccumulatorItem = Box<dyn RowAccumulator>;

fn create_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
Expand Down
Loading