Commit e39b5ca
[bug]: Fix multi partition wrong column requirement bug (#7129)
* Bug fix: aggregate multi-partition wrong index
* Add check for whether requirement expression is already used in group by
* Update `merge_batch` of first and last
* Add new test
* Simplifications, remove unnecessary code
* Remove artifact during merge
* Move `is_first_stage` to method
* Improve comments, use more idiomatic constructs
* Assorted minor changes and test updates

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent ddb9549 commit e39b5ca

6 files changed: +526, -164 lines changed

6 files changed

+526
-164
lines changed

datafusion/core/src/physical_plan/aggregates/mod.rs

Lines changed: 229 additions & 46 deletions
@@ -25,6 +25,7 @@ use crate::physical_plan::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
+
 use arrow::array::ArrayRef;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -40,6 +41,7 @@ use datafusion_physical_expr::{
     AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
     PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
+
 use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -84,6 +86,20 @@ pub enum AggregateMode {
     SinglePartitioned,
 }
 
+impl AggregateMode {
+    /// Checks whether this aggregation step describes a "first stage" calculation.
+    /// In other words, its input is not another aggregation result and the
+    /// `merge_batch` method will not be called for these modes.
+    fn is_first_stage(&self) -> bool {
+        match self {
+            AggregateMode::Partial
+            | AggregateMode::Single
+            | AggregateMode::SinglePartitioned => true,
+            AggregateMode::Final | AggregateMode::FinalPartitioned => false,
+        }
+    }
+}
+
 /// Group By expression modes
 ///
 /// `PartiallyOrdered` and `FullyOrdered` are used to reason about
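The new helper splits the aggregation modes into the two stages of a multi-partition plan. As a minimal, self-contained sketch of the pattern it distinguishes (the `Mode` enum below is a hypothetical stand-in, not DataFusion's type): first-stage modes consume raw input rows via `update_batch`, while final modes combine partial states via `merge_batch`.

// Hypothetical stand-in for AggregateMode, for illustration only.
#[derive(Debug, Clone, Copy)]
enum Mode {
    Partial,
    Single,
    SinglePartitioned,
    Final,
    FinalPartitioned,
}

impl Mode {
    /// Mirrors the new helper: true when the input is raw data,
    /// false when the input is another aggregation's partial output.
    fn is_first_stage(&self) -> bool {
        matches!(self, Mode::Partial | Mode::Single | Mode::SinglePartitioned)
    }
}

fn main() {
    for mode in [Mode::Partial, Mode::FinalPartitioned] {
        if mode.is_first_stage() {
            println!("{mode:?}: accumulate raw rows (update_batch)");
        } else {
            println!("{mode:?}: merge partial states (merge_batch)");
        }
    }
}

Keeping the classification in one exhaustive `match` means a future mode variant forces this decision to be revisited, which is in line with the commit's "move is_first_stage to method" change replacing scattered `matches!(mode, AggregateMode::Partial)` tests.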
@@ -95,9 +111,6 @@ pub enum AggregateMode {
 /// previous combinations are guaranteed never to appear again
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum GroupByOrderMode {
-    /// The input is not (known to be) ordered by any of the
-    /// expressions in the GROUP BY clause.
-    None,
     /// The input is known to be ordered by a preset (prefix but
     /// possibly reordered) of the expressions in the `GROUP BY` clause.
     ///
@@ -475,13 +488,13 @@ fn calc_required_input_ordering(
     };
     for (is_reverse, aggregator_requirement) in aggregator_requirements.into_iter() {
         if let Some(AggregationOrdering {
-            ordering,
             // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
             // running with bounded memory, without breaking the pipeline),
             // then we append the aggregator ordering requirement to the existing
             // ordering. This way, we can still run with bounded memory.
             mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
             order_indices,
+            ..
         }) = aggregation_ordering
         {
             // Get the section of the input ordering that enables us to run in
@@ -495,32 +508,17 @@ fn calc_required_input_ordering(
             let mut requirement =
                 PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
             for req in aggregator_requirement {
-                if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
-                    requirement.push(req.clone());
-                }
-                // In partial mode, append required ordering of the aggregator to the output ordering.
-                // In case of multiple partitions, this enables us to reduce partitions correctly.
-                if matches!(mode, AggregateMode::Partial)
-                    && ordering.iter().all(|item| req.expr.ne(&item.expr))
+                // Final and FinalPartitioned modes don't enforce ordering
+                // requirements since order-sensitive aggregators handle such
+                // requirements during merging.
+                if mode.is_first_stage()
+                    && requirement.iter().all(|item| req.expr.ne(&item.expr))
                 {
-                    ordering.push(req.into());
+                    requirement.push(req);
                 }
             }
             required_input_ordering = requirement;
-        } else {
-            // If there was no pre-existing output ordering, the output ordering is simply the required
-            // ordering of the aggregator in partial mode.
-            if matches!(mode, AggregateMode::Partial)
-                && !aggregator_requirement.is_empty()
-            {
-                *aggregation_ordering = Some(AggregationOrdering {
-                    mode: GroupByOrderMode::None,
-                    order_indices: vec![],
-                    ordering: PhysicalSortRequirement::to_sort_exprs(
-                        aggregator_requirement.clone(),
-                    ),
-                });
-            }
+        } else if mode.is_first_stage() {
             required_input_ordering = aggregator_requirement;
         }
         // Keep track of the direction from which required_input_ordering is constructed:
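Viewed in isolation, the first-stage branch above follows a simple rule: append each aggregator requirement to the prefix derived from GROUP BY unless some entry already constrains the same expression. A self-contained sketch, with `SortReq` as a simplified stand-in for `PhysicalSortRequirement`, assumed here only for illustration:

// Simplified stand-in for PhysicalSortRequirement: an expression name
// plus an optional sort direction.
#[derive(Debug, Clone, PartialEq)]
struct SortReq {
    expr: String,
    descending: Option<bool>,
}

/// Appends aggregator requirements to the prefix derived from GROUP BY,
/// skipping any expression the prefix already constrains.
fn extend_requirement(mut prefix: Vec<SortReq>, aggr_reqs: Vec<SortReq>) -> Vec<SortReq> {
    for req in aggr_reqs {
        // Same "push only if absent" test as the patch: compare by expression.
        if prefix.iter().all(|item| item.expr != req.expr) {
            prefix.push(req);
        }
    }
    prefix
}

fn main() {
    let prefix = vec![SortReq { expr: "a".into(), descending: Some(false) }];
    let aggr = vec![
        // Already constrained by the prefix, so it is skipped.
        SortReq { expr: "a".into(), descending: Some(true) },
        // New expression, so it is appended.
        SortReq { expr: "b".into(), descending: Some(false) },
    ];
    let combined = extend_requirement(prefix, aggr);
    assert_eq!(combined.len(), 2);
    assert_eq!(combined[1].expr, "b");
}

Before the fix, the code performed this duplicate check against both `requirement` and the output `ordering`; after it, only the input requirement is extended, and only in first-stage modes.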
@@ -596,12 +594,16 @@ impl AggregateExec {
             .iter()
             .zip(order_by_expr.into_iter())
             .map(|(aggr_expr, fn_reqs)| {
-                // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
-                if is_order_sensitive(aggr_expr) {
-                    fn_reqs
-                } else {
-                    None
-                }
+                // If the aggregation function is order-sensitive and we are
+                // performing a "first stage" calculation, keep the ordering
+                // requirement as is; otherwise ignore the ordering requirement.
+                // In non-first stage modes, we accumulate data (using `merge_batch`)
+                // from different partitions (i.e. merge partial results). During
+                // this merge, we consider the ordering of each partial result.
+                // Hence, we do not need to use the ordering requirement in such
+                // modes as long as partial results are generated with the
+                // correct ordering.
+                fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && mode.is_first_stage())
             })
             .collect::<Vec<_>>();
         let mut aggregator_reverse_reqs = None;
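The `fn_reqs.filter(...)` expression uses the standard library's `Option::filter`, which keeps a `Some` value only when the predicate returns true. A quick sketch of the equivalence with the replaced if/else (the values here are made up for illustration):

fn main() {
    let fn_reqs: Option<&str> = Some("b ASC");
    let order_sensitive = true;
    let first_stage = false;

    // Old style: explicit branch returning the option or None.
    let kept_old = if order_sensitive && first_stage { fn_reqs } else { None };

    // New style: Option::filter expresses "keep only if the predicate holds".
    let kept_new = fn_reqs.filter(|_| order_sensitive && first_stage);

    // Both are None here because this is not a first-stage mode.
    assert_eq!(kept_old, kept_new);
}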
@@ -645,7 +647,6 @@ impl AggregateExec {
         }
 
         let mut aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
-
         let required_input_ordering = calc_required_input_ordering(
             &input,
             &mut aggr_expr,
@@ -1216,42 +1217,45 @@ fn evaluate_group_by(
 mod tests {
     use super::*;
     use crate::execution::context::SessionConfig;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{
+        FullyOrdered, PartiallyOrdered,
+    };
     use crate::physical_plan::aggregates::{
         get_finest_requirement, get_working_mode, AggregateExec, AggregateMode,
         PhysicalGroupBy,
     };
+    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::{col, Avg};
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::{
+        DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
+    };
+    use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{assert_is_pending, csv_exec_sorted};
-    use crate::{assert_batches_sorted_eq, physical_plan::common};
+    use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common};
+
     use arrow::array::{Float64Array, UInt32Array};
     use arrow::compute::{concat_batches, SortOptions};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::{DataFusionError, Result, ScalarValue};
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
-        lit, ApproxDistinct, Column, Count, FirstValue, Median,
+        lit, ApproxDistinct, Column, Count, FirstValue, LastValue, Median,
     };
     use datafusion_physical_expr::{
         AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties,
         PhysicalExpr, PhysicalSortExpr,
     };
-    use futures::{FutureExt, Stream};
+
     use std::any::Any;
     use std::sync::Arc;
     use std::task::{Context, Poll};
 
-    use super::StreamType;
-    use crate::physical_plan::aggregates::GroupByOrderMode::{
-        FullyOrdered, PartiallyOrdered,
-    };
-    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-    use crate::physical_plan::{
-        DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
-    };
-    use crate::prelude::SessionContext;
+    use futures::{FutureExt, Stream};
 
     // Generate a schema which consists of 5 columns (a, b, c, d, e)
     fn create_test_schema() -> Result<SchemaRef> {
@@ -1370,6 +1374,57 @@ mod tests {
         )
     }
 
+    /// Generates some mock data for aggregate tests.
+    fn some_data_v2() -> (Arc<Schema>, Vec<RecordBatch>) {
+        // Define a schema:
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        // Generate data so that first and last value results are at 2nd and
+        // 3rd partitions. With this construction, we guarantee we don't receive
+        // the expected result by accident, but merging actually works properly;
+        // i.e. it doesn't depend on the data insertion order.
+        (
+            schema.clone(),
+            vec![
+                RecordBatch::try_new(
+                    schema.clone(),
+                    vec![
+                        Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
+                        Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+                    ],
+                )
+                .unwrap(),
+                RecordBatch::try_new(
+                    schema.clone(),
+                    vec![
+                        Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
+                        Arc::new(Float64Array::from(vec![0.0, 1.0, 2.0, 3.0])),
+                    ],
+                )
+                .unwrap(),
+                RecordBatch::try_new(
+                    schema.clone(),
+                    vec![
+                        Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
+                        Arc::new(Float64Array::from(vec![3.0, 4.0, 5.0, 6.0])),
+                    ],
+                )
+                .unwrap(),
+                RecordBatch::try_new(
+                    schema,
+                    vec![
+                        Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
+                        Arc::new(Float64Array::from(vec![2.0, 3.0, 4.0, 5.0])),
+                    ],
+                )
+                .unwrap(),
+            ],
+        )
+    }
+
     async fn check_grouping_sets(input: Arc<dyn ExecutionPlan>) -> Result<()> {
         let input_schema = input.schema();
 
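The expected outputs of the test below can be verified by hand. With the `b ASC` ordering requirement the test attaches, FIRST_VALUE(b) reduces to the per-group minimum of `b` and LAST_VALUE(b) to the per-group maximum, regardless of which partition a row lives in. A hedged sketch in plain Rust (no Arrow) that recomputes both over the flattened mock rows:

use std::collections::BTreeMap;

fn main() {
    // All (a, b) pairs from the four mock partitions, flattened.
    let rows: Vec<(u32, f64)> = vec![
        (2, 1.0), (3, 2.0), (4, 3.0), (4, 4.0), // partition 1
        (2, 0.0), (3, 1.0), (3, 2.0), (4, 3.0), // partition 2
        (2, 3.0), (3, 4.0), (3, 5.0), (4, 6.0), // partition 3
        (2, 2.0), (3, 3.0), (3, 4.0), (4, 5.0), // partition 4
    ];

    // Under the "b ASC" requirement, FIRST_VALUE(b) is the minimum b per
    // group and LAST_VALUE(b) is the maximum b per group.
    let mut first: BTreeMap<u32, f64> = BTreeMap::new();
    let mut last: BTreeMap<u32, f64> = BTreeMap::new();
    for (a, b) in rows {
        first.entry(a).and_modify(|v| *v = v.min(b)).or_insert(b);
        last.entry(a).and_modify(|v| *v = v.max(b)).or_insert(b);
    }

    // Matches the expected tables in the test:
    // first = {2: 0.0, 3: 1.0, 4: 3.0}, last = {2: 3.0, 3: 5.0, 4: 6.0}.
    println!("FIRST_VALUE: {first:?}");
    println!("LAST_VALUE:  {last:?}");
}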
@@ -1885,6 +1940,134 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn run_first_last_multi_partitions() -> Result<()> {
+        for use_coalesce_batches in [false, true] {
+            for is_first_acc in [false, true] {
+                first_last_multi_partitions(use_coalesce_batches, is_first_acc).await?
+            }
+        }
+        Ok(())
+    }
+
+    // This function either constructs the physical plan below,
+    //
+    // "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]",
+    // "  CoalesceBatchesExec: target_batch_size=1024",
+    // "    CoalescePartitionsExec",
+    // "      AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[FIRST_VALUE(b)], ordering_mode=None",
+    // "        MemoryExec: partitions=4, partition_sizes=[1, 1, 1, 1]",
+    //
+    // or
+    //
+    // "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]",
+    // "  CoalescePartitionsExec",
+    // "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[FIRST_VALUE(b)], ordering_mode=None",
+    // "      MemoryExec: partitions=4, partition_sizes=[1, 1, 1, 1]",
+    //
+    // and checks whether the function `merge_batch` works correctly for
+    // FIRST_VALUE and LAST_VALUE functions.
+    async fn first_last_multi_partitions(
+        use_coalesce_batches: bool,
+        is_first_acc: bool,
+    ) -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let (schema, data) = some_data_v2();
+        let partition1 = data[0].clone();
+        let partition2 = data[1].clone();
+        let partition3 = data[2].clone();
+        let partition4 = data[3].clone();
+
+        let groups =
+            PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);
+
+        let ordering_req = vec![PhysicalSortExpr {
+            expr: col("b", &schema)?,
+            options: SortOptions::default(),
+        }];
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = if is_first_acc {
+            vec![Arc::new(FirstValue::new(
+                col("b", &schema)?,
+                "FIRST_VALUE(b)".to_string(),
+                DataType::Float64,
+                ordering_req.clone(),
+                vec![DataType::Float64],
+            ))]
+        } else {
+            vec![Arc::new(LastValue::new(
+                col("b", &schema)?,
+                "LAST_VALUE(b)".to_string(),
+                DataType::Float64,
+                ordering_req.clone(),
+                vec![DataType::Float64],
+            ))]
+        };
+
+        let memory_exec = Arc::new(MemoryExec::try_new(
+            &[
+                vec![partition1],
+                vec![partition2],
+                vec![partition3],
+                vec![partition4],
+            ],
+            schema.clone(),
+            None,
+        )?);
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            groups.clone(),
+            aggregates.clone(),
+            vec![None],
+            vec![Some(ordering_req.clone())],
+            memory_exec,
+            schema.clone(),
+        )?);
+        let coalesce = if use_coalesce_batches {
+            let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec));
+            Arc::new(CoalesceBatchesExec::new(coalesce, 1024)) as Arc<dyn ExecutionPlan>
+        } else {
+            Arc::new(CoalescePartitionsExec::new(aggregate_exec))
+                as Arc<dyn ExecutionPlan>
+        };
+        let aggregate_final = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            groups,
+            aggregates.clone(),
+            vec![None],
+            vec![Some(ordering_req)],
+            coalesce,
+            schema,
+        )?) as Arc<dyn ExecutionPlan>;
+
+        let result = crate::physical_plan::collect(aggregate_final, task_ctx).await?;
+        if is_first_acc {
+            let expected = vec![
+                "+---+----------------+",
+                "| a | FIRST_VALUE(b) |",
+                "+---+----------------+",
+                "| 2 | 0.0            |",
+                "| 3 | 1.0            |",
+                "| 4 | 3.0            |",
+                "+---+----------------+",
+            ];
+            assert_batches_eq!(expected, &result);
+        } else {
+            let expected = vec![
+                "+---+---------------+",
+                "| a | LAST_VALUE(b) |",
+                "+---+---------------+",
+                "| 2 | 3.0           |",
+                "| 3 | 5.0           |",
+                "| 4 | 6.0           |",
+                "+---+---------------+",
+            ];
+            assert_batches_eq!(expected, &result);
+        };
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_get_finest_requirements() -> Result<()> {
         let test_schema = create_test_schema()?;
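The property this test pins down is that `merge_batch` for FIRST_VALUE/LAST_VALUE must compare ordering keys instead of trusting the arrival order of partial results. A small model of that idea follows; `FirstState` is a simplified stand-in for the accumulator state, not DataFusion's actual API:

/// Simplified partial state for a FIRST_VALUE accumulator: the candidate
/// value together with the ordering key it was observed at.
#[derive(Debug, Clone, Copy)]
struct FirstState {
    value: f64,
    order_key: f64, // here: the value of column b, ordered ascending
}

impl FirstState {
    /// Merge another partition's partial state: keep whichever candidate
    /// comes first under the ordering requirement, independent of the
    /// order in which partial states arrive.
    fn merge(self, other: FirstState) -> FirstState {
        if other.order_key < self.order_key {
            other
        } else {
            self
        }
    }
}

fn main() {
    // Partial states for group a=2, arriving in partition order 1..4.
    let partials = [
        FirstState { value: 1.0, order_key: 1.0 },
        FirstState { value: 0.0, order_key: 0.0 }, // true first lives in partition 2
        FirstState { value: 3.0, order_key: 3.0 },
        FirstState { value: 2.0, order_key: 2.0 },
    ];
    let merged = partials.into_iter().reduce(FirstState::merge).unwrap();
    assert_eq!(merged.value, 0.0); // matches the FIRST_VALUE(b) row for a=2
}

With this model the result is the same for any permutation of `partials`, which is what running the test over four memory partitions, with and without `CoalesceBatchesExec`, checks end to end.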

datafusion/core/src/physical_plan/aggregates/order/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -52,7 +52,6 @@ impl GroupOrdering {
5252
} = ordering;
5353

5454
Ok(match mode {
55-
GroupByOrderMode::None => GroupOrdering::None,
5655
GroupByOrderMode::PartiallyOrdered => {
5756
let partial =
5857
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
