Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,16 +838,38 @@ pub fn exprlist_len(
qualifier: Some(qualifier),
options,
} => {
let related_wildcard_schema = wildcard_schema.as_ref().map_or_else(
|| Ok(Arc::clone(schema)),
|schema| {
// Eliminate the fields coming from other tables.
let qualified_fields = schema
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
let (maybe_table_ref, _) = schema.qualified_field(idx);
if maybe_table_ref.map_or(true, |q| q == qualifier) {
Some((maybe_table_ref.cloned(), Arc::clone(field)))
} else {
None
}
})
.collect::<Vec<_>>();
let metadata = schema.metadata().clone();
DFSchema::new_with_metadata(qualified_fields, metadata)
.map(Arc::new)
},
)?;
let excluded = get_excluded_columns(
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema.unwrap_or(schema),
related_wildcard_schema.as_ref(),
Some(qualifier),
)?
.into_iter()
.collect::<HashSet<Column>>();
Ok(
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
get_exprs_except_skipped(related_wildcard_schema.as_ref(), excluded)
.len(),
)
}
Expand Down
61 changes: 61 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4040,3 +4040,64 @@ physical_plan
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
05)------MemoryExec: partitions=1, partition_sizes=[1]


# Functional dependencies across a join
statement ok
CREATE TABLE sales_global (
ts TIMESTAMP,
sn INTEGER,
amount INTEGER,
currency VARCHAR NOT NULL,
primary key(sn)
);

statement ok
CREATE TABLE exchange_rates (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about dorp sales_global and exchange_rates after the test?

ts TIMESTAMP,
sn INTEGER,
currency_from VARCHAR NOT NULL,
currency_to VARCHAR NOT NULL,
rate FLOAT,
primary key(sn)
);

query TT
EXPLAIN SELECT s.*, s.amount * LAST_VALUE(e.rate) AS amount_usd
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
e.currency_to = 'USD' AND
s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn
----
logical_plan
01)Sort: s.sn ASC NULLS LAST
02)--Projection: s.ts, s.sn, s.amount, s.currency, CAST(s.amount AS Float32) * last_value(e.rate) AS amount_usd
03)----Aggregate: groupBy=[[s.sn, s.ts, s.amount, s.currency]], aggr=[[last_value(e.rate)]]
04)------Projection: s.ts, s.sn, s.amount, s.currency, e.rate
05)--------Inner Join: s.currency = e.currency_from Filter: s.ts >= e.ts
06)----------SubqueryAlias: s
07)------------TableScan: sales_global projection=[ts, sn, amount, currency]
08)----------SubqueryAlias: e
09)------------Projection: exchange_rates.ts, exchange_rates.currency_from, exchange_rates.rate
10)--------------Filter: exchange_rates.currency_to = Utf8("USD")
11)----------------TableScan: exchange_rates projection=[ts, currency_from, currency_to, rate]
physical_plan
01)SortExec: expr=[sn@1 ASC NULLS LAST], preserve_partitioning=[false]
02)--ProjectionExec: expr=[ts@1 as ts, sn@0 as sn, amount@2 as amount, currency@3 as currency, CAST(amount@2 AS Float32) * last_value(e.rate)@4 as amount_usd]
03)----AggregateExec: mode=Single, gby=[sn@1 as sn, ts@0 as ts, amount@2 as amount, currency@3 as currency], aggr=[last_value(e.rate)]
04)------CoalesceBatchesExec: target_batch_size=3
05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@3, currency_from@1)], filter=ts@0 >= ts@1, projection=[ts@0, sn@1, amount@2, currency@3, rate@6]
06)----------MemoryExec: partitions=1, partition_sizes=[0]
07)----------ProjectionExec: expr=[ts@0 as ts, currency_from@1 as currency_from, rate@3 as rate]
08)------------CoalesceBatchesExec: target_batch_size=3
09)--------------FilterExec: currency_to@2 = USD
10)----------------MemoryExec: partitions=1, partition_sizes=[0]

statement ok
DROP TABLE sales_global;

statement ok
DROP TABLE exchange_rates;