Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {

Ok(Expr::AggregateFunction {
fun,
args: vec![parse_required_expr(&expr.expr)?],
arg: Box::new(parse_required_expr(&expr.expr)?),
distinct: false, //TODO
})
}
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
})
}
Expr::AggregateFunction {
ref fun, ref args, ..
ref fun, ref arg, ..
} => {
let aggr_function = match fun {
AggregateFunction::Min => protobuf::AggregateFunction::Min,
Expand All @@ -1072,7 +1072,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::Count => protobuf::AggregateFunction::Count,
};

let arg = &args[0];
let arg = &**arg;
let aggregate_expr = Box::new(protobuf::AggregateExprNode {
aggr_function: aggr_function.into(),
expr: Some(Box::new(arg.try_into()?)),
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let df_planner = DefaultPhysicalPlanner::default();
for (expr, name) in &logical_agg_expr {
match expr {
Expr::AggregateFunction { fun, args, .. } => {
Expr::AggregateFunction { fun, arg, .. } => {
let arg = df_planner
.create_physical_expr(
&args[0],
&**arg,
&physical_schema,
&ctx_state,
)
Expand Down
39 changes: 17 additions & 22 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub enum Expr {
/// Name of the function
fun: aggregates::AggregateFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
arg: Box<Expr>,
/// Whether this is a DISTINCT aggregation or not
distinct: bool,
},
Expand Down Expand Up @@ -259,12 +259,9 @@ impl Expr {
.collect::<Result<Vec<_>>>()?;
window_functions::return_type(fun, &data_types)
}
Expr::AggregateFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
aggregates::return_type(fun, &data_types)
Expr::AggregateFunction { fun, arg, .. } => {
let data_type = arg.get_type(schema)?;
aggregates::return_type(fun, &[data_type])
}
Expr::AggregateUDF { fun, args, .. } => {
let data_types = args
Expand Down Expand Up @@ -590,9 +587,7 @@ impl Expr {
Expr::WindowFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::AggregateFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::AggregateFunction { arg, .. } => arg.accept(visitor),
Expr::AggregateUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expand Down Expand Up @@ -728,11 +723,11 @@ impl Expr {
fun,
},
Expr::AggregateFunction {
args,
arg,
fun,
distinct,
} => Expr::AggregateFunction {
args: rewrite_vec(args, rewriter)?,
arg: rewrite_boxed(arg, rewriter)?,
fun,
distinct,
},
Expand Down Expand Up @@ -969,7 +964,7 @@ pub fn min(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Min,
distinct: false,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand All @@ -978,7 +973,7 @@ pub fn max(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Max,
distinct: false,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand All @@ -987,7 +982,7 @@ pub fn sum(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Sum,
distinct: false,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand All @@ -996,7 +991,7 @@ pub fn avg(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Avg,
distinct: false,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand All @@ -1005,7 +1000,7 @@ pub fn count(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Count,
distinct: false,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand All @@ -1014,7 +1009,7 @@ pub fn count_distinct(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregates::AggregateFunction::Count,
distinct: true,
args: vec![expr],
arg: Box::new(expr),
}
}

Expand Down Expand Up @@ -1276,9 +1271,9 @@ impl fmt::Debug for Expr {
Expr::AggregateFunction {
fun,
distinct,
ref args,
ref arg,
..
} => fmt_function(f, &fun.to_string(), *distinct, args),
} => fmt_function(f, &fun.to_string(), *distinct, &[*arg.clone()]),
Expr::AggregateUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args)
}
Expand Down Expand Up @@ -1394,9 +1389,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::AggregateFunction {
fun,
distinct,
args,
arg,
..
} => create_function_name(&fun.to_string(), *distinct, args, input_schema),
} => create_function_name(&fun.to_string(), *distinct, &[*arg.clone()], input_schema),
Expr::AggregateUDF { fun, args } => {
let mut names = Vec::with_capacity(args.len());
for e in args {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
Expr::ScalarFunction { args, .. } => Ok(args.clone()),
Expr::ScalarUDF { args, .. } => Ok(args.clone()),
Expr::WindowFunction { args, .. } => Ok(args.clone()),
Expr::AggregateFunction { args, .. } => Ok(args.clone()),
Expr::AggregateFunction { arg, .. } => Ok(vec![arg.as_ref().to_owned()]),
Expr::AggregateUDF { args, .. } => Ok(args.clone()),
Expr::Case {
expr,
Expand Down Expand Up @@ -344,7 +344,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}),
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
fun: fun.clone(),
args: expressions.to_vec(),
arg: Box::new(expressions[0].clone()),
distinct: *distinct,
}),
Expr::AggregateUDF { fun, .. } => Ok(Expr::AggregateUDF {
Expand Down
11 changes: 3 additions & 8 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,19 +779,14 @@ impl DefaultPhysicalPlanner {
Expr::AggregateFunction {
fun,
distinct,
args,
arg,
..
} => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let args = self.create_physical_expr(arg, physical_input_schema, ctx_state)?;
aggregates::create_aggregate_expr(
fun,
*distinct,
&args,
&[args],
physical_input_schema,
name,
)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return Ok(Expr::AggregateFunction {
fun,
distinct: function.distinct,
args,
arg: Box::new(args[0].clone()),
});
};

Expand Down
7 changes: 2 additions & 5 deletions datafusion/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,11 @@ where
None => match expr {
Expr::AggregateFunction {
fun,
args,
arg,
distinct,
} => Ok(Expr::AggregateFunction {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
arg: Box::new(clone_with_replacement(arg, replacement_fn)?),
distinct: *distinct,
}),
Expr::WindowFunction { fun, args } => Ok(Expr::WindowFunction {
Expand Down