Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 121 additions & 41 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,21 +1295,30 @@ fn construct_prefix_orderings(
relevant_sort_expr: &PhysicalSortExpr,
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
let mut dep_enumerator = DependencyEnumerator::new();
dependency_map[relevant_sort_expr]
.dependencies
.iter()
.flat_map(|dep| construct_orderings(dep, dependency_map))
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
.collect()
}

/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies
/// (`dependency_map`), this function generates all possible prefix orderings
/// based on the given dependencies.
/// Generates all possible orderings where dependencies are satisfied for the
/// current projection expression.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pull this documentation up into the main comments rather than buried in the implementation

///
/// # Examaple
/// If `dependences` is `a + b ASC` and the dependency map holds dependencies
/// * `a ASC` --> `[c ASC]`
/// * `b ASC` --> `[d DESC]`,
///
/// This function generates these two sort orders
/// * `[c ASC, d DESC, a + b ASC]`
/// * `[d DESC, c ASC, a + b ASC]`
///
/// # Parameters
///
/// * `dependencies` - A reference to the dependencies.
/// * `dependency_map` - A reference to the map of dependencies for expressions.
/// * `dependencies` - Set of relevant expressions.
/// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies`
///
/// # Returns
///
Expand All @@ -1335,11 +1344,6 @@ fn generate_dependency_orderings(
return vec![vec![]];
}

// Generate all possible orderings where dependencies are satisfied for the
// current projection expression. For example, if expression is `a + b ASC`,
// and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC`
// is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and
// `[d DESC, c ASC, a + b ASC]`.
relevant_prefixes
.into_iter()
.multi_cartesian_product()
Expand Down Expand Up @@ -1421,7 +1425,7 @@ struct DependencyNode {
}

impl DependencyNode {
// Insert dependency to the state (if exists).
/// Insert dependency to the state (if exists).
fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
if let Some(dep) = dependency {
self.dependencies.insert(dep.clone());
Expand All @@ -1437,38 +1441,69 @@ impl DependencyNode {
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = IndexSet<PhysicalSortExpr>;

/// This function recursively analyzes the dependencies of the given sort
/// expression within the given dependency map to construct lexicographical
/// orderings that include the sort expression and its dependencies.
///
/// # Parameters
///
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
/// for which lexicographical orderings satisfying its dependencies are to be
/// constructed.
/// - `dependency_map`: A reference to the `DependencyMap` that contains
/// dependencies for different `PhysicalSortExpr`s.
///
/// # Returns
///
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
/// sort expression and its dependencies.
fn construct_orderings(
referred_sort_expr: &PhysicalSortExpr,
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.clone().unwrap();
if node.dependencies.is_empty() {
vec![vec![target_sort_expr]]
} else {
/// Contains a mapping of all dependencies we have processed for each sort expr
struct DependencyEnumerator<'a> {
/// Maps `expr` --> `[exprs]` that have previously been processed
seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
}

impl<'a> DependencyEnumerator<'a> {
fn new() -> Self {
Self {
seen: IndexMap::new(),
}
}

/// Insert a new dependency,
///
/// returns false if the dependency was already in the map
/// returns true if the dependency was newly inserted
fn insert(
&mut self,
target: &'a PhysicalSortExpr,
dep: &'a PhysicalSortExpr,
) -> bool {
self.seen.entry(target).or_default().insert(dep)
}

/// This function recursively analyzes the dependencies of the given sort
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is substantially smaller if you look at it with whitespace blind diff: https://github.com/apache/datafusion/pull/12759/files?w=1

/// expression within the given dependency map to construct lexicographical
/// orderings that include the sort expression and its dependencies.
///
/// # Parameters
///
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
/// for which lexicographical orderings satisfying its dependencies are to be
/// constructed.
/// - `dependency_map`: A reference to the `DependencyMap` that contains
/// dependencies for different `PhysicalSortExpr`s.
///
/// # Returns
///
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
/// sort expression and its dependencies.
fn construct_orderings(
&mut self,
referred_sort_expr: &'a PhysicalSortExpr,
dependency_map: &'a DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
if node.dependencies.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have some comments here? From code I can read if there is no dependencies on the plan node we return some default value which is target_sort_expr but in vector in vector format, but why it is the target_expr_clone as a default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know for sure (this is the same logic as in the current codebase). Perhaps @akurmustafa or @berkaysynnada know? (I am happy to add/update comments, but I don't know what to put there)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a quick review, I understand that an empty dependency means the referred_sort_expr represents a global ordering. In that case, we can simply return its projected version, which is the target_expression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I have already merged it, can we add the comments in follow up PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #12786

return vec![vec![target_sort_expr.clone()]];
};

node.dependencies
.iter()
.flat_map(|dep| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering should it be a flat_map as the result is not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean by "the result is not used"

I double checked and it seems to me that since self.construct_orderings returns a Vec<LexOrdering>, the result of this recursive call is combined with other recursive calls.

let mut orderings = construct_orderings(dep, dependency_map);
let mut orderings = if self.insert(target_sort_expr, dep) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the key change -- it won't checks and won't process the same expr/dependency pair again

self.construct_orderings(dep, dependency_map)
} else {
vec![]
};
for ordering in orderings.iter_mut() {
ordering.push(target_sort_expr.clone())
}
Expand Down Expand Up @@ -1763,6 +1798,51 @@ mod tests {
Ok(())
}

#[test]
fn project_equivalence_properties_test_multi() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same test that causes a stack overflow on main

// test multiple input orderings with equivalence properties
let input_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
Field::new("d", DataType::Int64, true),
]));

let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
// add equivalent ordering [a, b, c, d]
input_properties.add_new_ordering(vec![
parse_sort_expr("a", &input_schema),
parse_sort_expr("b", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("d", &input_schema),
]);

// add equivalent ordering [a, c, b, d]
input_properties.add_new_ordering(vec![
parse_sort_expr("a", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("b", &input_schema), // NB b and c are swapped
parse_sort_expr("d", &input_schema),
]);

// simply project all the columns in order
let proj_exprs = vec![
(col("a", &input_schema)?, "a".to_string()),
(col("b", &input_schema)?, "b".to_string()),
(col("c", &input_schema)?, "c".to_string()),
(col("d", &input_schema)?, "d".to_string()),
];
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
let out_properties = input_properties.project(&projection_mapping, input_schema);

assert_eq!(
out_properties.to_string(),
"order: [[a@0 ASC,c@2 ASC,b@1 ASC,d@3 ASC], [a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]"
);

Ok(())
}

#[test]
fn test_join_equivalence_properties() -> Result<()> {
let schema = create_test_schema()?;
Expand Down