diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 56dc75345fda..66cdeb575a15 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -49,7 +49,6 @@ use datafusion_expr::{ SimpleAggregateUDF, }; use datafusion_physical_expr::expressions::AvgAccumulator; - /// Test to show the contents of the setup #[tokio::test] async fn test_setup() { diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 6227df814f91..e3d2e6555d5c 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -41,8 +41,6 @@ pub enum AggregateFunction { Max, /// Average Avg, - /// Approximate distinct function - ApproxDistinct, /// Aggregation into an array ArrayAgg, /// N'th value in a group according to some ordering @@ -95,7 +93,6 @@ impl AggregateFunction { Min => "MIN", Max => "MAX", Avg => "AVG", - ApproxDistinct => "APPROX_DISTINCT", ArrayAgg => "ARRAY_AGG", NthValue => "NTH_VALUE", Correlation => "CORR", @@ -157,7 +154,6 @@ impl FromStr for AggregateFunction { "regr_syy" => AggregateFunction::RegrSYY, "regr_sxy" => AggregateFunction::RegrSXY, // approximate - "approx_distinct" => AggregateFunction::ApproxDistinct, "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont, "approx_percentile_cont_with_weight" => { AggregateFunction::ApproxPercentileContWithWeight @@ -194,9 +190,7 @@ impl AggregateFunction { })?; match self { - AggregateFunction::Count | AggregateFunction::ApproxDistinct => { - Ok(DataType::Int64) - } + AggregateFunction::Count => Ok(DataType::Int64), AggregateFunction::Max | AggregateFunction::Min => { // For min and max agg function, the returned type is same as input type. // The coerced_data_types is same with input_types. @@ -256,9 +250,9 @@ impl AggregateFunction { // note: the physical expression must accept the type returned by this function or the execution panics. match self { AggregateFunction::Count => Signature::variadic_any(Volatility::Immutable), - AggregateFunction::ApproxDistinct - | AggregateFunction::Grouping - | AggregateFunction::ArrayAgg => Signature::any(1, Volatility::Immutable), + AggregateFunction::Grouping | AggregateFunction::ArrayAgg => { + Signature::any(1, Volatility::Immutable) + } AggregateFunction::Min | AggregateFunction::Max => { let valid = STRINGS .iter() diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 5626d343a6cb..420312050870 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -266,24 +266,6 @@ pub fn in_list(expr: Expr, list: Vec, negated: bool) -> Expr { Expr::InList(InList::new(Box::new(expr), list, negated)) } -/// Returns the approximate number of distinct input values. -/// This function provides an approximation of count(DISTINCT x). -/// Zero is returned if all input values are null. -/// This function should produce a standard error of 0.81%, -/// which is the standard deviation of the (approximately normal) -/// error distribution over all possible sets. -/// It does not guarantee an upper bound on the error for any specific input set. -pub fn approx_distinct(expr: Expr) -> Expr { - Expr::AggregateFunction(AggregateFunction::new( - aggregate_function::AggregateFunction::ApproxDistinct, - vec![expr], - false, - None, - None, - None, - )) -} - /// Calculate an approximation of the specified `percentile` for `expr`. pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr { Expr::AggregateFunction(AggregateFunction::new( diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index efd3c9f371ef..ab7deaff9885 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -96,9 +96,7 @@ pub fn coerce_types( check_arg_count(agg_fun.name(), input_types, &signature.type_signature)?; match agg_fun { - AggregateFunction::Count | AggregateFunction::ApproxDistinct => { - Ok(input_types.to_vec()) - } + AggregateFunction::Count => Ok(input_types.to_vec()), AggregateFunction::ArrayAgg => Ok(input_types.to_vec()), AggregateFunction::Min | AggregateFunction::Max => { // min and max support the dictionary data type @@ -529,7 +527,6 @@ mod tests { let funs = vec![ AggregateFunction::Count, AggregateFunction::ArrayAgg, - AggregateFunction::ApproxDistinct, AggregateFunction::Min, AggregateFunction::Max, ]; diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs similarity index 80% rename from datafusion/physical-expr/src/aggregate/approx_distinct.rs rename to datafusion/functions-aggregate/src/approx_distinct.rs index c0bce3ac2774..6f1e97a16380 100644 --- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -17,134 +17,85 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use super::hyperloglog::HyperLogLog; -use crate::aggregate::utils::down_cast_any_ref; -use crate::expressions::format_state_name; -use crate::{AggregateExpr, PhysicalExpr}; +use crate::hyperloglog::HyperLogLog; +use arrow::array::BinaryArray; use arrow::array::{ - ArrayRef, BinaryArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, - PrimitiveArray, + GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type, - UInt16Type, UInt32Type, UInt64Type, UInt8Type, + ArrowPrimitiveType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, + UInt32Type, UInt64Type, UInt8Type, }; +use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; +use datafusion_common::ScalarValue; use datafusion_common::{ - downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, + downcast_value, internal_err, not_impl_err, DataFusionError, Result, }; -use datafusion_expr::Accumulator; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::hash::Hash; use std::marker::PhantomData; -use std::sync::Arc; +make_udaf_expr_and_func!( + ApproxDistinct, + approx_distinct, + expression, + "approximate number of distinct input values", + approx_distinct_udaf +); -/// APPROX_DISTINCT aggregate expression -#[derive(Debug)] -pub struct ApproxDistinct { - name: String, - input_data_type: DataType, - expr: Arc, -} - -impl ApproxDistinct { - /// Create a new ApproxDistinct aggregate function. - pub fn new( - expr: Arc, - name: impl Into, - input_data_type: DataType, - ) -> Self { - Self { - name: name.into(), - input_data_type, - expr, - } +impl From<&HyperLogLog> for ScalarValue { + fn from(v: &HyperLogLog) -> ScalarValue { + let values = v.as_ref().to_vec(); + ScalarValue::Binary(Some(values)) } } -impl AggregateExpr for ApproxDistinct { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new(&self.name, DataType::UInt64, false)) - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "hll_registers"), - DataType::Binary, - false, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn create_accumulator(&self) -> Result> { - let accumulator: Box = match &self.input_data_type { - // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL - // TODO support for boolean (trivial case) - // https://github.com/apache/datafusion/issues/1109 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Utf8 => Box::new(StringHLLAccumulator::::new()), - DataType::LargeUtf8 => Box::new(StringHLLAccumulator::::new()), - DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), - DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), - other => { - return not_impl_err!( - "Support for 'approx_distinct' for data type {other} is not implemented" +impl TryFrom<&[u8]> for HyperLogLog { + type Error = DataFusionError; + fn try_from(v: &[u8]) -> Result> { + let arr: [u8; 16384] = v.try_into().map_err(|_| { + DataFusionError::Internal( + "Impossibly got invalid binary array from states".into(), ) - } - }; - Ok(accumulator) - } - - fn name(&self) -> &str { - &self.name + })?; + Ok(HyperLogLog::::new_with_registers(arr)) } } -impl PartialEq for ApproxDistinct { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.input_data_type == x.input_data_type - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) +impl TryFrom<&ScalarValue> for HyperLogLog { + type Error = DataFusionError; + fn try_from(v: &ScalarValue) -> Result> { + if let ScalarValue::Binary(Some(slice)) = v { + slice.as_slice().try_into() + } else { + internal_err!( + "Impossibly got invalid scalar value while converting to HyperLogLog" + ) + } } } #[derive(Debug)] -struct BinaryHLLAccumulator +struct NumericHLLAccumulator where - T: OffsetSizeTrait, + T: ArrowPrimitiveType, + T::Native: Hash, { - hll: HyperLogLog>, - phantom_data: PhantomData, + hll: HyperLogLog, } -impl BinaryHLLAccumulator +impl NumericHLLAccumulator where - T: OffsetSizeTrait, + T: ArrowPrimitiveType, + T::Native: Hash, { /// new approx_distinct accumulator pub fn new() -> Self { Self { hll: HyperLogLog::new(), - phantom_data: PhantomData, } } } @@ -172,55 +123,23 @@ where } #[derive(Debug)] -struct NumericHLLAccumulator +struct BinaryHLLAccumulator where - T: ArrowPrimitiveType, - T::Native: Hash, + T: OffsetSizeTrait, { - hll: HyperLogLog, + hll: HyperLogLog>, + phantom_data: PhantomData, } -impl NumericHLLAccumulator +impl BinaryHLLAccumulator where - T: ArrowPrimitiveType, - T::Native: Hash, + T: OffsetSizeTrait, { /// new approx_distinct accumulator pub fn new() -> Self { Self { hll: HyperLogLog::new(), - } - } -} - -impl From<&HyperLogLog> for ScalarValue { - fn from(v: &HyperLogLog) -> ScalarValue { - let values = v.as_ref().to_vec(); - ScalarValue::Binary(Some(values)) - } -} - -impl TryFrom<&[u8]> for HyperLogLog { - type Error = DataFusionError; - fn try_from(v: &[u8]) -> Result> { - let arr: [u8; 16384] = v.try_into().map_err(|_| { - DataFusionError::Internal( - "Impossibly got invalid binary array from states".into(), - ) - })?; - Ok(HyperLogLog::::new_with_registers(arr)) - } -} - -impl TryFrom<&ScalarValue> for HyperLogLog { - type Error = DataFusionError; - fn try_from(v: &ScalarValue) -> Result> { - if let ScalarValue::Binary(Some(slice)) = v { - slice.as_slice().try_into() - } else { - internal_err!( - "Impossibly got invalid scalar value while converting to HyperLogLog" - ) + phantom_data: PhantomData, } } } @@ -304,3 +223,82 @@ where default_accumulator_impl!(); } + +impl Debug for ApproxDistinct { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ApproxDistinct") + .field("name", &self.name()) + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxDistinct { + fn default() -> Self { + Self::new() + } +} + +pub struct ApproxDistinct { + signature: Signature, +} + +impl ApproxDistinct { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl AggregateUDFImpl for ApproxDistinct { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "approx_distinct" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::UInt64) + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + Ok(vec![Field::new( + format_state_name(args.name, "hll_registers"), + DataType::Binary, + false, + )]) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + let accumulator: Box = match acc_args.input_type { + // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL + // TODO support for boolean (trivial case) + // https://github.com/apache/datafusion/issues/1109 + DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), + DataType::Utf8 => Box::new(StringHLLAccumulator::::new()), + DataType::LargeUtf8 => Box::new(StringHLLAccumulator::::new()), + DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), + DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), + other => { + return not_impl_err!( + "Support for 'approx_distinct' for data type {other} is not implemented" + ) + } + }; + Ok(accumulator) + } +} diff --git a/datafusion/physical-expr/src/aggregate/hyperloglog.rs b/datafusion/functions-aggregate/src/hyperloglog.rs similarity index 99% rename from datafusion/physical-expr/src/aggregate/hyperloglog.rs rename to datafusion/functions-aggregate/src/hyperloglog.rs index 657a7b9f7f21..3074889eab23 100644 --- a/datafusion/physical-expr/src/aggregate/hyperloglog.rs +++ b/datafusion/functions-aggregate/src/hyperloglog.rs @@ -20,7 +20,7 @@ //! `hyperloglog` is a module that contains a modified version //! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c) //! with some modification based on strong assumption of usage -//! within datafusion, so that [`datafusion_expr::approx_distinct`] function can +//! within datafusion, so that function can //! be efficiently implemented. //! //! Specifically, like Redis's version, this HLL structure uses diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 274ab8302e2a..2d062cf2cb9b 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -55,8 +55,10 @@ #[macro_use] pub mod macros; +pub mod approx_distinct; pub mod covariance; pub mod first_last; +pub mod hyperloglog; pub mod median; pub mod stddev; pub mod sum; @@ -73,6 +75,7 @@ use std::sync::Arc; /// Fluent-style API for creating `Expr`s pub mod expr_fn { + pub use super::approx_distinct; pub use super::approx_median::approx_median; pub use super::covariance::covar_pop; pub use super::covariance::covar_samp; @@ -100,6 +103,7 @@ pub fn all_default_aggregate_functions() -> Vec> { stddev::stddev_udaf(), stddev::stddev_pop_udaf(), approx_median::approx_median_udaf(), + approx_distinct::approx_distinct_udaf(), ] } diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 89de6ad49c39..ac24dd2e7603 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -30,13 +30,12 @@ use std::sync::Arc; use arrow::datatypes::Schema; -use datafusion_common::{exec_err, not_impl_err, Result}; -use datafusion_expr::AggregateFunction; - +use crate::aggregate::average::Avg; use crate::aggregate::regr::RegrType; use crate::expressions::{self, Literal}; use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; - +use datafusion_common::{exec_err, not_impl_err, Result}; +use datafusion_expr::AggregateFunction; /// Create a physical aggregation expression. /// This function errors when `input_phy_exprs`' can't be coerced to a valid argument type of the aggregation function. pub fn create_aggregate_expr( @@ -104,9 +103,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::ApproxDistinct, _) => Arc::new( - expressions::ApproxDistinct::new(input_phy_exprs[0].clone(), name, data_type), - ), (AggregateFunction::ArrayAgg, false) => { let expr = input_phy_exprs[0].clone(); let nullable = expr.nullable(input_schema)?; @@ -149,11 +145,9 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::Avg, false) => Arc::new(expressions::Avg::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), + (AggregateFunction::Avg, false) => { + Arc::new(Avg::new(input_phy_exprs[0].clone(), name, data_type)) + } (AggregateFunction::Avg, true) => { return not_impl_err!("AVG(DISTINCT) aggregations are not available"); } @@ -325,8 +319,8 @@ mod tests { use super::*; use crate::expressions::{ - try_cast, ApproxDistinct, ApproxPercentileCont, ArrayAgg, Avg, BitAnd, BitOr, - BitXor, BoolAnd, BoolOr, Count, DistinctArrayAgg, DistinctCount, Max, Min, + try_cast, ApproxPercentileCont, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, + BoolOr, Count, DistinctArrayAgg, DistinctCount, Max, Min, }; use datafusion_common::{plan_err, DataFusionError, ScalarValue}; @@ -335,11 +329,7 @@ mod tests { #[test] fn test_count_arragg_approx_expr() -> Result<()> { - let funcs = vec![ - AggregateFunction::Count, - AggregateFunction::ArrayAgg, - AggregateFunction::ApproxDistinct, - ]; + let funcs = vec![AggregateFunction::Count, AggregateFunction::ArrayAgg]; let data_types = vec![ DataType::UInt32, DataType::Int32, @@ -371,14 +361,6 @@ mod tests { result_agg_phy_exprs.field().unwrap() ); } - AggregateFunction::ApproxDistinct => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", DataType::UInt64, false), - result_agg_phy_exprs.field().unwrap() - ); - } AggregateFunction::ArrayAgg => { assert!(result_agg_phy_exprs.as_any().is::()); assert_eq!("c1", result_agg_phy_exprs.name()); @@ -410,14 +392,6 @@ mod tests { result_distinct.field().unwrap() ); } - AggregateFunction::ApproxDistinct => { - assert!(result_distinct.as_any().is::()); - assert_eq!("c1", result_distinct.name()); - assert_eq!( - Field::new("c1", DataType::UInt64, false), - result_distinct.field().unwrap() - ); - } AggregateFunction::ArrayAgg => { assert!(result_distinct.as_any().is::()); assert_eq!("c1", result_distinct.name()); diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 9db80f155ab3..7a6c5f9d0e24 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -17,9 +17,6 @@ pub use datafusion_physical_expr_common::aggregate::AggregateExpr; -mod hyperloglog; - -pub(crate) mod approx_distinct; pub(crate) mod approx_percentile_cont; pub(crate) mod approx_percentile_cont_with_weight; pub(crate) mod array_agg; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 656cc570ca60..a96d02173018 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -36,14 +36,13 @@ mod try_cast; pub mod helpers { pub use crate::aggregate::min_max::{max, min}; } - -pub use crate::aggregate::approx_distinct::ApproxDistinct; pub use crate::aggregate::approx_percentile_cont::ApproxPercentileCont; pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileContWithWeight; pub use crate::aggregate::array_agg::ArrayAgg; pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; -pub use crate::aggregate::average::{Avg, AvgAccumulator}; +pub use crate::aggregate::average::Avg; +pub use crate::aggregate::average::AvgAccumulator; pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor}; pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr}; pub use crate::aggregate::build_in::create_aggregate_expr; @@ -67,6 +66,10 @@ pub use crate::PhysicalSortExpr; pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; pub use column::UnKnownColumn; +pub use datafusion_expr::utils::format_state_name; +pub use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; +pub use datafusion_physical_expr_common::expressions::column::{col, Column}; +pub use datafusion_physical_expr_common::expressions::{cast, CastExpr}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; @@ -77,11 +80,6 @@ pub use no_op::NoOp; pub use not::{not, NotExpr}; pub use try_cast::{try_cast, TryCastExpr}; -pub use datafusion_expr::utils::format_state_name; -pub use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; -pub use datafusion_physical_expr_common::expressions::column::{col, Column}; -pub use datafusion_physical_expr_common::expressions::{cast, CastExpr}; - #[cfg(test)] pub(crate) mod tests { use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 356eb7c86a69..79abbdb52ca2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1196,7 +1196,7 @@ mod tests { use datafusion_expr::expr::Sort; use datafusion_functions_aggregate::median::median_udaf; use datafusion_physical_expr::expressions::{ - lit, ApproxDistinct, Count, FirstValue, LastValue, OrderSensitiveArrayAgg, + lit, Count, FirstValue, LastValue, OrderSensitiveArrayAgg, }; use datafusion_physical_expr::PhysicalSortExpr; @@ -1809,14 +1809,6 @@ mod tests { let aggregates_v0: Vec> = vec![test_median_agg_expr(&input_schema)?]; - // use slow-path in `hash.rs` - let aggregates_v1: Vec> = - vec![Arc::new(ApproxDistinct::new( - col("a", &input_schema)?, - "APPROX_DISTINCT(a)".to_string(), - DataType::UInt32, - ))]; - // use fast-path in `row_hash.rs`. let aggregates_v2: Vec> = vec![Arc::new(Avg::new( col("b", &input_schema)?, @@ -1826,7 +1818,6 @@ mod tests { for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), - (1, groups_some.clone(), aggregates_v1), (2, groups_some, aggregates_v2), ] { let n_aggr = aggregates.len(); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index df1859e25c96..b401ff8810db 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -477,7 +477,7 @@ enum AggregateFunction { // SUM = 2; AVG = 3; COUNT = 4; - APPROX_DISTINCT = 5; + // APPROX_DISTINCT = 5; ARRAY_AGG = 6; // VARIANCE = 7; // VARIANCE_POP = 8; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 4d0e2d93f9b7..d6632c77d8da 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -536,7 +536,6 @@ impl serde::Serialize for AggregateFunction { Self::Max => "MAX", Self::Avg => "AVG", Self::Count => "COUNT", - Self::ApproxDistinct => "APPROX_DISTINCT", Self::ArrayAgg => "ARRAY_AGG", Self::Correlation => "CORRELATION", Self::ApproxPercentileCont => "APPROX_PERCENTILE_CONT", @@ -573,7 +572,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "MAX", "AVG", "COUNT", - "APPROX_DISTINCT", "ARRAY_AGG", "CORRELATION", "APPROX_PERCENTILE_CONT", @@ -639,7 +637,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "MAX" => Ok(AggregateFunction::Max), "AVG" => Ok(AggregateFunction::Avg), "COUNT" => Ok(AggregateFunction::Count), - "APPROX_DISTINCT" => Ok(AggregateFunction::ApproxDistinct), "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg), "CORRELATION" => Ok(AggregateFunction::Correlation), "APPROX_PERCENTILE_CONT" => Ok(AggregateFunction::ApproxPercentileCont), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ca871a4a9dde..0aca5ef1ffb8 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1929,7 +1929,7 @@ pub enum AggregateFunction { /// SUM = 2; Avg = 3, Count = 4, - ApproxDistinct = 5, + /// APPROX_DISTINCT = 5; ArrayAgg = 6, /// VARIANCE = 7; /// VARIANCE_POP = 8; @@ -1971,7 +1971,6 @@ impl AggregateFunction { AggregateFunction::Max => "MAX", AggregateFunction::Avg => "AVG", AggregateFunction::Count => "COUNT", - AggregateFunction::ApproxDistinct => "APPROX_DISTINCT", AggregateFunction::ArrayAgg => "ARRAY_AGG", AggregateFunction::Correlation => "CORRELATION", AggregateFunction::ApproxPercentileCont => "APPROX_PERCENTILE_CONT", @@ -2004,7 +2003,6 @@ impl AggregateFunction { "MAX" => Some(Self::Max), "AVG" => Some(Self::Avg), "COUNT" => Some(Self::Count), - "APPROX_DISTINCT" => Some(Self::ApproxDistinct), "ARRAY_AGG" => Some(Self::ArrayAgg), "CORRELATION" => Some(Self::Correlation), "APPROX_PERCENTILE_CONT" => Some(Self::ApproxPercentileCont), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 5c083fa27a9b..3ad5973380ed 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -146,7 +146,6 @@ impl From for AggregateFunction { protobuf::AggregateFunction::BoolAnd => Self::BoolAnd, protobuf::AggregateFunction::BoolOr => Self::BoolOr, protobuf::AggregateFunction::Count => Self::Count, - protobuf::AggregateFunction::ApproxDistinct => Self::ApproxDistinct, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, protobuf::AggregateFunction::Correlation => Self::Correlation, protobuf::AggregateFunction::RegrSlope => Self::RegrSlope, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index e2259896b26e..d42470f198e3 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -117,7 +117,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::BoolAnd => Self::BoolAnd, AggregateFunction::BoolOr => Self::BoolOr, AggregateFunction::Count => Self::Count, - AggregateFunction::ApproxDistinct => Self::ApproxDistinct, AggregateFunction::ArrayAgg => Self::ArrayAgg, AggregateFunction::Correlation => Self::Correlation, AggregateFunction::RegrSlope => Self::RegrSlope, @@ -392,9 +391,6 @@ pub fn serialize_expr( }) => match func_def { AggregateFunctionDefinition::BuiltIn(fun) => { let aggr_function = match fun { - AggregateFunction::ApproxDistinct => { - protobuf::AggregateFunction::ApproxDistinct - } AggregateFunction::ApproxPercentileCont => { protobuf::AggregateFunction::ApproxPercentileCont } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 19ba4a40d52b..5258bdd11d86 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -23,12 +23,12 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - ApproxDistinct, ApproxPercentileCont, ApproxPercentileContWithWeight, ArrayAgg, Avg, - BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, - Correlation, Count, CumeDist, DistinctArrayAgg, DistinctBitXor, DistinctCount, - Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, - NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, Regr, - RegrType, RowNumber, StringAgg, TryCastExpr, WindowShift, + ApproxPercentileCont, ApproxPercentileContWithWeight, ArrayAgg, Avg, BinaryExpr, + BitAnd, BitOr, BitXor, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, + Count, CumeDist, DistinctArrayAgg, DistinctBitXor, DistinctCount, Grouping, + InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, + NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, Regr, RegrType, + RowNumber, StringAgg, TryCastExpr, WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -260,8 +260,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { protobuf::AggregateFunction::BoolAnd } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::BoolOr - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::ApproxDistinct } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::ArrayAgg } else if aggr_expr.downcast_ref::().is_some() { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a245793ebd09..7ba1893bb11a 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -72,7 +72,7 @@ CREATE TABLE test (c1 BIGINT,c2 BIGINT) as values ####### # https://github.com/apache/datafusion/issues/3353 -statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name "APPROX_DISTINCT\(aggregate_test_100\.c9\)" +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name "approx_distinct\(aggregate_test_100\.c9\)" SELECT approx_distinct(c9) count_c9, approx_distinct(cast(c9 as varchar)) count_c9_str FROM aggregate_test_100 # csv_query_approx_percentile_cont_with_weight