Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
c7bae26
Aggregate rewrite for dataframe API.
mustafasrepo Oct 9, 2023
66704c0
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Oct 9, 2023
c7374d1
Simplifications
mustafasrepo Oct 9, 2023
f669ef0
Minor changes
mustafasrepo Oct 9, 2023
5d9dffb
Minor changes
mustafasrepo Oct 9, 2023
b353c7d
Add new test
mustafasrepo Oct 10, 2023
d2a902e
Add new tests
mustafasrepo Oct 10, 2023
2c4fcd9
Minor changes
mustafasrepo Oct 10, 2023
bfb8a78
Add rule, for aggregate simplification
mustafasrepo Oct 10, 2023
16b55b3
Simplifications
mustafasrepo Oct 10, 2023
209a262
Simplifications
mustafasrepo Oct 10, 2023
9f8e5ab
Simplifications
mustafasrepo Oct 10, 2023
aca4d9d
Minor changes
mustafasrepo Oct 11, 2023
1c3d8a0
Simplifications
mustafasrepo Oct 11, 2023
d57be29
Add new test condition
mustafasrepo Oct 11, 2023
3221c77
Tmp
mustafasrepo Oct 13, 2023
1e45b13
Push requirement below aggregate
mustafasrepo Oct 13, 2023
95476f2
Add join and subqeury alias
mustafasrepo Oct 13, 2023
baa07b2
Add cross join support
mustafasrepo Oct 16, 2023
9dc7cfc
Minor changes
mustafasrepo Oct 16, 2023
4be0b04
Add logical plan repartition support
mustafasrepo Oct 17, 2023
273890f
Add union support
mustafasrepo Oct 17, 2023
67338d2
Add table scan
mustafasrepo Oct 17, 2023
58aa3bc
Add limit
mustafasrepo Oct 17, 2023
d887f27
Minor changes, buggy
mustafasrepo Oct 17, 2023
4ec506b
Add new tests, fix existing bugs
mustafasrepo Oct 18, 2023
0270546
change concat type array_concat
mustafasrepo Oct 18, 2023
9e69390
Resolve some of the bugs
mustafasrepo Oct 18, 2023
c6f2fe4
Comment out a rule
mustafasrepo Oct 18, 2023
547d13f
All tests pass, when single distinct is closed
mustafasrepo Oct 18, 2023
3bc9080
Fix aggregate bug
mustafasrepo Oct 19, 2023
4faa67a
Change analyze and explain implementations
mustafasrepo Oct 19, 2023
3dd3c4b
All tests pass
mustafasrepo Oct 19, 2023
fc1d476
Resolve linter errors
mustafasrepo Oct 19, 2023
0d83e00
Simplifications, remove unnecessary codes
mustafasrepo Oct 19, 2023
ec7514c
Comment out tests
mustafasrepo Oct 20, 2023
ef0c792
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Oct 20, 2023
3a9070f
Remove pushdown projection
mustafasrepo Oct 23, 2023
12a049f
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Oct 26, 2023
7e85fbd
Pushdown empty projections
mustafasrepo Oct 26, 2023
0f65246
Fix failing tests
mustafasrepo Oct 26, 2023
8bbc8c0
Simplifications
mustafasrepo Oct 26, 2023
0accfb9
Update comments, simplifications
mustafasrepo Oct 27, 2023
408f7c3
Remove eliminate projection rule, Add method for group expr len aggre…
mustafasrepo Oct 30, 2023
f5eeb1d
Simplifications, subquery support
mustafasrepo Oct 30, 2023
f6dc90b
Update comments, add unnest support, simplifications
mustafasrepo Nov 1, 2023
2ec2bba
Remove eliminate projection pass
mustafasrepo Nov 1, 2023
cf4f73c
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Nov 9, 2023
5aeae14
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Nov 17, 2023
1f3ec6e
Change name
mustafasrepo Nov 17, 2023
5ffe1dd
Minor changes
mustafasrepo Nov 17, 2023
bb5a848
Minor changes
mustafasrepo Nov 17, 2023
9283bc5
Add comments
mustafasrepo Nov 17, 2023
2adb9cd
Fix failing test
mustafasrepo Nov 17, 2023
c397a55
Minor simplifications
berkaysynnada Nov 17, 2023
8d83371
update
berkaysynnada Nov 20, 2023
7399e07
Minor
berkaysynnada Nov 20, 2023
0073880
Remove ordering
mustafasrepo Nov 20, 2023
ae7dc0f
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Nov 20, 2023
1dd272c
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Nov 22, 2023
64007a9
Minor changes
mustafasrepo Nov 22, 2023
81b55af
add merge projections
mustafasrepo Nov 23, 2023
1653906
Add comments, resolve linter errors
mustafasrepo Nov 24, 2023
3bcadd1
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Nov 29, 2023
85b4b38
Minor changes
mustafasrepo Nov 29, 2023
361f774
Minor changes
mustafasrepo Nov 29, 2023
cc4be21
Minor changes
mustafasrepo Nov 29, 2023
146391c
Minor changes
mustafasrepo Nov 29, 2023
6ee763a
Minor changes
mustafasrepo Nov 29, 2023
bc1fbdf
Minor changes
mustafasrepo Nov 29, 2023
3878313
Minor changes
mustafasrepo Nov 29, 2023
2289ed0
Minor changes
mustafasrepo Nov 29, 2023
765e500
Merge branch 'enhance/aggregate_pk' of https://github.com/synnada-ai/…
mustafasrepo Nov 29, 2023
4346ac7
Review Part 1
ozankabak Dec 4, 2023
ff9447a
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Dec 4, 2023
7fc6626
Review Part 2
ozankabak Dec 4, 2023
05fe152
Fix quadratic search, Change trim_expr impl
mustafasrepo Dec 5, 2023
c2ef739
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Dec 5, 2023
77035b5
Review Part 3
ozankabak Dec 5, 2023
a8c314b
Address reviews
mustafasrepo Dec 5, 2023
af172f8
Minor changes
mustafasrepo Dec 6, 2023
d4bf02a
Review Part 4
ozankabak Dec 6, 2023
7502651
Add case expr support
mustafasrepo Dec 6, 2023
ec39bab
Review Part 5
ozankabak Dec 7, 2023
e6f9333
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Dec 7, 2023
0170942
Review Part 6
ozankabak Dec 8, 2023
38a3df6
Merge branch 'apache_main' into enhance/aggregate_pk
mustafasrepo Dec 8, 2023
2b9de85
Finishing touch: Improve comments
ozankabak Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,16 @@ impl DFSchema {
pub fn with_functional_dependencies(
mut self,
functional_dependencies: FunctionalDependencies,
) -> Self {
self.functional_dependencies = functional_dependencies;
self
) -> Result<Self> {
if functional_dependencies.is_valid(self.fields.len()) {
self.functional_dependencies = functional_dependencies;
Ok(self)
} else {
_plan_err!(
"Invalid functional dependency: {:?}",
functional_dependencies
)
}
}

/// Create a new schema that contains the fields from this schema followed by the fields
Expand Down
117 changes: 103 additions & 14 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
Expand Down Expand Up @@ -271,6 +272,29 @@ impl FunctionalDependencies {
self.deps.extend(other.deps);
}

/// Sanity checks if functional dependencies are valid. For example, if
/// there are 10 fields, we cannot receive any index further than 9.
pub fn is_valid(&self, n_field: usize) -> bool {
self.deps.iter().all(
|FunctionalDependence {
source_indices,
target_indices,
..
}| {
source_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
&& target_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
},
)
}

/// Adds the `offset` value to `source_indices` and `target_indices` for
/// each functional dependency.
pub fn add_offset(&mut self, offset: usize) {
Expand Down Expand Up @@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies(
} in &func_dependencies.deps
{
// Keep source indices in a `HashSet` to prevent duplicate entries:
let mut new_source_indices = HashSet::new();
let mut new_source_indices = vec![];
let mut new_source_field_names = vec![];
let source_field_names = source_indices
.iter()
.map(|&idx| aggr_input_fields[idx].qualified_name())
.collect::<Vec<_>>();

for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
// When one of the input determinant expressions matches with
// the GROUP BY expression, add the index of the GROUP BY
// expression as a new determinant key:
if source_field_names.contains(group_by_expr_name) {
new_source_indices.insert(idx);
new_source_indices.push(idx);
new_source_field_names.push(group_by_expr_name.clone());
}
}
let existing_target_indices =
get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
let new_target_indices = get_target_functional_dependencies(
aggr_input_schema,
&new_source_field_names,
);
let mode = if existing_target_indices == new_target_indices
&& new_target_indices.is_some()
{
// If dependency covers all GROUP BY expressions, mode will be `Single`:
Dependency::Single
} else {
// Otherwise, existing mode is preserved:
*mode
};
// All of the composite indices occur in the GROUP BY expression:
if new_source_indices.len() == source_indices.len() {
aggregate_func_dependencies.push(
FunctionalDependence::new(
new_source_indices.into_iter().collect(),
new_source_indices,
target_indices.clone(),
*nullable,
)
// input uniqueness stays the same when GROUP BY matches with input functional dependence determinants
.with_mode(*mode),
.with_mode(mode),
);
}
}

// If we have a single GROUP BY key, we can guarantee uniqueness after
// aggregation:
if group_by_expr_names.len() == 1 {
// If `source_indices` contain 0, delete this functional dependency
// as it will be added anyway with mode `Dependency::Single`:
if let Some(idx) = aggregate_func_dependencies
.iter()
.position(|item| item.source_indices.contains(&0))
{
// Delete the functional dependency that contains zeroth idx:
aggregate_func_dependencies.remove(idx);
}
aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0));
// Add a new functional dependency associated with the whole table:
aggregate_func_dependencies.push(
// Use nullable property of the group by expression
Expand Down Expand Up @@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies(
combined_target_indices.extend(target_indices.iter());
}
}
(!combined_target_indices.is_empty())
.then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
(!combined_target_indices.is_empty()).then_some({
let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
result.sort();
result
})
}

/// Computes which GROUP BY expressions are still needed once the functional
/// dependencies of `schema` are taken into account.
///
/// The returned values are positions inside `group_by_expr_names` (not
/// schema field indices). `None` is returned when a GROUP BY name does not
/// match the qualified name of any schema field, or when a retained field
/// index cannot be mapped back to a GROUP BY name.
pub fn get_required_group_by_exprs_indices(
    schema: &DFSchema,
    group_by_expr_names: &[String],
) -> Option<Vec<usize>> {
    let dependencies = schema.functional_dependencies();

    // Qualified field names, positionally aligned with the schema fields.
    let field_names: Vec<_> = schema
        .fields()
        .iter()
        .map(|field| field.qualified_name())
        .collect();

    // Resolve each GROUP BY name to its schema field index; give up as soon
    // as one name has no matching field.
    let mut required_field_indices = Vec::with_capacity(group_by_expr_names.len());
    for expr_name in group_by_expr_names {
        let field_idx = field_names.iter().position(|name| name == expr_name)?;
        required_field_indices.push(field_idx);
    }
    required_field_indices.sort_unstable();

    for dependency in &dependencies.deps {
        let sources_are_grouped = dependency
            .source_indices
            .iter()
            .all(|source_idx| required_field_indices.contains(source_idx));
        if sources_are_grouped {
            // Every determinant of this dependency is already a GROUP BY
            // key, so its targets are dropped from the required set and the
            // sources are (re-)added in their place.
            required_field_indices =
                set_difference(&required_field_indices, &dependency.target_indices);
            required_field_indices =
                merge_and_order_indices(required_field_indices, &dependency.source_indices);
        }
    }

    // Translate the surviving schema field indices back into positions
    // within the caller-supplied GROUP BY name list.
    let mut required_expr_positions = Vec::with_capacity(required_field_indices.len());
    for &field_idx in &required_field_indices {
        let position = group_by_expr_names
            .iter()
            .position(|name| name == &field_names[field_idx])?;
        required_expr_positions.push(position);
    }
    Some(required_expr_positions)
}

/// Updates entries inside the `entries` vector with their corresponding
Expand Down
5 changes: 3 additions & 2 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ pub use file_options::file_type::{
};
pub use file_options::FileTypeWriterOptions;
pub use functional_dependencies::{
aggregate_functional_dependencies, get_target_functional_dependencies, Constraint,
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
aggregate_functional_dependencies, get_required_group_by_exprs_indices,
get_target_functional_dependencies, Constraint, Constraints, Dependency,
FunctionalDependence, FunctionalDependencies,
};
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
Expand Down
Loading