Encode all join conditions in a single expression field #7612
| let join_exprs: Vec<(Column, Column, bool)> = predicates | ||
| .iter() | ||
| .map(|p| match p { | ||
| // The predicates can contain both equal and non-equal ops. | 
I'm not sure if this is a good idea, it seems to make things more complex.
@Dandandan Thank you for pointing this out. I agree that this makes things more complicated, and my apologies for the inaccurate explanation of the issue. I added a comment to correct my description of the issue.
The high-level idea is that the join filter and post_join_filter do not mean the same thing semantically. The former filters input during (or before) the join; the latter filters the output of the join (post-join). Currently in DataFusion, the join relation has no field that represents a post-join predicate (the parser/logical optimizer takes care of creating an appropriate filter relation if necessary). So the producer should only generate plans with None as post_join_filter.
I'll modify the consumer to throw an error for now if there's a post_join_filter. Later, we can wrap the join relation with a filter relation if we want to support post_join_filter, or if the later version of datafusion supports post_join_filter directly in the join relation, then we can also add the support in both producer and consumer as well.
I looked into the Substrait spec, and it doesn't really say what the semantics of post_join_filter are: https://substrait.io/relations/logical_relations/#join-operation
https://github.com/search?q=repo%3Asubstrait-io%2Fsubstrait%20post_join_filter&type=code
There is a subtle distinction between non equality filters applied during the join (in the ON clause) and applied post join for non-INNER joins: for non inner joins the filters during the join don't filter out input rows (they still come out, just with NULL matches)
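To make the distinction concrete, here is a minimal sketch using a toy nested-loop LEFT JOIN over integer slices. It is not DataFusion code; `left_join`, `filter_during_join`, and `filter_after_join` are hypothetical names chosen for illustration, and `None` stands in for a SQL NULL match.

```rust
/// One output row of the toy left join: the left value and an optional right match.
type JoinedRow = (i32, Option<i32>);

/// Toy nested-loop LEFT JOIN (illustrative only, not DataFusion's implementation).
/// `on` is evaluated during the join: a left row with no matching right row
/// is still emitted, padded with `None`.
fn left_join(left: &[i32], right: &[i32], on: impl Fn(i32, i32) -> bool) -> Vec<JoinedRow> {
    let mut out = Vec::new();
    for &l in left {
        let mut matched = false;
        for &r in right {
            if on(l, r) {
                out.push((l, Some(r)));
                matched = true;
            }
        }
        if !matched {
            out.push((l, None));
        }
    }
    out
}

/// Non-equality filter inside the join condition: `ON l = r AND r > 1`.
fn filter_during_join(left: &[i32], right: &[i32]) -> Vec<JoinedRow> {
    left_join(left, right, |l, r| l == r && r > 1)
}

/// The same predicate applied to the join output instead.
/// `NULL > 1` is not true, so NULL-padded rows are dropped as well.
fn filter_after_join(left: &[i32], right: &[i32]) -> Vec<JoinedRow> {
    left_join(left, right, |l, r| l == r)
        .into_iter()
        .filter(|(_, r)| matches!(r, Some(v) if *v > 1))
        .collect()
}
```

With `left = [1, 2, 3]` and `right = [1, 2]`, the filter applied during the join keeps all three left rows (two of them with `None` matches), while the filter applied after the join leaves only the row for `2`.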
So the producer should only generate plans with None as post_join_filter.
This makes sense to me
For the consumer, there is already code in DataFusion that breaks up an arbitrary Expr into equality predicates and others. This is how the SQL frontend creates a Join (a single expr):
I think we could do the same here in the Substrait consumer, which would be much simpler and would let the normal DataFusion optimization machinery work.
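As a rough illustration of breaking a single join expression into equality predicates and "others", here is a sketch over a toy `Expr` type. The enum and the helpers `col`, `split_conjunction`, and `split_join_expr` are assumptions made for this example, not DataFusion's actual types or functions. A real implementation would also have to check that the two columns come from different join inputs, which this sketch skips.

```rust
/// Toy expression type (a stand-in, not `datafusion::logical_expr::Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Hypothetical convenience constructor for a column reference.
fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

/// Flatten nested ANDs into a flat list of conjuncts.
fn split_conjunction(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other.clone()),
    }
}

/// Separate `Column = Column` conjuncts (equijoin keys) from everything else.
/// Simplification: does not verify which join input each column belongs to.
fn split_join_expr(expr: &Expr) -> (Vec<(String, String)>, Vec<Expr>) {
    let mut conjuncts = Vec::new();
    split_conjunction(expr, &mut conjuncts);

    let mut keys = Vec::new();
    let mut others = Vec::new();
    for conjunct in conjuncts {
        match conjunct {
            Expr::Eq(l, r) => match (*l, *r) {
                (Expr::Column(a), Expr::Column(b)) => keys.push((a, b)),
                // Equality that is not column-to-column stays a plain filter.
                (l, r) => others.push(Expr::Eq(Box::new(l), Box::new(r))),
            },
            other => others.push(other),
        }
    }
    (keys, others)
}
```

For `a.id = b.id AND a.x > 5`, this yields one key pair `("a.id", "b.id")` and one residual filter `a.x > 5`.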
Thanks @alamb for the pointer! I'll look into this and update the consumer.
Thank you @nseekhao and @Dandandan
I think this code could be made simpler and left some suggestions about how to do so.
That being said the only thing I think is needed prior to merge is some tests (so we don't break this code in some future refactoring). I don't think the simplifications are necessary
| } | ||
| } | ||
| Some(RelType::Join(join)) => { | ||
| if join.post_join_filter.is_some() { | 
👍
|  | ||
| // create conjunction between `join_on` and `join_filter` to embed all join conditions, | ||
| // whether equal or non-equal in a single expression | ||
| let join_expr = match &join_on { | 
Perhaps you could use conjunction here and simplify the code https://docs.rs/datafusion/latest/datafusion/optimizer/utils/fn.conjunction.html
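For illustration, the idea of folding `join_on` and `join_filter` into one expression with a conjunction helper might look like the sketch below. It uses a toy `Expr` enum and a local `conjunction` function that only mimics the shape of the linked DataFusion utility; `build_join_expr` is a hypothetical name, and none of this is the PR's actual code.

```rust
/// Toy expression type (a stand-in, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Eq(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// AND together all expressions; `None` when the input is empty.
/// Local re-implementation for the sketch, modeled on the helper linked above.
fn conjunction(exprs: impl IntoIterator<Item = Expr>) -> Option<Expr> {
    exprs
        .into_iter()
        .reduce(|acc, e| Expr::And(Box::new(acc), Box::new(e)))
}

/// Combine equijoin key pairs and an optional non-equi filter into one
/// join expression, so that no separate match on `join_on` is needed.
fn build_join_expr(on: Vec<(String, String)>, filter: Option<Expr>) -> Option<Expr> {
    let equalities = on.into_iter().map(|(l, r)| {
        Expr::Eq(Box::new(Expr::Column(l)), Box::new(Expr::Column(r)))
    });
    // `Option<Expr>` is itself an iterator of zero or one items.
    conjunction(equalities.chain(filter))
}
```

The empty case falls out naturally: with no keys and no filter the result is `None`, which avoids special-casing each combination by hand.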
| 
 I don't mind simplifying the code, so I'll work on it following your earlier suggestions. As for the tests, I agree. Since existing cases already test for a mixed join condition (roundtrip_non_equi_join()) and a non-equi-only join (roundtrip_non_equi_inner_join()), are you thinking of a new set of tests that specifically test for the absence of  | 
| 
 Thank you! BTW I found the relevant code to extract equijoins predicates here: https://docs.rs/datafusion/latest/datafusion/optimizer/extract_equijoin_predicate/struct.ExtractEquijoinPredicate.html 
 Yes I guess I was thinking about that. Mostly I was thinking "if we broke / reverted the code in this PR accidentally, how would we know something wasn't correct"? Without some test coverage the behavior could change and we don't know. | 
| Marking as draft as I believe this PR is not waiting for any feedback (there are some comments to address). Thanks again for pushing this forward @nseekhao | 
| @alamb I extracted the predicate splits from the  As for the test, I added a test function to be used when join has a non-equi condition, to ensure that the join filter does not mistakenly get embedded into a Substrait plan as a  | 
| .collect::<Result<Vec<_>>>()?; | ||
| let (left_cols, right_cols, null_eq_nulls): (Vec<_>, Vec<_>, Vec<_>) = | ||
| itertools::multiunzip(join_exprs); | ||
| // The join expression can contain both equal and non-equal ops. | 
Given that the ExtractEquijoinPredicate optimizer pass already splits up join predicates into equijoin predicates and "other" predicates, I wonder if we could simply create the LogicalPlan::Join using join.expression (and let the subsequent optimizer pass sort it out)?
Something like
left.join(
  right.build()?,
  join_type,
  (vec![], vec![]),
  on, // <-- use the filter directly here, let optimizer pass extract the equijoin columns
  nulls_equal_nulls,
)?
It makes me realize, when looking at the API for LogicalPlanBuilder::join, that the API is super confusing. It would be nice to improve that API to make it clear that a join can just take a single Expr and DataFusion will sort out figuring out the join columns, etc.
It turns out this is exactly what DataFrame::join_on does -- I have filed a ticket with a way to make this clearer: #7766 (comment)
Thank you @nseekhao -- I think we could merge this PR as is. I still think it could perhaps be made even simpler (by avoiding the join predicate analysis). Let me know what you think.
|  | ||
| fn check_post_join_filters(rel: &Rel) -> Result<()> { | ||
| // search for target_rel and field value in proto | ||
| match &rel.rel_type { | 
Looks like it might be helpful (eventually) to define TreeNode for Rel to implement walking efficiently.
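A generic walker along those lines could be sketched as follows. The `Rel` enum here is a toy stand-in with only three variants, not the real Substrait proto type, and `children` and `exists` are hypothetical helpers rather than DataFusion's TreeNode API.

```rust
/// Toy relation tree (a stand-in, not `substrait::proto::Rel`).
#[derive(Debug)]
enum Rel {
    Read { table: String },
    Filter { input: Box<Rel> },
    Join {
        left: Box<Rel>,
        right: Box<Rel>,
        post_join_filter: Option<String>,
    },
}

impl Rel {
    /// Direct children of this node, in plan order.
    fn children(&self) -> Vec<&Rel> {
        match self {
            Rel::Read { .. } => vec![],
            Rel::Filter { input } => vec![input.as_ref()],
            Rel::Join { left, right, .. } => vec![left.as_ref(), right.as_ref()],
        }
    }

    /// Pre-order walk that stops at the first node satisfying `pred`.
    fn exists(&self, pred: &dyn Fn(&Rel) -> bool) -> bool {
        pred(self) || self.children().into_iter().any(|c| c.exists(pred))
    }
}

/// Error out if any join in the tree carries a post-join filter.
fn check_post_join_filters(rel: &Rel) -> Result<(), String> {
    let found = rel.exists(&|r| {
        matches!(r, Rel::Join { post_join_filter: Some(_), .. })
    });
    if found {
        Err("plan contains a join with post_join_filter set".to_string())
    } else {
        Ok(())
    }
}
```

Once `children` exists, a check like this no longer needs its own hand-written recursion per relation type; only `children` has to know the shape of each variant.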
I think this PR is better than what is on master, so I plan to merge it in once CI passes. I will propose a follow-on PR that uses the newly added LogicalPlanBuilder::join_on to simplify it subsequently
| I have merged up from main and plan to merge this PR once CI passes | 
| 
 I tried this and it turns out there is some subtlety involved with  | 
| Thanks again @nseekhao | 
Which issue does this PR close?
Closes #7611 .
Rationale for this change
For fixing the incorrect representation of the join condition. Please refer to the related issue for more details.
What changes are included in this PR?
[...] on and filter separately in expression and post_join_filter, this PR encodes both in the expression field of the JoinRel.
[...] Column Eq Column expression, then it gets added to the on field, otherwise, it gets AND-ed with the filter field.
Are these changes tested?
Yes.
No new tests were added. Since the producer now hardcodes post_join_filter as None, if the changes are not working correctly, the existing tests would fail.
Are there any user-facing changes?
No