Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 53 additions & 44 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,37 +582,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// All of the aggregate expressions (deduplicated).
let aggr_exprs = find_aggregate_exprs(&aggr_expr_haystack);

let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) =
if !select.group_by.is_empty() || !aggr_exprs.is_empty() {
self.aggregate(
&plan,
&select_exprs,
&having_expr_opt,
&select.group_by,
aggr_exprs,
)?
} else {
if let Some(having_expr) = &having_expr_opt {
let available_columns = select_exprs
.iter()
.map(|expr| expr_as_column_expr(expr, &plan))
.collect::<Result<Vec<Expr>>>()?;

// Ensure the HAVING expression is using only columns
// provided by the SELECT.
if !can_columns_satisfy_exprs(
&available_columns,
&[having_expr.clone()],
)? {
return Err(DataFusionError::Plan(
"Having references column(s) not provided by the select"
.to_owned(),
));
}
let group_by_exprs = select
.group_by
.iter()
.map(|e| {
let group_by_expr = self.sql_expr_to_logical_expr(e)?;
let group_by_expr = resolve_aliases_to_exprs(
&group_by_expr,
&extract_aliases(&select_exprs),
)?;
self.validate_schema_satisfies_exprs(
plan.schema(),
&[group_by_expr.clone()],
)?;
Ok(group_by_expr)
})
.collect::<Result<Vec<Expr>>>()?;

let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) = if !group_by_exprs
.is_empty()
|| !aggr_exprs.is_empty()
{
self.aggregate(
&plan,
&select_exprs,
&having_expr_opt,
group_by_exprs,
aggr_exprs,
)?
} else {
if let Some(having_expr) = &having_expr_opt {
let available_columns = select_exprs
.iter()
.map(|expr| expr_as_column_expr(expr, &plan))
.collect::<Result<Vec<Expr>>>()?;

// Ensure the HAVING expression is using only columns
// provided by the SELECT.
if !can_columns_satisfy_exprs(&available_columns, &[having_expr.clone()])?
{
return Err(DataFusionError::Plan(
"Having references column(s) not provided by the select"
.to_owned(),
));
}
}

(plan, select_exprs, having_expr_opt)
};
(plan, select_exprs, having_expr_opt)
};

let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt {
LogicalPlanBuilder::from(&plan)
Expand Down Expand Up @@ -691,14 +708,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: &LogicalPlan,
select_exprs: &[Expr],
having_expr_opt: &Option<Expr>,
group_by: &[SQLExpr],
group_by_exprs: Vec<Expr>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

aggr_exprs: Vec<Expr>,
) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
let group_by_exprs = group_by
.iter()
.map(|e| self.sql_to_rex(e, &input.schema()))
.collect::<Result<Vec<Expr>>>()?;

let aggr_projection_exprs = group_by_exprs
.iter()
.chain(aggr_exprs.iter())
Expand Down Expand Up @@ -2285,15 +2297,12 @@ mod tests {
}

#[test]
fn select_simple_aggregate_with_groupby_cannot_use_alias() {
let sql = "SELECT state AS x, MAX(age) FROM person GROUP BY x";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
format!(
"Plan(\"Invalid identifier \\\'x\\\' for schema {}\")",
PERSON_COLUMN_NAMES
),
format!("{:?}", err)
fn select_simple_aggregate_with_groupby_can_use_alias() {
quick_test(
"SELECT state AS a, MIN(age) AS b FROM person GROUP BY a",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"Projection: #state AS a, #MIN(age) AS b\
\n Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
\n TableScan: person projection=None",
);
}

Expand Down