Skip to content

Commit 92d2485

Browse files
committed
Improve aggregatation documentation
1 parent 9b7e4c8 commit 92d2485

File tree

7 files changed

+201
-18
lines changed

7 files changed

+201
-18
lines changed

datafusion/expr/src/accumulator.rs

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
9494
///
9595
/// Intermediate state is used for "multi-phase" grouping in
9696
/// DataFusion, where an aggregate is computed in parallel with
97-
/// multiple `Accumulator` instances, as illustrated below:
97+
/// multiple `Accumulator` instances, as described below:
9898
///
9999
/// # MultiPhase Grouping
100100
///
@@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
130130
/// `───────' `───────'
131131
/// ```
132132
///
133-
/// The partial state is serialied as `Arrays` and then combined
133+
/// The partial state is serialized as `Arrays` and then combined
134134
/// with other partial states from different instances of this
135135
/// Accumulator (that ran on different partitions, for example).
136136
///
@@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
147147
/// Note that [`ScalarValue::List`] can be used to pass multiple
148148
/// values if the number of intermediate values is not known at
149149
/// planning time (e.g. for `MEDIAN`)
150+
///
151+
/// # Multi-phase repartitioned Grouping
152+
///
153+
/// Many multi-phase grouping plans contain a Repartition operation
154+
/// as well as shown below:
155+
///
156+
/// ```text
157+
/// ▲ ▲
158+
/// │ │
159+
/// │ │
160+
/// │ │
161+
/// │ │
162+
/// │ │
163+
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
164+
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
165+
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
166+
/// │ │ │ │ that means half the entries)
167+
/// └───────────────────────┘ └───────────────────────┘
168+
/// ▲ ▲
169+
/// │ │
170+
/// └─────────────┬────────────┘
171+
/// │
172+
/// │
173+
/// │
174+
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
175+
/// │ Repartition │ keys) ensures that each distinct
176+
/// │ HASH(x) │ group key now appears in exactly
177+
/// └─────────────────────────┘ one partition
178+
/// ▲
179+
/// │
180+
/// ┌───────────────┴─────────────┐
181+
/// │ │
182+
/// │ │
183+
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
184+
/// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all*
185+
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
186+
/// └─────────────────────────┘ └──────────────────────────┘
187+
/// ▲ ▲
188+
/// │ ┌┘
189+
/// │ │
190+
/// .─────────. .─────────.
191+
/// ,─' '─. ,─' '─.
192+
/// ; Input : ; Input : 1. Since input data is
193+
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
194+
/// ╲ ╱ ╲ ╱ distributed, each partition
195+
/// '─. ,─' '─. ,─' likely has all distinct
196+
/// `───────' `───────'
197+
/// ```
198+
///
199+
/// This structure is used so that the `AggregateMode::Partial` accumulators
200+
/// reduces the cardinality of the input as soon as possible. Typically,
201+
/// each partial accumulator sees all groups in the input as the group keys
202+
/// are evenly distributed across the input.
203+
///
204+
/// The final output is computed by repartitioning the result of
205+
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
206+
/// that each distinct group key appears in exactly one of the
207+
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
208+
/// then unioned together to produce the overall final output.
209+
///
210+
/// Here is an example that shows the distribution of groups in the
211+
/// different phases
212+
///
213+
/// ```text
214+
/// ┌─────┐ ┌─────┐
215+
/// │ 1 │ │ 3 │
216+
/// ├─────┤ ├─────┤
217+
/// │ 2 │ │ 4 │ After repartitioning by
218+
/// └─────┘ └─────┘ hash(group keys), each distinct
219+
/// ┌─────┐ ┌─────┐ group key now appears in exactly
220+
/// │ 1 │ │ 3 │ one partition
221+
/// ├─────┤ ├─────┤
222+
/// │ 2 │ │ 4 │
223+
/// └─────┘ └─────┘
224+
///
225+
///
226+
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
227+
///
228+
/// ┌─────┐ ┌─────┐
229+
/// │ 2 │ │ 2 │
230+
/// ├─────┤ ├─────┤
231+
/// │ 1 │ │ 2 │
232+
/// ├─────┤ ├─────┤
233+
/// │ 3 │ │ 3 │
234+
/// ├─────┤ ├─────┤
235+
/// │ 4 │ │ 1 │
236+
/// └─────┘ └─────┘ Input data is arbitrarily or
237+
/// ... ... RoundRobin distributed, each
238+
/// ┌─────┐ ┌─────┐ partition likely has all
239+
/// │ 1 │ │ 4 │ distinct group keys
240+
/// ├─────┤ ├─────┤
241+
/// │ 4 │ │ 3 │
242+
/// ├─────┤ ├─────┤
243+
/// │ 1 │ │ 1 │
244+
/// ├─────┤ ├─────┤
245+
/// │ 4 │ │ 3 │
246+
/// └─────┘ └─────┘
247+
///
248+
/// group values group values
249+
/// in partition 0 in partition 1
250+
/// ```
150251
fn state(&mut self) -> Result<Vec<ScalarValue>>;
151252

152253
/// Updates the accumulator's state from an `Array` containing one

datafusion/expr/src/groups_accumulator.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
128128
/// Returns the intermediate aggregate state for this accumulator,
129129
/// used for multi-phase grouping, resetting its internal state.
130130
///
131+
/// See [`Accumulator::state`] for more information on multi-phase
132+
/// aggregation.
133+
///
131134
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
132135
/// but the `MIN` aggregate would just return a single array.
133136
///
134137
/// Note more sophisticated internal state can be passed as
135138
/// single `StructArray` rather than multiple arrays.
136139
///
137140
/// See [`Self::evaluate`] for details on the required output
138-
/// order and `emit_to`.
141+
/// order and `emit_to`.
142+
///
143+
/// [`Accumulator::state`]: crate::Accumulator::state
139144
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
140145

141146
/// Merges intermediate state (the output from [`Self::state`])
142-
/// into this accumulator's values.
147+
/// into this accumulator's current state.
143148
///
144149
/// For some aggregates (such as `SUM`), `merge_batch` is the same
145150
/// as `update_batch`, but for some aggregates (such as `COUNT`,
@@ -158,9 +163,41 @@ pub trait GroupsAccumulator: Send {
158163
total_num_groups: usize,
159164
) -> Result<()>;
160165

161-
/// Converts input batch to intermediate aggregate state,
162-
/// without grouping (each input row considered as a separate
163-
/// group).
166+
/// Converts an input batch directly the intermediate aggregate state.
167+
///
168+
/// This is the equivalent of treating each input row as its own group. It
169+
/// is invoked when the Partial phase of a multi-phase aggregation is not
170+
/// reducing the cardinality enough to warrant spending more effort on
171+
/// pre-aggregation (see `Background` section below), and switches to
172+
/// passing intermediate state directly on to the next aggregation phase.
173+
///
174+
/// Examples:
175+
/// * `COUNT`: an array of 1s for each row in the input batch.
176+
/// * `SUM/MIN/MAX`: the input values themselves.
177+
///
178+
/// # Arguments
179+
/// * `values`: the input arguments to the accumulator
180+
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
181+
///
182+
/// # Background
183+
///
184+
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
185+
/// Partial phase reduces the cardinality of the input data as soon as
186+
/// possible in the plan.
187+
///
188+
/// This strategy is very effective for queries with a small number of
189+
/// groups, as most of the data is aggregated immediately and only a small
190+
/// amount of data must be repartitioned (see [`Accumulator::state`] for
191+
/// background)
192+
///
193+
/// However, for queries with a large number of groups, the Partial phase
194+
/// often does not reduce the cardinality enough to warrant the memory and
195+
/// CPU cost of actually performing the aggregation. For such cases, the
196+
/// HashAggregate operator will dynamically switch to passing intermediate
197+
/// state directly to the next aggregation phase with minimal processing
198+
/// using this method.
199+
///
200+
/// [`Accumulator::state`]: crate::Accumulator::state
164201
fn convert_to_state(
165202
&self,
166203
_values: &[ArrayRef],
@@ -169,15 +206,16 @@ pub trait GroupsAccumulator: Send {
169206
not_impl_err!("Input batch conversion to state not implemented")
170207
}
171208

172-
/// Returns `true` is groups accumulator supports input batch
173-
/// to intermediate aggregate state conversion (`convert_to_state`
174-
/// method is implemented).
209+
/// Returns `true` if [`Self::convert_to_state`] is implemented to support
210+
/// intermediate aggregate state conversion.
175211
fn convert_to_state_supported(&self) -> bool {
176212
false
177213
}
178214

179215
/// Amount of memory used to store the state of this accumulator,
180-
/// in bytes. This function is called once per batch, so it should
181-
/// be `O(n)` to compute, not `O(num_groups)`
216+
/// in bytes.
217+
///
218+
/// This function is called once per batch, so it should be `O(n)` to
219+
/// compute, not `O(num_groups)`
182220
fn size(&self) -> usize;
183221
}

datafusion/expr/src/udaf.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
343343

344344
/// Return the fields used to store the intermediate state of this accumulator.
345345
///
346+
/// See [`Accumulator::state`] for background information.
347+
///
346348
/// # Arguments:
347349
/// 1. `name`: the name of the expression (e.g. AVG, SUM, etc)
348350
/// 2. `value_type`: Aggregate function output returned by [`Self::return_type`] if defined, otherwise
349351
/// it is equivalent to the data type of the first arguments
350352
/// 3. `ordering_fields`: the fields used to order the input arguments, if any.
351353
/// Empty if no ordering expression is provided.
352354
///
355+
///
353356
/// # Notes:
354357
///
355358
/// The default implementation returns a single state field named `name`
@@ -384,7 +387,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
384387
/// # Notes
385388
///
386389
/// Even if this function returns true, DataFusion will still use
387-
/// `Self::accumulator` for certain queries, such as when this aggregate is
390+
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
388391
/// used as a window function or when there no GROUP BY columns in the
389392
/// query.
390393
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {

datafusion/functions-aggregate/src/count.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,11 @@ impl GroupsAccumulator for CountGroupsAccumulator {
433433
Ok(vec![Arc::new(counts) as ArrayRef])
434434
}
435435

436+
/// Converts an input batch directly to a state batch
437+
///
438+
/// The state of `COUNT` is always a single Int64Array:
439+
/// * `1` (for non-null, non filtered values)
440+
/// * `0` (for null values)
436441
fn convert_to_state(
437442
&self,
438443
values: &[ArrayRef],

datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ where
134134
self.update_batch(values, group_indices, opt_filter, total_num_groups)
135135
}
136136

137+
/// Converts an input batch directly to a state batch
138+
///
139+
/// The state is:
140+
/// - self.prim_fn for all non null, non filtered values
141+
/// - null otherwise
142+
///
137143
fn convert_to_state(
138144
&self,
139145
values: &[ArrayRef],

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,20 @@ pub use datafusion_expr::AggregateFunction;
5959
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
6060

6161
/// Hash aggregate modes
62+
///
63+
/// See [`Accumulator::state`] for background information on multi-phase
64+
/// aggregation and how these modes are used.
6265
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6366
pub enum AggregateMode {
64-
/// Partial aggregate that can be applied in parallel across input partitions
67+
/// Partial aggregate that can be applied in parallel across input
68+
/// partitions.
69+
///
70+
/// This is the first phase of a multi-phase aggregation.
6571
Partial,
66-
/// Final aggregate that produces a single partition of output
72+
/// Final aggregate that produces a single partition of output by combining
73+
/// the output of multiple partial aggregates.
74+
///
75+
/// This is the second phase of a multi-phase aggregation.
6776
Final,
6877
/// Final aggregate that works on pre-partitioned data.
6978
///
@@ -75,12 +84,15 @@ pub enum AggregateMode {
7584
/// Applies the entire logical aggregation operation in a single operator,
7685
/// as opposed to Partial / Final modes which apply the logical aggregation using
7786
/// two operators.
87+
///
7888
/// This mode requires that the input is a single partition (like Final)
7989
Single,
8090
/// Applies the entire logical aggregation operation in a single operator,
8191
/// as opposed to Partial / Final modes which apply the logical aggregation using
8292
/// two operators.
83-
/// This mode requires that the input is partitioned by group key (like FinalPartitioned)
93+
///
94+
/// This mode requires that the input is partitioned by group key (like
95+
/// FinalPartitioned)
8496
SinglePartitioned,
8597
}
8698

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl SkipAggregationProbe {
204204
/// of `x` and one accumulator for `SUM(y)`, specialized for the data
205205
/// type of `y`.
206206
///
207-
/// # Description
207+
/// # Discussion
208208
///
209209
/// [`group_values`] does not store any aggregate state inline. It only
210210
/// assigns "group indices", one for each (distinct) group value. The
@@ -222,7 +222,25 @@ impl SkipAggregationProbe {
222222
///
223223
/// [`group_values`]: Self::group_values
224224
///
225-
/// # Spilling
225+
/// # Partial Aggregate and multi-phase grouping
226+
///
227+
/// As described on [`Accumulator::state`], this operator is used in the context
228+
/// "multi-phase" grouping when the mode is [`AggregateMode::Partial`].
229+
///
230+
/// An important optimization for multi-phase partial aggregation is to skip
231+
/// partial aggregation when it is not effective enough to warrant the memory or
232+
/// CPU cost, as is often the case for queries many distinct groups (high
233+
/// cardinality group by). Memory is particularly important because each Partial
234+
/// aggregator must store the intermediate state for each group.
235+
///
236+
/// If the ratio of the number of groups to the number of input rows exceeds a
237+
/// threshold, and [`GroupsAccumulator::convert_to_state_supported`] is
238+
/// supported, this operator will stop applying Partial aggregation and directly
239+
/// pass the input rows to the next aggregation phase.
240+
///
241+
/// [`Accumulator::state`]: datafusion_expr::Accumulator::state
242+
///
243+
/// # Spilling (to disk)
226244
///
227245
/// The sizes of group values and accumulators can become large. Before that causes out of memory,
228246
/// this hash aggregator outputs partial states early for partial aggregation or spills to local

0 commit comments

Comments
 (0)