Skip to content

Commit 478c77e

Browse files
jayzhan211alamb
authored andcommitted
Remove builtin count (apache#10893)
* rm expr fn Signed-off-by: jayzhan211 <[email protected]> * rm function Signed-off-by: jayzhan211 <[email protected]> * fix query and fmt Signed-off-by: jayzhan211 <[email protected]> * fix example Signed-off-by: jayzhan211 <[email protected]> * Update datafusion/expr/src/test/function_stub.rs Co-authored-by: Andrew Lamb <[email protected]> --------- Signed-off-by: jayzhan211 <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent da062ac commit 478c77e

File tree

28 files changed

+200
-219
lines changed

28 files changed

+200
-219
lines changed

datafusion/expr/src/aggregate_function.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ use strum_macros::EnumIter;
3333
// https://datafusion.apache.org/contributor-guide/index.html#how-to-add-a-new-aggregate-function
3434
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
3535
pub enum AggregateFunction {
36-
/// Count
37-
Count,
3836
/// Minimum
3937
Min,
4038
/// Maximum
@@ -89,7 +87,6 @@ impl AggregateFunction {
8987
pub fn name(&self) -> &str {
9088
use AggregateFunction::*;
9189
match self {
92-
Count => "COUNT",
9390
Min => "MIN",
9491
Max => "MAX",
9592
Avg => "AVG",
@@ -135,7 +132,6 @@ impl FromStr for AggregateFunction {
135132
"bit_xor" => AggregateFunction::BitXor,
136133
"bool_and" => AggregateFunction::BoolAnd,
137134
"bool_or" => AggregateFunction::BoolOr,
138-
"count" => AggregateFunction::Count,
139135
"max" => AggregateFunction::Max,
140136
"mean" => AggregateFunction::Avg,
141137
"min" => AggregateFunction::Min,
@@ -190,7 +186,6 @@ impl AggregateFunction {
190186
})?;
191187

192188
match self {
193-
AggregateFunction::Count => Ok(DataType::Int64),
194189
AggregateFunction::Max | AggregateFunction::Min => {
195190
// For min and max agg function, the returned type is same as input type.
196191
// The coerced_data_types is same with input_types.
@@ -249,7 +244,6 @@ impl AggregateFunction {
249244
pub fn signature(&self) -> Signature {
250245
// note: the physical expression must accept the type returned by this function or the execution panics.
251246
match self {
252-
AggregateFunction::Count => Signature::variadic_any(Volatility::Immutable),
253247
AggregateFunction::Grouping | AggregateFunction::ArrayAgg => {
254248
Signature::any(1, Volatility::Immutable)
255249
}

datafusion/expr/src/expr.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2135,18 +2135,6 @@ mod test {
21352135

21362136
use super::*;
21372137

2138-
#[test]
2139-
fn test_count_return_type() -> Result<()> {
2140-
let fun = find_df_window_func("count").unwrap();
2141-
let observed = fun.return_type(&[DataType::Utf8])?;
2142-
assert_eq!(DataType::Int64, observed);
2143-
2144-
let observed = fun.return_type(&[DataType::UInt64])?;
2145-
assert_eq!(DataType::Int64, observed);
2146-
2147-
Ok(())
2148-
}
2149-
21502138
#[test]
21512139
fn test_first_value_return_type() -> Result<()> {
21522140
let fun = find_df_window_func("first_value").unwrap();
@@ -2250,7 +2238,6 @@ mod test {
22502238
"nth_value",
22512239
"min",
22522240
"max",
2253-
"count",
22542241
"avg",
22552242
];
22562243
for name in names {

datafusion/expr/src/expr_fn.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -192,19 +192,6 @@ pub fn avg(expr: Expr) -> Expr {
192192
))
193193
}
194194

195-
/// Create an expression to represent the count() aggregate function
196-
// TODO: Remove this and use `expr_fn::count` instead
197-
pub fn count(expr: Expr) -> Expr {
198-
Expr::AggregateFunction(AggregateFunction::new(
199-
aggregate_function::AggregateFunction::Count,
200-
vec![expr],
201-
false,
202-
None,
203-
None,
204-
None,
205-
))
206-
}
207-
208195
/// Return a new expression with bitwise AND
209196
pub fn bitwise_and(left: Expr, right: Expr) -> Expr {
210197
Expr::BinaryExpr(BinaryExpr::new(
@@ -250,19 +237,6 @@ pub fn bitwise_shift_left(left: Expr, right: Expr) -> Expr {
250237
))
251238
}
252239

253-
/// Create an expression to represent the count(distinct) aggregate function
254-
// TODO: Remove this and use `expr_fn::count_distinct` instead
255-
pub fn count_distinct(expr: Expr) -> Expr {
256-
Expr::AggregateFunction(AggregateFunction::new(
257-
aggregate_function::AggregateFunction::Count,
258-
vec![expr],
259-
true,
260-
None,
261-
None,
262-
None,
263-
))
264-
}
265-
266240
/// Create an in_list expression
267241
pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
268242
Expr::InList(InList::new(Box::new(expr), list, negated))

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2965,11 +2965,13 @@ mod tests {
29652965
use super::*;
29662966
use crate::builder::LogicalTableSource;
29672967
use crate::logical_plan::table_scan;
2968-
use crate::{col, count, exists, in_subquery, lit, placeholder, GroupingSet};
2968+
use crate::{col, exists, in_subquery, lit, placeholder, GroupingSet};
29692969

29702970
use datafusion_common::tree_node::TreeNodeVisitor;
29712971
use datafusion_common::{not_impl_err, Constraint, ScalarValue};
29722972

2973+
use crate::test::function_stub::count;
2974+
29732975
fn employee_schema() -> Schema {
29742976
Schema::new(vec![
29752977
Field::new("id", DataType::Int32, false),

datafusion/expr/src/test/function_stub.rs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
use arrow::datatypes::{
3232
DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
3333
};
34-
use datafusion_common::{exec_err, Result};
34+
use datafusion_common::{exec_err, not_impl_err, Result};
3535

3636
macro_rules! create_func {
3737
($UDAF:ty, $AGGREGATE_UDF_FN:ident) => {
@@ -69,6 +69,19 @@ pub fn sum(expr: Expr) -> Expr {
6969
))
7070
}
7171

72+
create_func!(Count, count_udaf);
73+
74+
pub fn count(expr: Expr) -> Expr {
75+
Expr::AggregateFunction(AggregateFunction::new_udf(
76+
count_udaf(),
77+
vec![expr],
78+
false,
79+
None,
80+
None,
81+
None,
82+
))
83+
}
84+
7285
/// Stub `sum` used for optimizer testing
7386
#[derive(Debug)]
7487
pub struct Sum {
@@ -189,3 +202,74 @@ impl AggregateUDFImpl for Sum {
189202
AggregateOrderSensitivity::Insensitive
190203
}
191204
}
205+
206+
/// Testing stub implementation of COUNT aggregate
207+
pub struct Count {
208+
signature: Signature,
209+
aliases: Vec<String>,
210+
}
211+
212+
impl std::fmt::Debug for Count {
213+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
214+
f.debug_struct("Count")
215+
.field("name", &self.name())
216+
.field("signature", &self.signature)
217+
.finish()
218+
}
219+
}
220+
221+
impl Default for Count {
222+
fn default() -> Self {
223+
Self::new()
224+
}
225+
}
226+
227+
impl Count {
228+
pub fn new() -> Self {
229+
Self {
230+
aliases: vec!["count".to_string()],
231+
signature: Signature::variadic_any(Volatility::Immutable),
232+
}
233+
}
234+
}
235+
236+
impl AggregateUDFImpl for Count {
237+
fn as_any(&self) -> &dyn std::any::Any {
238+
self
239+
}
240+
241+
fn name(&self) -> &str {
242+
"COUNT"
243+
}
244+
245+
fn signature(&self) -> &Signature {
246+
&self.signature
247+
}
248+
249+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
250+
Ok(DataType::Int64)
251+
}
252+
253+
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
254+
not_impl_err!("no impl for stub")
255+
}
256+
257+
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
258+
not_impl_err!("no impl for stub")
259+
}
260+
261+
fn aliases(&self) -> &[String] {
262+
&self.aliases
263+
}
264+
265+
fn create_groups_accumulator(
266+
&self,
267+
_args: AccumulatorArgs,
268+
) -> Result<Box<dyn GroupsAccumulator>> {
269+
not_impl_err!("no impl for stub")
270+
}
271+
272+
fn reverse_expr(&self) -> ReversedUDAF {
273+
ReversedUDAF::Identical
274+
}
275+
}

datafusion/expr/src/type_coercion/aggregates.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ pub fn coerce_types(
9696
check_arg_count(agg_fun.name(), input_types, &signature.type_signature)?;
9797

9898
match agg_fun {
99-
AggregateFunction::Count => Ok(input_types.to_vec()),
10099
AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
101100
AggregateFunction::Min | AggregateFunction::Max => {
102101
// min and max support the dictionary data type
@@ -525,7 +524,6 @@ mod tests {
525524
// test count, array_agg, approx_distinct, min, max.
526525
// the coerced types is same with input types
527526
let funs = vec![
528-
AggregateFunction::Count,
529527
AggregateFunction::ArrayAgg,
530528
AggregateFunction::Min,
531529
AggregateFunction::Max,

datafusion/optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,6 @@ regex-syntax = "0.8.0"
5656
[dev-dependencies]
5757
arrow-buffer = { workspace = true }
5858
ctor = { workspace = true }
59+
datafusion-functions-aggregate = { workspace = true }
5960
datafusion-sql = { workspace = true }
6061
env_logger = { workspace = true }

datafusion/optimizer/src/analyzer/count_wildcard_rule.rs

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use datafusion_expr::expr::{
2525
AggregateFunction, AggregateFunctionDefinition, WindowFunction,
2626
};
2727
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
28-
use datafusion_expr::{
29-
aggregate_function, lit, Expr, LogicalPlan, WindowFunctionDefinition,
30-
};
28+
use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition};
3129

3230
/// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`.
3331
///
@@ -56,37 +54,19 @@ fn is_wildcard(expr: &Expr) -> bool {
5654
}
5755

5856
fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool {
59-
match aggregate_function {
57+
matches!(aggregate_function,
6058
AggregateFunction {
6159
func_def: AggregateFunctionDefinition::UDF(udf),
6260
args,
6361
..
64-
} if udf.name() == "COUNT" && args.len() == 1 && is_wildcard(&args[0]) => true,
65-
AggregateFunction {
66-
func_def:
67-
AggregateFunctionDefinition::BuiltIn(
68-
datafusion_expr::aggregate_function::AggregateFunction::Count,
69-
),
70-
args,
71-
..
72-
} if args.len() == 1 && is_wildcard(&args[0]) => true,
73-
_ => false,
74-
}
62+
} if udf.name() == "COUNT" && args.len() == 1 && is_wildcard(&args[0]))
7563
}
7664

7765
fn is_count_star_window_aggregate(window_function: &WindowFunction) -> bool {
7866
let args = &window_function.args;
79-
match window_function.fun {
80-
WindowFunctionDefinition::AggregateFunction(
81-
aggregate_function::AggregateFunction::Count,
82-
) if args.len() == 1 && is_wildcard(&args[0]) => true,
67+
matches!(window_function.fun,
8368
WindowFunctionDefinition::AggregateUDF(ref udaf)
84-
if udaf.name() == "COUNT" && args.len() == 1 && is_wildcard(&args[0]) =>
85-
{
86-
true
87-
}
88-
_ => false,
89-
}
69+
if udaf.name() == "COUNT" && args.len() == 1 && is_wildcard(&args[0]))
9070
}
9171

9272
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
@@ -121,14 +101,16 @@ mod tests {
121101
use arrow::datatypes::DataType;
122102
use datafusion_common::ScalarValue;
123103
use datafusion_expr::expr::Sort;
124-
use datafusion_expr::test::function_stub::sum;
125104
use datafusion_expr::{
126-
col, count, exists, expr, in_subquery, logical_plan::LogicalPlanBuilder, max,
127-
out_ref_col, scalar_subquery, wildcard, AggregateFunction, WindowFrame,
128-
WindowFrameBound, WindowFrameUnits,
105+
col, exists, expr, in_subquery, logical_plan::LogicalPlanBuilder, max,
106+
out_ref_col, scalar_subquery, wildcard, WindowFrame, WindowFrameBound,
107+
WindowFrameUnits,
129108
};
109+
use datafusion_functions_aggregate::count::count_udaf;
130110
use std::sync::Arc;
131111

112+
use datafusion_functions_aggregate::expr_fn::{count, sum};
113+
132114
fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
133115
assert_analyzed_plan_eq_display_indent(
134116
Arc::new(CountWildcardRule::new()),
@@ -239,7 +221,7 @@ mod tests {
239221

240222
let plan = LogicalPlanBuilder::from(table_scan)
241223
.window(vec![Expr::WindowFunction(expr::WindowFunction::new(
242-
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Count),
224+
WindowFunctionDefinition::AggregateUDF(count_udaf()),
243225
vec![wildcard()],
244226
vec![],
245227
vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))],

datafusion/optimizer/src/decorrelate.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -432,14 +432,8 @@ fn agg_exprs_evaluation_result_on_empty_batch(
432432
Expr::AggregateFunction(expr::AggregateFunction {
433433
func_def, ..
434434
}) => match func_def {
435-
AggregateFunctionDefinition::BuiltIn(fun) => {
436-
if matches!(fun, datafusion_expr::AggregateFunction::Count) {
437-
Transformed::yes(Expr::Literal(ScalarValue::Int64(Some(
438-
0,
439-
))))
440-
} else {
441-
Transformed::yes(Expr::Literal(ScalarValue::Null))
442-
}
435+
AggregateFunctionDefinition::BuiltIn(_fun) => {
436+
Transformed::yes(Expr::Literal(ScalarValue::Null))
443437
}
444438
AggregateFunctionDefinition::UDF(fun) => {
445439
if fun.name() == "COUNT" {

datafusion/optimizer/src/eliminate_group_by_constant.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,12 @@ mod tests {
129129
use datafusion_common::Result;
130130
use datafusion_expr::expr::ScalarFunction;
131131
use datafusion_expr::{
132-
col, count, lit, ColumnarValue, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl,
133-
Signature, TypeSignature,
132+
col, lit, ColumnarValue, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature,
133+
TypeSignature,
134134
};
135135

136+
use datafusion_functions_aggregate::expr_fn::count;
137+
136138
use std::sync::Arc;
137139

138140
#[derive(Debug)]

0 commit comments

Comments
 (0)