Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,12 @@ impl DFSchema {

for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
if let Some(qualifier) = qualifier {
qualified_names.insert((qualifier, field.name()));
if !qualified_names.insert((qualifier, field.name())) {
return _schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
});
}
} else if !unqualified_names.insert(field.name()) {
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string()
Expand Down Expand Up @@ -1165,7 +1170,10 @@ mod tests {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let join = left.join(&right);
assert!(join.err().is_none());
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate qualified field name t1.c0",
);
Ok(())
}

Expand Down
46 changes: 0 additions & 46 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3337,52 +3337,6 @@ mod tests {
Ok(())
}

// Table 't1' self join
// Supplementary test of issue: https://github.com/apache/datafusion/issues/7790
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this test removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have explained that more in the description. Before this change, the test verified that calling with_column after a self join without an alias would fail in a certain way. After this change, the query fails already at the self join, so there is no way to make it test what it tested before, and I believe the self join already has test coverage elsewhere.

// NOTE(review): exercises an unaliased self join of table `t1` followed by
// `with_column`; the join itself succeeds here, while the later projection
// is expected to fail on the duplicate `t1.c1` expression name.
#[tokio::test]
async fn with_column_self_join() -> Result<()> {
// Build a single-column view and register it as `t1`.
let df = test_table().await?.select_columns(&["c1"])?;
let ctx = SessionContext::new();

ctx.register_table("t1", df.into_view())?;

// Inner self join `t1 JOIN t1 ON c1 = c1` — both sides share the
// qualifier `t1`, so the result schema carries two `t1.c1` fields.
let df = ctx
.table("t1")
.await?
.join(
ctx.table("t1").await?,
JoinType::Inner,
&["c1"],
&["c1"],
None,
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true),
])?
.limit(0, Some(1))?;

// The join result itself is collectable: one row, two `c1` columns.
let df_results = df.clone().collect().await?;
assert_batches_sorted_eq!(
[
"+----+----+",
"| c1 | c1 |",
"+----+----+",
"| a | a |",
"+----+----+",
],
&df_results
);

// Adding a column forces a projection over the ambiguous schema, which
// must fail with a duplicate-expression-name planning error.
let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err();
let expected_err = "Error during planning: Projections require unique expression names \
but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \
Consider aliasing (\"AS\") one of them.";
assert_eq!(actual_err.strip_backtrace(), expected_err);

Ok(())
}

#[tokio::test]
async fn with_column_renamed() -> Result<()> {
let df = test_table()
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
serde_json = { workspace = true }
sqlparser = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_common::{
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions,
};
use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
Expand Down Expand Up @@ -3069,6 +3070,8 @@ fn calc_func_dependencies_for_aggregate(
let group_by_expr_names = group_expr
.iter()
.map(|item| item.schema_name().to_string())
.collect::<IndexSet<_>>()
.into_iter()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we directly remove duplicates from group_expr so that we don't need to perform duplicate removal here again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with trying to make this work and I found it was not easy -- perhaps we can do it as a follow on PR?

.collect::<Vec<_>>();
let aggregate_func_dependencies = aggregate_functional_dependencies(
input.schema(),
Expand Down
18 changes: 7 additions & 11 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_common::{
DataFusionError, Result, TableReference,
};

use indexmap::IndexSet;
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};

pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
Expand All @@ -59,16 +60,7 @@ pub fn exprlist_to_columns(expr: &[Expr], accum: &mut HashSet<Column>) -> Result
/// Count the number of distinct exprs in a list of group by expressions. If the
/// first element is a `GroupingSet` expression then it must be the only expr.
pub fn grouping_set_expr_count(group_expr: &[Expr]) -> Result<usize> {
if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
if group_expr.len() > 1 {
return plan_err!(
"Invalid group by expressions, GroupingSet must be the only expression"
);
}
Ok(grouping_set.distinct_expr().len())
} else {
Ok(group_expr.len())
}
grouping_set_to_exprlist(group_expr).map(|exprs| exprs.len())
}

/// The [power set] (or powerset) of a set S is the set of all subsets of S, \
Expand Down Expand Up @@ -260,7 +252,11 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<&Expr>> {
}
Ok(grouping_set.distinct_expr())
} else {
Ok(group_expr.iter().collect())
Ok(group_expr
.iter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can get rid of the double collection iteration?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The double iteration is used to deduplicate the values

.collect::<IndexSet<_>>()
.into_iter()
.collect())
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ fn select_wildcard_with_repeated_column() {
let sql = "SELECT *, col_int32 FROM test";
let err = test_sql(sql).expect_err("query should have failed");
assert_eq!(
"expand_wildcard_rule\ncaused by\nError during planning: Projections require unique expression names but the expression \"test.col_int32\" at position 0 and \"test.col_int32\" at position 7 have the same name. Consider aliasing (\"AS\") one of them.",
"Schema error: Schema contains duplicate qualified field name test.col_int32",
err.strip_backtrace()
);
}
Expand Down Expand Up @@ -396,7 +396,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
.with_udaf(count_udaf())
.with_udaf(avg_udaf());
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();
let plan = sql_to_rel.sql_statement_to_plan(statement.clone())?;

let config = OptimizerContext::new().with_skip_failing_rules(false);
let analyzer = Analyzer::new();
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ impl PlannerContext {

/// extends the FROM schema, returning the existing one, if any
pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
self.outer_from_schema = match self.outer_from_schema.as_ref() {
Some(from_schema) => Some(Arc::new(from_schema.join(schema)?)),
None => Some(Arc::clone(schema)),
match self.outer_from_schema.as_mut() {
Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL Arc::make_mut 📓

None => self.outer_from_schema = Some(Arc::clone(schema)),
Comment on lines -200 to +202
Copy link
Contributor Author

@eejbyfeldt eejbyfeldt Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aalexandrov Can you help verify that this is correct?

I am not fully following your comment here: #11456 (comment) At least in the current code merge will still have both values for j1_id since they will have different qualifiers.

};
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1215,14 +1215,14 @@ statement ok
create table t1(v1 int) as values(100);

## Query with Ambiguous column reference
query error DataFusion error: Schema error: Ambiguous reference to unqualified field v1
query error DataFusion error: Schema error: Schema contains duplicate qualified field name t1\.v1
select count(*)
from t1
right outer join t1
on t1.v1 > 0;

query error DataFusion error: Schema error: Ambiguous reference to unqualified field v1
query error DataFusion error: Schema error: Schema contains duplicate qualified field name t1\.v1
select t1.v1 from t1 join t1 using(v1) cross join (select struct('foo' as v1) as t1);

statement ok
drop table t1;
drop table t1;