From 2507e20910745a35622d10ed320d07cf69f68427 Mon Sep 17 00:00:00 2001 From: hhj Date: Fri, 10 Nov 2023 19:49:43 +0800 Subject: [PATCH 1/9] init impl --- .../src/single_distinct_to_groupby.rs | 121 ++++++++++++++---- 1 file changed, 94 insertions(+), 27 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 414217612d1e..b38e5ce6f0b9 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -23,6 +23,10 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; +use datafusion_expr::aggregate_function::{ + self, + AggregateFunction::{Count, Max, Min, Sum}, +}; use datafusion_expr::{ col, expr::AggregateFunction, @@ -34,17 +38,19 @@ use hashbrown::HashSet; /// single distinct to group by optimizer rule /// ```text -/// SELECT F1(DISTINCT s),F2(DISTINCT s) -/// ... -/// GROUP BY k -/// -/// Into +/// Before: +/// SELECT a, COUNT(DINSTINCT b), COUNT(c) +/// FROM t +/// GROUP BY a /// -/// SELECT F1(alias1),F2(alias1) +/// After: +/// SELECT a, COUNT(alias1), SUM(alias2) /// FROM ( -/// SELECT s as alias1, k ... GROUP BY s, k +/// SELECT a, b as alias1, COUNT(c) as alias2 +/// FROM t +/// GROUP BY a, b /// ) -/// GROUP BY k +/// GROUP BY a /// ``` #[derive(Default)] pub struct SingleDistinctToGroupBy {} @@ -58,27 +64,31 @@ impl SingleDistinctToGroupBy { } } -/// Check whether all aggregate exprs are distinct on a single field. +/// Check whether all distinct aggregate exprs are distinct on a single field. fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { match plan { LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => { let mut fields_set = HashSet::new(); - let mut distinct_count = 0; + let mut aggregate_count = 0; for expr in aggr_expr { if let Expr::AggregateFunction(AggregateFunction { - distinct, args, .. + fun, + distinct, + args, + .. }) = expr { + aggregate_count += 1; if *distinct { - distinct_count += 1; - } - for e in args { - fields_set.insert(e.canonical_name()); + for e in args { + fields_set.insert(e.canonical_name()); + } + } else if !matches!(fun, Count | Sum | Min | Max) { + return Ok(false); } } } - let res = distinct_count == aggr_expr.len() && fields_set.len() == 1; - Ok(res) + Ok(fields_set.len() == 1 && aggregate_count == aggr_expr.len()) } _ => Ok(false), } @@ -89,6 +99,24 @@ fn contains_grouping_set(expr: &[Expr]) -> bool { matches!(expr.first(), Some(Expr::GroupingSet(_))) } +/// Rewrite the aggregate function to two aggregate functions. +fn aggregate_function_rewrite( + fun: &aggregate_function::AggregateFunction, +) -> ( + aggregate_function::AggregateFunction, + aggregate_function::AggregateFunction, +) { + match fun { + Count => (Sum, Count), + Sum => (Sum, Sum), + Min => (Min, Min), + Max => (Max, Max), + _ => panic!( + "Not supported aggregate function in single distinct optimization rule" + ), + } +} + impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, @@ -151,7 +179,9 @@ impl OptimizerRule for SingleDistinctToGroupBy { .collect::>(); // replace the distinct arg with alias + let mut index = 1; let mut group_fields_set = HashSet::new(); + let mut inner_aggr_exprs = vec![]; let outer_aggr_exprs = aggr_expr .iter() .map(|aggr_expr| match aggr_expr { @@ -160,32 +190,69 @@ impl OptimizerRule for SingleDistinctToGroupBy { args, filter, order_by, + distinct, .. }) => { // is_single_distinct_agg ensure args.len=1 - if group_fields_set.insert(args[0].display_name()?) { + if *distinct + && group_fields_set.insert(args[0].display_name()?) + { inner_group_exprs.push( args[0].clone().alias(SINGLE_DISTINCT_ALIAS), ); } - Ok(Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - vec![col(SINGLE_DISTINCT_ALIAS)], - false, // intentional to remove distinct here - filter.clone(), - order_by.clone(), - )) - .alias(aggr_expr.display_name()?)) + + let mut expr = + Expr::AggregateFunction(AggregateFunction::new( + fun.clone(), + vec![col(SINGLE_DISTINCT_ALIAS)], + false, // intentional to remove distinct here + filter.clone(), + order_by.clone(), + )) + .alias(aggr_expr.display_name()?); + + // if the aggregate function is not distinct, we need to rewrite it like two phase aggregation + if !(*distinct) { + index += 1; + let alias_str = format!("alias{}", index); + + let (out_fun, inner_fun) = + aggregate_function_rewrite(fun); + expr = + Expr::AggregateFunction(AggregateFunction::new( + out_fun, + vec![col(&alias_str)], + false, + filter.clone(), + order_by.clone(), + )); + let inner_expr = + Expr::AggregateFunction(AggregateFunction::new( + inner_fun, + args.clone(), + false, + filter.clone(), + order_by.clone(), + )) + .alias(&alias_str); + inner_aggr_exprs.push(inner_expr); + } + + Ok(expr) } _ => Ok(aggr_expr.clone()), }) .collect::>>()?; + + println!("inner_agg: {:?}", inner_aggr_exprs); + println!("outer_agg: {:?}", outer_aggr_exprs); // construct the inner AggrPlan let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( input.clone(), inner_group_exprs, - Vec::new(), + inner_aggr_exprs, )?); Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new( From d13db079a8d32f4676f3214b21d60159b87d6564 Mon Sep 17 00:00:00 2001 From: hhj Date: Fri, 10 Nov 2023 20:25:37 +0800 Subject: [PATCH 2/9] add some test --- datafusion/core/output1.parquet | Bin 0 -> 846 bytes datafusion/core/output2.parquet.snappy | Bin 0 -> 846 bytes datafusion/core/output3.parquet.snappy.parquet | Bin 0 -> 846 bytes .../optimizer/src/single_distinct_to_groupby.rs | 12 +++++------- .../tests/cases/roundtrip_logical_plan.rs | 11 ++++++++++- 5 files changed, 15 insertions(+), 8 deletions(-) create mode 100644 datafusion/core/output1.parquet create mode 100644 datafusion/core/output2.parquet.snappy create mode 100644 datafusion/core/output3.parquet.snappy.parquet diff --git a/datafusion/core/output1.parquet b/datafusion/core/output1.parquet new file mode 100644 index 0000000000000000000000000000000000000000..2e2eed2b00d7b89b30e16dcde821f6a554fba478 GIT binary patch literal 846 zcmaJ=F>BjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqYBjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqYBjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqY>>()?; - println!("inner_agg: {:?}", inner_aggr_exprs); - println!("outer_agg: {:?}", outer_aggr_exprs); - // construct the inner AggrPlan let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( input.clone(), @@ -474,9 +471,10 @@ mod tests { )? .build()?; - // Do nothing - let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(DISTINCT test.b), COUNT(test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, COUNT(test.c):Int64;N]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + // Should work + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b), SUM(alias2) AS COUNT(test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, COUNT(test.c):Int64;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:Int64;N]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) } diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index ca2b4d48c460..ce4ce0c869ce 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -208,7 +208,7 @@ async fn simple_aggregate() -> Result<()> { #[tokio::test] async fn aggregate_distinct_with_having() -> Result<()> { - roundtrip("SELECT a, count(distinct b) FROM data GROUP BY a, c HAVING count(b) > 100") + roundtrip("SELECT a, count(distinct b), sum(distinct e) FROM data GROUP BY a, c HAVING count(b) > 100") .await } @@ -267,6 +267,15 @@ async fn select_distinct_two_fields() -> Result<()> { .await } +#[tokio::test] +async fn simple_distinct_aggregate() -> Result<()> { + test_alias( + "SELECT a, count(distinct b) FROM data group by a", + "SELECT a, count(b) FROM (SELECT a, b FROM data group by a, b) group by a", + ) + .await +} + #[tokio::test] async fn simple_alias() -> Result<()> { test_alias("SELECT d1.a, d1.b FROM data d1", "SELECT a, b FROM data").await From ae760a92591b9fb68deb656a99d0f2205c692700 Mon Sep 17 00:00:00 2001 From: hhj Date: Fri, 10 Nov 2023 20:26:25 +0800 Subject: [PATCH 3/9] remove .parquet --- datafusion/core/output1.parquet | Bin 846 -> 0 bytes datafusion/core/output2.parquet.snappy | Bin 846 -> 0 bytes datafusion/core/output3.parquet.snappy.parquet | Bin 846 -> 0 bytes 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 datafusion/core/output1.parquet delete mode 100644 datafusion/core/output2.parquet.snappy delete mode 100644 datafusion/core/output3.parquet.snappy.parquet diff --git a/datafusion/core/output1.parquet b/datafusion/core/output1.parquet deleted file mode 100644 index 2e2eed2b00d7b89b30e16dcde821f6a554fba478..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 846 zcmaJ=F>BjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqYBjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqYBjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqY Date: Fri, 10 Nov 2023 22:00:31 +0800 Subject: [PATCH 4/9] add some tests --- .../src/single_distinct_to_groupby.rs | 121 +++++++++++++----- .../sqllogictest/test_files/groupby.slt | 51 ++++++++ .../tests/cases/roundtrip_logical_plan.rs | 22 +++- 3 files changed, 157 insertions(+), 37 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 711af11f844b..1c521fd20167 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -23,11 +23,11 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; -use datafusion_expr::aggregate_function::{ - self, - AggregateFunction::{Count, Max, Min, Sum}, -}; use datafusion_expr::{ + aggregate_function::{ + self, + AggregateFunction::{Count, Max, Min, Sum}, + }, col, expr::AggregateFunction, logical_plan::{Aggregate, LogicalPlan}, @@ -75,16 +75,22 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { fun, distinct, args, + filter, .. }) = expr { - aggregate_count += 1; - if *distinct { - for e in args { - fields_set.insert(e.canonical_name()); + match filter { + Some(_) => return Ok(false), + None => { + aggregate_count += 1; + if *distinct { + for e in args { + fields_set.insert(e.canonical_name()); + } + } else if !matches!(fun, Count | Sum | Min | Max) { + return Ok(false); + } } - } else if !matches!(fun, Count | Sum | Min | Max) { - return Ok(false); } } } @@ -112,7 +118,7 @@ fn aggregate_function_rewrite( Min => (Min, Min), Max => (Max, Max), _ => panic!( - "Not supported aggregate function in single distinct optimization rule" + "Not supported aggregate function in single distinct to group by optimization rule" ), } } @@ -188,7 +194,6 @@ impl OptimizerRule for SingleDistinctToGroupBy { Expr::AggregateFunction(AggregateFunction { fun, args, - filter, order_by, distinct, .. @@ -202,49 +207,45 @@ impl OptimizerRule for SingleDistinctToGroupBy { ); } - let mut expr = - Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - vec![col(SINGLE_DISTINCT_ALIAS)], - false, // intentional to remove distinct here - filter.clone(), - order_by.clone(), - )) - .alias(aggr_expr.display_name()?); - // if the aggregate function is not distinct, we need to rewrite it like two phase aggregation if !(*distinct) { index += 1; let alias_str = format!("alias{}", index); - let (out_fun, inner_fun) = aggregate_function_rewrite(fun); - expr = - Expr::AggregateFunction(AggregateFunction::new( - out_fun, - vec![col(&alias_str)], - false, - filter.clone(), - order_by.clone(), - )).alias(aggr_expr.display_name()?); let inner_expr = Expr::AggregateFunction(AggregateFunction::new( inner_fun, args.clone(), false, - filter.clone(), + None, order_by.clone(), )) .alias(&alias_str); inner_aggr_exprs.push(inner_expr); + Ok(Expr::AggregateFunction(AggregateFunction::new( + out_fun, + vec![col(&alias_str)], + false, + None, + order_by.clone(), + )) + .alias(aggr_expr.display_name()?)) + } else { + Ok(Expr::AggregateFunction(AggregateFunction::new( + fun.clone(), + vec![col(SINGLE_DISTINCT_ALIAS)], + false, // intentional to remove distinct here + None, + order_by.clone(), + )) + .alias(aggr_expr.display_name()?)) } - - Ok(expr) } _ => Ok(aggr_expr.clone()), }) .collect::>>()?; - + // construct the inner AggrPlan let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( input.clone(), @@ -460,6 +461,56 @@ mod tests { assert_optimized_plan_equal(&plan, expected) } + #[test] + fn two_distinct_and_one_common() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate( + vec![col("a")], + vec![ + count(col("c")), + count_distinct(col("b")), + Expr::AggregateFunction(expr::AggregateFunction::new( + AggregateFunction::Max, + vec![col("b")], + true, + None, + None, + )), + ], + )? + .build()?; + // Should work + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS COUNT(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, COUNT(test.c):Int64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:Int64;N]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn one_distinctand_two() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate( + vec![col("a")], + vec![ + count(col("c")), + max(col("c")), + count_distinct(col("b")), + ], + )? + .build()?; + // Should work + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS COUNT(test.c), MAX(alias3) AS MAX(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(test.c):Int64;N, MAX(test.c):UInt32;N, COUNT(DISTINCT test.b):Int64;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2, MAX(test.c) AS alias3]] [a:UInt32, alias1:UInt32, alias2:Int64;N, alias3:UInt32;N]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + #[test] fn distinct_and_common() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 105f11f21628..9d6564bda359 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3841,3 +3841,54 @@ ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t ------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[] --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] ----------------------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv' + +query TIIII +SELECT c1, count(distinct c2), sum(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +---- +a 5 15 21 32064 +b 5 15 19 25286 +c 5 15 21 29106 +d 5 15 18 31106 +e 5 15 21 32514 + +query TT +EXPLAIN SELECT c1, count(distinct c2), count(c3) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +---- +logical_plan +Sort: aggregate_test_100.c1 ASC NULLS LAST +--Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT aggregate_test_100.c2), SUM(alias2) AS COUNT(aggregate_test_100.c3)]] +----Aggregate: groupBy=[[aggregate_test_100.c1, aggregate_test_100.c2 AS alias1]], aggr=[[COUNT(aggregate_test_100.c3) AS alias2]] +------TableScan: aggregate_test_100 projection=[c1, c2, c3] +physical_plan +SortPreservingMergeExec: [c1@0 ASC NULLS LAST] +--SortExec: expr=[c1@0 ASC NULLS LAST] +----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3)] +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 +----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3)] +------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[alias2] +--------------CoalesceBatchesExec: target_batch_size=2 +----------------RepartitionExec: partitioning=Hash([c1@0, alias1@1], 8), input_partitions=8 +------------------AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as alias1], aggr=[alias2] +--------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index ce4ce0c869ce..8c5bb2eace43 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -270,8 +270,26 @@ async fn select_distinct_two_fields() -> Result<()> { #[tokio::test] async fn simple_distinct_aggregate() -> Result<()> { test_alias( - "SELECT a, count(distinct b) FROM data group by a", - "SELECT a, count(b) FROM (SELECT a, b FROM data group by a, b) group by a", + "SELECT a, COUNT(DISTINCT b) FROM data GROUP BY a", + "SELECT a, COUNT(b) FROM (SELECT a, b FROM data GROUP BY a, b) GROUP BY a", + ) + .await +} + +#[tokio::test] +async fn select_distinct_aggregate_two_fields() -> Result<()> { + test_alias( + "SELECT a, COUNT(DISTINCT b), MAX(DISTINCT b) FROM data GROUP BY a", + "SELECT a, COUNT(b), MAX(b) FROM (SELECT a, b FROM data GROUP BY a, b) GROUP BY a", + ) + .await +} + +#[tokio::test] +async fn select_distinct_aggregate_and_no_distinct_aggregate() -> Result<()> { + test_alias( + "SELECT a, COUNT(DISTINCT b), COUNT(c) FROM data GROUP by a", + "SELECT a, COUNT(b), SUM(\"COUNT(data.c)\") FROM (SELECT a, b, COUNT(c) FROM data GROUP BY a, b) GROUP BY a", ) .await } From 26ecd58ccfb701d3c7c9fdf6d78040f5e0225127 Mon Sep 17 00:00:00 2001 From: hhj Date: Fri, 10 Nov 2023 22:04:00 +0800 Subject: [PATCH 5/9] minor --- .../sqllogictest/test_files/groupby.slt | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 9d6564bda359..01a6ae139e12 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3863,32 +3863,32 @@ WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv' query TIIII -SELECT c1, count(distinct c2), sum(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +SELECT c1, count(distinct c2), min(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; ---- -a 5 15 21 32064 -b 5 15 19 25286 -c 5 15 21 29106 -d 5 15 18 31106 -e 5 15 21 32514 +a 5 1 21 32064 +b 5 1 19 25286 +c 5 1 21 29106 +d 5 1 18 31106 +e 5 1 21 32514 query TT -EXPLAIN SELECT c1, count(distinct c2), count(c3) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +EXPLAIN SELECT c1, count(distinct c2), min(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; ---- logical_plan Sort: aggregate_test_100.c1 ASC NULLS LAST ---Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT aggregate_test_100.c2), SUM(alias2) AS COUNT(aggregate_test_100.c3)]] -----Aggregate: groupBy=[[aggregate_test_100.c1, aggregate_test_100.c2 AS alias1]], aggr=[[COUNT(aggregate_test_100.c3) AS alias2]] -------TableScan: aggregate_test_100 projection=[c1, c2, c3] +--Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT aggregate_test_100.c2), MIN(alias1) AS MIN(DISTINCT aggregate_test_100.c2), SUM(alias2) AS COUNT(aggregate_test_100.c3), MAX(alias3) AS MAX(aggregate_test_100.c4)]] +----Aggregate: groupBy=[[aggregate_test_100.c1, aggregate_test_100.c2 AS alias1]], aggr=[[COUNT(aggregate_test_100.c3) AS alias2, MAX(aggregate_test_100.c4) AS alias3]] +------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4] physical_plan SortPreservingMergeExec: [c1@0 ASC NULLS LAST] --SortExec: expr=[c1@0 ASC NULLS LAST] -----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3)] +----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] ------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 -----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3)] -------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[alias2] +----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] +------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[alias2, alias3] --------------CoalesceBatchesExec: target_batch_size=2 ----------------RepartitionExec: partitioning=Hash([c1@0, alias1@1], 8), input_partitions=8 -------------------AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as alias1], aggr=[alias2] +------------------AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as alias1], aggr=[alias2, alias3] --------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true +----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4], has_header=true From eb6339cfd42ebaed770ebf1d9cc108cdfc96b720 Mon Sep 17 00:00:00 2001 From: hhj Date: Fri, 10 Nov 2023 22:48:21 +0800 Subject: [PATCH 6/9] fmt --- datafusion/optimizer/src/single_distinct_to_groupby.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 1c521fd20167..e68c604fb364 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -80,7 +80,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { }) = expr { match filter { - Some(_) => return Ok(false), + Some(_) => return Ok(false), None => { aggregate_count += 1; if *distinct { @@ -496,11 +496,7 @@ mod tests { let plan = LogicalPlanBuilder::from(table_scan) .aggregate( vec![col("a")], - vec![ - count(col("c")), - max(col("c")), - count_distinct(col("b")), - ], + vec![count(col("c")), max(col("c")), count_distinct(col("b"))], )? .build()?; // Should work From fe67f88857ca5fac34572ccf5ad541f23af2aa96 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 14 Nov 2023 22:26:51 +0800 Subject: [PATCH 7/9] remove count support --- .../src/single_distinct_to_groupby.rs | 57 ++++++------------- .../sqllogictest/test_files/groupby.slt | 22 +++---- 2 files changed, 28 insertions(+), 51 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index e68c604fb364..65ce950a0274 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -24,10 +24,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - aggregate_function::{ - self, - AggregateFunction::{Count, Max, Min, Sum}, - }, + aggregate_function::AggregateFunction::{Max, Min, Sum}, col, expr::AggregateFunction, logical_plan::{Aggregate, LogicalPlan}, @@ -39,14 +36,14 @@ use hashbrown::HashSet; /// single distinct to group by optimizer rule /// ```text /// Before: -/// SELECT a, COUNT(DINSTINCT b), COUNT(c) +/// SELECT a, COUNT(DINSTINCT b), SUM(c) /// FROM t /// GROUP BY a /// /// After: /// SELECT a, COUNT(alias1), SUM(alias2) /// FROM ( -/// SELECT a, b as alias1, COUNT(c) as alias2 +/// SELECT a, b as alias1, SUM(c) as alias2 /// FROM t /// GROUP BY a, b /// ) @@ -87,7 +84,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { for e in args { fields_set.insert(e.canonical_name()); } - } else if !matches!(fun, Count | Sum | Min | Max) { + } else if !matches!(fun, Sum | Min | Max) { return Ok(false); } } @@ -105,24 +102,6 @@ fn contains_grouping_set(expr: &[Expr]) -> bool { matches!(expr.first(), Some(Expr::GroupingSet(_))) } -/// Rewrite the aggregate function to two aggregate functions. -fn aggregate_function_rewrite( - fun: &aggregate_function::AggregateFunction, -) -> ( - aggregate_function::AggregateFunction, - aggregate_function::AggregateFunction, -) { - match fun { - Count => (Sum, Count), - Sum => (Sum, Sum), - Min => (Min, Min), - Max => (Max, Max), - _ => panic!( - "Not supported aggregate function in single distinct to group by optimization rule" - ), - } -} - impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, @@ -211,11 +190,9 @@ impl OptimizerRule for SingleDistinctToGroupBy { if !(*distinct) { index += 1; let alias_str = format!("alias{}", index); - let (out_fun, inner_fun) = - aggregate_function_rewrite(fun); let inner_expr = Expr::AggregateFunction(AggregateFunction::new( - inner_fun, + fun.clone(), args.clone(), false, None, @@ -224,7 +201,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { .alias(&alias_str); inner_aggr_exprs.push(inner_expr); Ok(Expr::AggregateFunction(AggregateFunction::new( - out_fun, + fun.clone(), vec![col(&alias_str)], false, None, @@ -282,8 +259,8 @@ mod tests { use datafusion_expr::expr; use datafusion_expr::expr::GroupingSet; use datafusion_expr::{ - col, count, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, - AggregateFunction, + col, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, + AggregateFunction,sum, }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { @@ -469,7 +446,7 @@ mod tests { .aggregate( vec![col("a")], vec![ - count(col("c")), + sum(col("c")), count_distinct(col("b")), Expr::AggregateFunction(expr::AggregateFunction::new( AggregateFunction::Max, @@ -482,8 +459,8 @@ mod tests { )? .build()?; // Should work - let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS COUNT(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, COUNT(test.c):Int64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\ - \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:Int64;N]\ + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS SUM(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, SUM(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[SUM(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) @@ -496,12 +473,12 @@ mod tests { let plan = LogicalPlanBuilder::from(table_scan) .aggregate( vec![col("a")], - vec![count(col("c")), max(col("c")), count_distinct(col("b"))], + vec![sum(col("c")), max(col("c")), count_distinct(col("b"))], )? .build()?; // Should work - let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS COUNT(test.c), MAX(alias3) AS MAX(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(test.c):Int64;N, MAX(test.c):UInt32;N, COUNT(DISTINCT test.b):Int64;N]\ - \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2, MAX(test.c) AS alias3]] [a:UInt32, alias1:UInt32, alias2:Int64;N, alias3:UInt32;N]\ + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(alias2) AS SUM(test.c), MAX(alias3) AS MAX(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b)]] [a:UInt32, SUM(test.c):UInt64;N, MAX(test.c):UInt32;N, COUNT(DISTINCT test.b):Int64;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[SUM(test.c) AS alias2, MAX(test.c) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) @@ -514,13 +491,13 @@ mod tests { let plan = LogicalPlanBuilder::from(table_scan) .aggregate( vec![col("a")], - vec![count_distinct(col("b")), count(col("c"))], + vec![count_distinct(col("b")), sum(col("c"))], )? .build()?; // Should work - let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b), SUM(alias2) AS COUNT(test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, COUNT(test.c):Int64;N]\ - \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[COUNT(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:Int64;N]\ + let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b), SUM(alias2) AS SUM(test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, SUM(test.c):UInt64;N]\ + \n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[SUM(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 01a6ae139e12..5bee97231ae0 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3863,29 +3863,29 @@ WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv' query TIIII -SELECT c1, count(distinct c2), min(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +SELECT c1, count(distinct c2), min(distinct c2), min(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; ---- -a 5 1 21 32064 -b 5 1 19 25286 -c 5 1 21 29106 -d 5 1 18 31106 -e 5 1 21 32514 +a 5 1 -101 32064 +b 5 1 -117 25286 +c 5 1 -117 29106 +d 5 1 -99 31106 +e 5 1 -95 32514 query TT -EXPLAIN SELECT c1, count(distinct c2), min(distinct c2), count(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; +EXPLAIN SELECT c1, count(distinct c2), min(distinct c2), sum(c3), max(c4) FROM aggregate_test_100 GROUP BY c1 ORDER BY c1; ---- logical_plan Sort: aggregate_test_100.c1 ASC NULLS LAST ---Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT aggregate_test_100.c2), MIN(alias1) AS MIN(DISTINCT aggregate_test_100.c2), SUM(alias2) AS COUNT(aggregate_test_100.c3), MAX(alias3) AS MAX(aggregate_test_100.c4)]] -----Aggregate: groupBy=[[aggregate_test_100.c1, aggregate_test_100.c2 AS alias1]], aggr=[[COUNT(aggregate_test_100.c3) AS alias2, MAX(aggregate_test_100.c4) AS alias3]] +--Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT aggregate_test_100.c2), MIN(alias1) AS MIN(DISTINCT aggregate_test_100.c2), SUM(alias2) AS SUM(aggregate_test_100.c3), MAX(alias3) AS MAX(aggregate_test_100.c4)]] +----Aggregate: groupBy=[[aggregate_test_100.c1, aggregate_test_100.c2 AS alias1]], aggr=[[SUM(CAST(aggregate_test_100.c3 AS Int64)) AS alias2, MAX(aggregate_test_100.c4) AS alias3]] ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4] physical_plan SortPreservingMergeExec: [c1@0 ASC NULLS LAST] --SortExec: expr=[c1@0 ASC NULLS LAST] -----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] +----AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), SUM(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] ------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 -----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), COUNT(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] +----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(DISTINCT aggregate_test_100.c2), MIN(DISTINCT aggregate_test_100.c2), SUM(aggregate_test_100.c3), MAX(aggregate_test_100.c4)] ------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[alias2, alias3] --------------CoalesceBatchesExec: target_batch_size=2 ----------------RepartitionExec: partitioning=Hash([c1@0, alias1@1], 8), input_partitions=8 From 003b08e7360ed760e2f5c47fc2ddea0d991e77e5 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 14 Nov 2023 22:29:15 +0800 Subject: [PATCH 8/9] fmt --- datafusion/optimizer/src/single_distinct_to_groupby.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 65ce950a0274..3565cf07c715 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -259,8 +259,8 @@ mod tests { use datafusion_expr::expr; use datafusion_expr::expr::GroupingSet; use datafusion_expr::{ - col, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, - AggregateFunction,sum, + col, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, sum, + AggregateFunction, }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { From d3e3bbe517c0a972c61e2b8309f4df5b9113c45b Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 14 Nov 2023 22:59:12 +0800 Subject: [PATCH 9/9] fix ci --- datafusion/substrait/tests/cases/roundtrip_logical_plan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 8c5bb2eace43..a89e32fd721d 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -288,8 +288,8 @@ async fn select_distinct_aggregate_two_fields() -> Result<()> { #[tokio::test] async fn select_distinct_aggregate_and_no_distinct_aggregate() -> Result<()> { test_alias( - "SELECT a, COUNT(DISTINCT b), COUNT(c) FROM data GROUP by a", - "SELECT a, COUNT(b), SUM(\"COUNT(data.c)\") FROM (SELECT a, b, COUNT(c) FROM data GROUP BY a, b) GROUP BY a", + "SELECT a, COUNT(DISTINCT b), SUM(e) FROM data GROUP by a", + "SELECT a, COUNT(b), SUM(\"SUM(data.e)\") FROM (SELECT a, b, SUM(e) FROM data GROUP BY a, b) GROUP BY a", ) .await }