Skip to content

Commit ac0f054

Browse files
committed
add aggregate
Signed-off-by: jayzhan211 <[email protected]>
1 parent 147edb9 commit ac0f054

File tree

12 files changed

+81
-4
lines changed

12 files changed

+81
-4
lines changed

datafusion/core/tests/sqllogictests/test_files/array.slt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2332,6 +2332,12 @@ select array_concat(column1, [7]) from arrays_values_v2;
23322332

23332333
# array aggregate function
23342334

2335+
## array aggregate
2336+
query I
2337+
select array_aggregate([1, 3, 5, 7], 'sum');
2338+
----
2339+
16
2340+
23352341
## array sum
23362342
query IRI
23372343
select array_sum([1, 3, 5, 7]),
@@ -2340,6 +2346,10 @@ select array_sum([1, 3, 5, 7]),
23402346
----
23412347
16 6.6 23
23422348

2349+
# TODO: Support nulls in array.
2350+
# query error DataFusion error: This feature is not implemented: Arrays with different types are not supported: \{Int64, Null\}
2351+
# select array_sum([1, null, 3, null]);
2352+
23432353
query ??
23442354
select column1, column6 from arrays_values_without_nulls;
23452355
----

datafusion/expr/src/built_in_function.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ pub enum BuiltinScalarFunction {
117117
Cot,
118118

119119
// array functions
120+
/// array_aggregate
121+
ArrayAggregate,
120122
/// array_append
121123
ArrayAppend,
122124
/// array_concat
@@ -350,6 +352,7 @@ impl BuiltinScalarFunction {
350352
BuiltinScalarFunction::Tanh => Volatility::Immutable,
351353
BuiltinScalarFunction::Trunc => Volatility::Immutable,
352354
BuiltinScalarFunction::ArrayAppend => Volatility::Immutable,
355+
BuiltinScalarFunction::ArrayAggregate => Volatility::Immutable,
353356
BuiltinScalarFunction::ArrayConcat => Volatility::Immutable,
354357
BuiltinScalarFunction::ArrayHasAll => Volatility::Immutable,
355358
BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable,
@@ -503,6 +506,10 @@ impl BuiltinScalarFunction {
503506
// Some built-in functions' return type depends on the incoming type.
504507
match self {
505508
BuiltinScalarFunction::ArrayAppend => Ok(input_expr_types[0].clone()),
509+
BuiltinScalarFunction::ArrayAggregate => {
510+
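// TODO: derive the return type from the requested aggregate and the list's element type; hard-coding Int64 is only right for e.g. `sum` over an integer list (a float list sums to a float).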
511+
Ok(Int64)
512+
}
506513
BuiltinScalarFunction::ArrayConcat => {
507514
let mut expr_type = Null;
508515
let mut max_dims = 0;
@@ -839,15 +846,16 @@ impl BuiltinScalarFunction {
839846

840847
// for now, the list is small, as we do not have many built-in functions.
841848
match self {
842-
BuiltinScalarFunction::ArrayAppend => Signature::any(2, self.volatility()),
843849
BuiltinScalarFunction::ArrayConcat => {
844850
Signature::variadic_any(self.volatility())
845851
}
846-
BuiltinScalarFunction::ArrayHasAll
852+
BuiltinScalarFunction::ArrayAggregate
853+
| BuiltinScalarFunction::ArrayAppend
854+
| BuiltinScalarFunction::ArrayElement
855+
| BuiltinScalarFunction::ArrayHasAll
847856
| BuiltinScalarFunction::ArrayHasAny
848857
| BuiltinScalarFunction::ArrayHas => Signature::any(2, self.volatility()),
849858
BuiltinScalarFunction::ArrayDims => Signature::any(1, self.volatility()),
850-
BuiltinScalarFunction::ArrayElement => Signature::any(2, self.volatility()),
851859
BuiltinScalarFunction::ArrayLength => {
852860
Signature::variadic_any(self.volatility())
853861
}
@@ -1316,6 +1324,12 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
13161324
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"],
13171325

13181326
// array functions
1327+
BuiltinScalarFunction::ArrayAggregate => &[
1328+
"array_aggregate",
1329+
"list_aggregate",
1330+
"array_aggr",
1331+
"list_aggr",
1332+
],
13191333
BuiltinScalarFunction::ArrayAppend => &[
13201334
"array_append",
13211335
"list_append",

datafusion/expr/src/expr_fn.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,12 @@ scalar_expr!(
545545
array element,
546546
"appends an element to the end of an array."
547547
);
548+
scalar_expr!(
549+
ArrayAggregate,
550+
array_aggregate,
551+
array name,
552+
"allows the execution of arbitrary existing aggregate functions on the elements of a list"
553+
);
548554
nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays.");
549555
scalar_expr!(
550556
ArrayHas,

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2117,7 +2117,20 @@ macro_rules! array_sum_internal(
21172117
}}
21182118
);
21192119

2120-
/// Array_sum SQL function
2120+
/// array_aggregate SQL function
2121+
pub fn array_aggregate(args: &[ArrayRef]) -> Result<ArrayRef> {
2122+
assert_eq!(args.len(), 2);
2123+
let func_name = args[1].as_string::<i32>().value(0);
2124+
let args = &args[0..1];
2125+
match func_name {
2126+
"sum" => array_sum(args),
2127+
_ => Err(DataFusionError::NotImplemented(format!(
2128+
"array_aggregate does not support function '{func_name}'"
2129+
))),
2130+
}
2131+
}
2132+
2133+
/// array_sum SQL function
21212134
pub fn array_sum(args: &[ArrayRef]) -> Result<ArrayRef> {
21222135
assert_eq!(args.len(), 1);
21232136

datafusion/physical-expr/src/functions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ pub fn create_physical_fun(
416416
BuiltinScalarFunction::ArrayAppend => {
417417
Arc::new(|args| make_scalar_function(array_expressions::array_append)(args))
418418
}
419+
BuiltinScalarFunction::ArrayAggregate => Arc::new(|args| {
420+
make_scalar_function(array_expressions::array_aggregate)(args)
421+
}),
419422
BuiltinScalarFunction::ArrayConcat => {
420423
Arc::new(|args| make_scalar_function(array_expressions::array_concat)(args))
421424
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ enum ScalarFunction {
595595
ArrayReplaceAll = 110;
596596
Nanvl = 111;
597597
ArraySum = 113;
598+
ArrayAggregate = 114;
598599
}
599600

600601
message ScalarFunctionNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
450450
ScalarFunction::Ltrim => Self::Ltrim,
451451
ScalarFunction::Rtrim => Self::Rtrim,
452452
ScalarFunction::ToTimestamp => Self::ToTimestamp,
453+
ScalarFunction::ArrayAggregate => Self::ArrayAggregate,
453454
ScalarFunction::ArrayAppend => Self::ArrayAppend,
454455
ScalarFunction::ArrayConcat => Self::ArrayConcat,
455456
ScalarFunction::ArrayHasAll => Self::ArrayHasAll,

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1449,6 +1449,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
14491449
BuiltinScalarFunction::Ltrim => Self::Ltrim,
14501450
BuiltinScalarFunction::Rtrim => Self::Rtrim,
14511451
BuiltinScalarFunction::ToTimestamp => Self::ToTimestamp,
1452+
BuiltinScalarFunction::ArrayAggregate => Self::ArrayAggregate,
14521453
BuiltinScalarFunction::ArrayAppend => Self::ArrayAppend,
14531454
BuiltinScalarFunction::ArrayConcat => Self::ArrayConcat,
14541455
BuiltinScalarFunction::ArrayHasAll => Self::ArrayHasAll,

0 commit comments

Comments
 (0)