Skip to content

Commit 8f9d6e3

Browse files
mustafasrepo, berkaysynnada, ozankabak
authored
Add PRIMARY KEY Aggregate support to dataframe API (#8356)
* Aggregate rewrite for dataframe API. * Simplifications * Minor changes * Minor changes * Add new test * Add new tests * Minor changes * Add rule, for aggregate simplification * Simplifications * Simplifications * Simplifications * Minor changes * Simplifications * Add new test condition * Tmp * Push requirement below aggregate * Add join and subquery alias * Add cross join support * Minor changes * Add logical plan repartition support * Add union support * Add table scan * Add limit * Minor changes, buggy * Add new tests, fix existing bugs * change concat type array_concat * Resolve some of the bugs * Comment out a rule * All tests pass, when single distinct is closed * Fix aggregate bug * Change analyze and explain implementations * All tests pass * Resolve linter errors * Simplifications, remove unnecessary codes * Comment out tests * Remove pushdown projection * Pushdown empty projections * Fix failing tests * Simplifications * Update comments, simplifications * Remove eliminate projection rule, Add method for group expr len aggregate * Simplifications, subquery support * Update comments, add unnest support, simplifications * Remove eliminate projection pass * Change name * Minor changes * Minor changes * Add comments * Fix failing test * Minor simplifications * update * Minor * Remove ordering * Minor changes * add merge projections * Add comments, resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Minor changes * Minor changes * Minor changes * Minor changes * Review Part 1 * Review Part 2 * Fix quadratic search, Change trim_expr impl * Review Part 3 * Address reviews * Minor changes * Review Part 4 * Add case expr support * Review Part 5 * Review Part 6 * Finishing touch: Improve comments --------- Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent d43a70d commit 8f9d6e3

File tree

14 files changed

+1349
-611
lines changed

14 files changed

+1349
-611
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -199,9 +199,16 @@ impl DFSchema {
199199
pub fn with_functional_dependencies(
200200
mut self,
201201
functional_dependencies: FunctionalDependencies,
202-
) -> Self {
203-
self.functional_dependencies = functional_dependencies;
204-
self
202+
) -> Result<Self> {
203+
if functional_dependencies.is_valid(self.fields.len()) {
204+
self.functional_dependencies = functional_dependencies;
205+
Ok(self)
206+
} else {
207+
_plan_err!(
208+
"Invalid functional dependency: {:?}",
209+
functional_dependencies
210+
)
211+
}
205212
}
206213

207214
/// Create a new schema that contains the fields from this schema followed by the fields

datafusion/common/src/functional_dependencies.rs

Lines changed: 103 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ use std::ops::Deref;
2424
use std::vec::IntoIter;
2525

2626
use crate::error::_plan_err;
27+
use crate::utils::{merge_and_order_indices, set_difference};
2728
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
2829

2930
use sqlparser::ast::TableConstraint;
@@ -271,6 +272,29 @@ impl FunctionalDependencies {
271272
self.deps.extend(other.deps);
272273
}
273274

275+
/// Checks that the functional dependencies are valid, i.e. that every source
276+
/// and target index refers to an existing field. For example, if there are 10 fields, no index may exceed 9.
277+
pub fn is_valid(&self, n_field: usize) -> bool {
278+
self.deps.iter().all(
279+
|FunctionalDependence {
280+
source_indices,
281+
target_indices,
282+
..
283+
}| {
284+
source_indices
285+
.iter()
286+
.max()
287+
.map(|&max_index| max_index < n_field)
288+
.unwrap_or(true)
289+
&& target_indices
290+
.iter()
291+
.max()
292+
.map(|&max_index| max_index < n_field)
293+
.unwrap_or(true)
294+
},
295+
)
296+
}
297+
274298
/// Adds the `offset` value to `source_indices` and `target_indices` for
275299
/// each functional dependency.
276300
pub fn add_offset(&mut self, offset: usize) {
@@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies(
442466
} in &func_dependencies.deps
443467
{
444468
// Collect the indices and names of the GROUP BY expressions that match the source fields of this dependency:
445-
let mut new_source_indices = HashSet::new();
469+
let mut new_source_indices = vec![];
470+
let mut new_source_field_names = vec![];
446471
let source_field_names = source_indices
447472
.iter()
448473
.map(|&idx| aggr_input_fields[idx].qualified_name())
449474
.collect::<Vec<_>>();
475+
450476
for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
451477
// When one of the input determinant expressions matches with
452478
// the GROUP BY expression, add the index of the GROUP BY
453479
// expression as a new determinant key:
454480
if source_field_names.contains(group_by_expr_name) {
455-
new_source_indices.insert(idx);
481+
new_source_indices.push(idx);
482+
new_source_field_names.push(group_by_expr_name.clone());
456483
}
457484
}
485+
let existing_target_indices =
486+
get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
487+
let new_target_indices = get_target_functional_dependencies(
488+
aggr_input_schema,
489+
&new_source_field_names,
490+
);
491+
let mode = if existing_target_indices == new_target_indices
492+
&& new_target_indices.is_some()
493+
{
494+
// If dependency covers all GROUP BY expressions, mode will be `Single`:
495+
Dependency::Single
496+
} else {
497+
// Otherwise, existing mode is preserved:
498+
*mode
499+
};
458500
// All of the composite indices occur in the GROUP BY expression:
459501
if new_source_indices.len() == source_indices.len() {
460502
aggregate_func_dependencies.push(
461503
FunctionalDependence::new(
462-
new_source_indices.into_iter().collect(),
504+
new_source_indices,
463505
target_indices.clone(),
464506
*nullable,
465507
)
466-
// input uniqueness stays the same when GROUP BY matches with input functional dependence determinants
467-
.with_mode(*mode),
508+
.with_mode(mode),
468509
);
469510
}
470511
}
512+
471513
// If we have a single GROUP BY key, we can guarantee uniqueness after
472514
// aggregation:
473515
if group_by_expr_names.len() == 1 {
474516
// Remove every functional dependency whose `source_indices` contains 0,
475517
// as it will be added anyway with mode `Dependency::Single`:
476-
if let Some(idx) = aggregate_func_dependencies
477-
.iter()
478-
.position(|item| item.source_indices.contains(&0))
479-
{
480-
// Delete the functional dependency that contains zeroth idx:
481-
aggregate_func_dependencies.remove(idx);
482-
}
518+
aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0));
483519
// Add a new functional dependency associated with the whole table:
484520
aggregate_func_dependencies.push(
485521
// Use nullable property of the group by expression
@@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies(
527563
combined_target_indices.extend(target_indices.iter());
528564
}
529565
}
530-
(!combined_target_indices.is_empty())
531-
.then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
566+
(!combined_target_indices.is_empty()).then_some({
567+
let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
568+
result.sort();
569+
result
570+
})
571+
}
572+
573+
/// Returns indices for the minimal subset of GROUP BY expressions that are
574+
/// functionally equivalent to the original set of GROUP BY expressions.
575+
pub fn get_required_group_by_exprs_indices(
576+
schema: &DFSchema,
577+
group_by_expr_names: &[String],
578+
) -> Option<Vec<usize>> {
579+
let dependencies = schema.functional_dependencies();
580+
let field_names = schema
581+
.fields()
582+
.iter()
583+
.map(|item| item.qualified_name())
584+
.collect::<Vec<_>>();
585+
let mut groupby_expr_indices = group_by_expr_names
586+
.iter()
587+
.map(|group_by_expr_name| {
588+
field_names
589+
.iter()
590+
.position(|field_name| field_name == group_by_expr_name)
591+
})
592+
.collect::<Option<Vec<_>>>()?;
593+
594+
groupby_expr_indices.sort();
595+
for FunctionalDependence {
596+
source_indices,
597+
target_indices,
598+
..
599+
} in &dependencies.deps
600+
{
601+
if source_indices
602+
.iter()
603+
.all(|source_idx| groupby_expr_indices.contains(source_idx))
604+
{
605+
// If all source indices are among GROUP BY expression indices, we
606+
// can remove target indices from GROUP BY expression indices and
607+
// use source indices instead.
608+
groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
609+
groupby_expr_indices =
610+
merge_and_order_indices(groupby_expr_indices, source_indices);
611+
}
612+
}
613+
groupby_expr_indices
614+
.iter()
615+
.map(|idx| {
616+
group_by_expr_names
617+
.iter()
618+
.position(|name| &field_names[*idx] == name)
619+
})
620+
.collect()
532621
}
533622

534623
/// Updates entries inside the `entries` vector with their corresponding

datafusion/common/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -56,8 +56,9 @@ pub use file_options::file_type::{
5656
};
5757
pub use file_options::FileTypeWriterOptions;
5858
pub use functional_dependencies::{
59-
aggregate_functional_dependencies, get_target_functional_dependencies, Constraint,
60-
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
59+
aggregate_functional_dependencies, get_required_group_by_exprs_indices,
60+
get_target_functional_dependencies, Constraint, Constraints, Dependency,
61+
FunctionalDependence, FunctionalDependencies,
6162
};
6263
pub use join_type::{JoinConstraint, JoinSide, JoinType};
6364
pub use param_value::ParamValues;

0 commit comments

Comments
 (0)