-
Couldn't load subscription status.
- Fork 1.7k
Fix stack overflow calculating projected orderings #12759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1295,21 +1295,30 @@ fn construct_prefix_orderings( | |
| relevant_sort_expr: &PhysicalSortExpr, | ||
| dependency_map: &DependencyMap, | ||
| ) -> Vec<LexOrdering> { | ||
| let mut dep_enumerator = DependencyEnumerator::new(); | ||
| dependency_map[relevant_sort_expr] | ||
| .dependencies | ||
| .iter() | ||
| .flat_map(|dep| construct_orderings(dep, dependency_map)) | ||
| .flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map)) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies | ||
| /// (`dependency_map`), this function generates all possible prefix orderings | ||
| /// based on the given dependencies. | ||
| /// Generates all possible orderings where dependencies are satisfied for the | ||
| /// current projection expression. | ||
| /// | ||
| /// # Examaple | ||
| /// If `dependences` is `a + b ASC` and the dependency map holds dependencies | ||
| /// * `a ASC` --> `[c ASC]` | ||
| /// * `b ASC` --> `[d DESC]`, | ||
| /// | ||
| /// This function generates these two sort orders | ||
| /// * `[c ASC, d DESC, a + b ASC]` | ||
| /// * `[d DESC, c ASC, a + b ASC]` | ||
| /// | ||
| /// # Parameters | ||
| /// | ||
| /// * `dependencies` - A reference to the dependencies. | ||
| /// * `dependency_map` - A reference to the map of dependencies for expressions. | ||
| /// * `dependencies` - Set of relevant expressions. | ||
| /// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies` | ||
| /// | ||
| /// # Returns | ||
| /// | ||
|
|
@@ -1335,11 +1344,6 @@ fn generate_dependency_orderings( | |
| return vec![vec![]]; | ||
| } | ||
|
|
||
| // Generate all possible orderings where dependencies are satisfied for the | ||
| // current projection expression. For example, if expression is `a + b ASC`, | ||
| // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC` | ||
| // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and | ||
| // `[d DESC, c ASC, a + b ASC]`. | ||
| relevant_prefixes | ||
| .into_iter() | ||
| .multi_cartesian_product() | ||
|
|
@@ -1421,7 +1425,7 @@ struct DependencyNode { | |
| } | ||
|
|
||
| impl DependencyNode { | ||
| // Insert dependency to the state (if exists). | ||
| /// Insert dependency to the state (if exists). | ||
| fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) { | ||
| if let Some(dep) = dependency { | ||
| self.dependencies.insert(dep.clone()); | ||
|
|
@@ -1437,38 +1441,69 @@ impl DependencyNode { | |
| type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>; | ||
| type Dependencies = IndexSet<PhysicalSortExpr>; | ||
|
|
||
| /// This function recursively analyzes the dependencies of the given sort | ||
| /// expression within the given dependency map to construct lexicographical | ||
| /// orderings that include the sort expression and its dependencies. | ||
| /// | ||
| /// # Parameters | ||
| /// | ||
| /// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`) | ||
| /// for which lexicographical orderings satisfying its dependencies are to be | ||
| /// constructed. | ||
| /// - `dependency_map`: A reference to the `DependencyMap` that contains | ||
| /// dependencies for different `PhysicalSortExpr`s. | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given | ||
| /// sort expression and its dependencies. | ||
| fn construct_orderings( | ||
| referred_sort_expr: &PhysicalSortExpr, | ||
| dependency_map: &DependencyMap, | ||
| ) -> Vec<LexOrdering> { | ||
| // We are sure that `referred_sort_expr` is inside `dependency_map`. | ||
| let node = &dependency_map[referred_sort_expr]; | ||
| // Since we work on intermediate nodes, we are sure `val.target_sort_expr` | ||
| // exists. | ||
| let target_sort_expr = node.target_sort_expr.clone().unwrap(); | ||
| if node.dependencies.is_empty() { | ||
| vec![vec![target_sort_expr]] | ||
| } else { | ||
| /// Contains a mapping of all dependencies we have processed for each sort expr | ||
| struct DependencyEnumerator<'a> { | ||
| /// Maps `expr` --> `[exprs]` that have previously been processed | ||
| seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>, | ||
| } | ||
|
|
||
| impl<'a> DependencyEnumerator<'a> { | ||
| fn new() -> Self { | ||
| Self { | ||
| seen: IndexMap::new(), | ||
| } | ||
| } | ||
|
|
||
| /// Insert a new dependency, | ||
| /// | ||
| /// returns false if the dependency was already in the map | ||
| /// returns true if the dependency was newly inserted | ||
| fn insert( | ||
| &mut self, | ||
| target: &'a PhysicalSortExpr, | ||
| dep: &'a PhysicalSortExpr, | ||
| ) -> bool { | ||
| self.seen.entry(target).or_default().insert(dep) | ||
| } | ||
|
|
||
| /// This function recursively analyzes the dependencies of the given sort | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The PR is substantially smaller if you look at it with whitespace blind diff: https://github.com/apache/datafusion/pull/12759/files?w=1 |
||
| /// expression within the given dependency map to construct lexicographical | ||
| /// orderings that include the sort expression and its dependencies. | ||
| /// | ||
| /// # Parameters | ||
| /// | ||
| /// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`) | ||
| /// for which lexicographical orderings satisfying its dependencies are to be | ||
| /// constructed. | ||
| /// - `dependency_map`: A reference to the `DependencyMap` that contains | ||
| /// dependencies for different `PhysicalSortExpr`s. | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given | ||
| /// sort expression and its dependencies. | ||
| fn construct_orderings( | ||
| &mut self, | ||
| referred_sort_expr: &'a PhysicalSortExpr, | ||
| dependency_map: &'a DependencyMap, | ||
| ) -> Vec<LexOrdering> { | ||
| // We are sure that `referred_sort_expr` is inside `dependency_map`. | ||
| let node = &dependency_map[referred_sort_expr]; | ||
| // Since we work on intermediate nodes, we are sure `val.target_sort_expr` | ||
| // exists. | ||
| let target_sort_expr = node.target_sort_expr.as_ref().unwrap(); | ||
| if node.dependencies.is_empty() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have some comments here? From code I can read if there is no dependencies on the plan node we return some default value which is target_sort_expr but in vector in vector format, but why it is the target_expr_clone as a default? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know for sure (this is the same logic as in the current codebase). Perhaps @akurmustafa or @berkaysynnada know? (I am happy to add/update comments, but I don't know what to put there) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After a quick review, I understand that an empty dependency means the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, I have already merged it, can we add the comments in follow up PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in #12786 |
||
| return vec![vec![target_sort_expr.clone()]]; | ||
| }; | ||
|
|
||
| node.dependencies | ||
| .iter() | ||
| .flat_map(|dep| { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering should it be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure what you mean by "the result is not used" I double checked and it seems to me that since |
||
| let mut orderings = construct_orderings(dep, dependency_map); | ||
| let mut orderings = if self.insert(target_sort_expr, dep) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the key change -- it won't checks and won't process the same expr/dependency pair again |
||
| self.construct_orderings(dep, dependency_map) | ||
| } else { | ||
| vec![] | ||
| }; | ||
| for ordering in orderings.iter_mut() { | ||
| ordering.push(target_sort_expr.clone()) | ||
| } | ||
|
|
@@ -1763,6 +1798,51 @@ mod tests { | |
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn project_equivalence_properties_test_multi() -> Result<()> { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the same test that causes a stack overflow on main |
||
| // test multiple input orderings with equivalence properties | ||
| let input_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("a", DataType::Int64, true), | ||
| Field::new("b", DataType::Int64, true), | ||
| Field::new("c", DataType::Int64, true), | ||
| Field::new("d", DataType::Int64, true), | ||
| ])); | ||
|
|
||
| let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema)); | ||
| // add equivalent ordering [a, b, c, d] | ||
| input_properties.add_new_ordering(vec![ | ||
| parse_sort_expr("a", &input_schema), | ||
| parse_sort_expr("b", &input_schema), | ||
| parse_sort_expr("c", &input_schema), | ||
| parse_sort_expr("d", &input_schema), | ||
| ]); | ||
|
|
||
| // add equivalent ordering [a, c, b, d] | ||
| input_properties.add_new_ordering(vec![ | ||
| parse_sort_expr("a", &input_schema), | ||
| parse_sort_expr("c", &input_schema), | ||
| parse_sort_expr("b", &input_schema), // NB b and c are swapped | ||
| parse_sort_expr("d", &input_schema), | ||
| ]); | ||
|
|
||
| // simply project all the columns in order | ||
| let proj_exprs = vec![ | ||
| (col("a", &input_schema)?, "a".to_string()), | ||
| (col("b", &input_schema)?, "b".to_string()), | ||
| (col("c", &input_schema)?, "c".to_string()), | ||
| (col("d", &input_schema)?, "d".to_string()), | ||
| ]; | ||
| let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; | ||
| let out_properties = input_properties.project(&projection_mapping, input_schema); | ||
|
|
||
| assert_eq!( | ||
| out_properties.to_string(), | ||
| "order: [[a@0 ASC,c@2 ASC,b@1 ASC,d@3 ASC], [a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]" | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_join_equivalence_properties() -> Result<()> { | ||
| let schema = create_test_schema()?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pull this documentation up into the main comments rather than buried in the implementation