-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-47217][SQL] Fix ambiguity check in self joins #45343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ing joins once the plan is de-duplicated. The fix involves using Dataset ID associated with the plans & attributes to attempt correct resolution
…elf join conditions
…rect resolution of attributes
|
While fixing the issue, found that tests in DataFrameSelfJoinSuite, where the two legs have distinct datasetId , the tests are expecting AnalysisException due to ambiguity in the join condition, which IMO is not correct as it is possible to dis-ambiguate using resolution via Dataset ID. |
| Row(1, 1) :: Nil) | ||
| } | ||
|
|
||
| test("SPARK-47217. deduplication in nested joins with join attribute aliased") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please use : instead of . in test names?
| Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg) | ||
|
|
||
| val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"), | ||
| df1("a")).queryExecution.analyzed.asInstanceOf[Project] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't selecting df1("a") be ambiguous here? It is not ambiguous in the join condition because df1Joindf2("a") can come from only one side so the df1("a") must come from the other side. But after the join I'm not sure we can follow the same logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth @cloud-fan I have opened a new PR SPARK-47320-PR.
It contains more information and also an existing master's test which fails when join order reversed.
Currently this PR is identical to SPARK-47320-PR ..I will try to see if this PR can be splitted , else will close it and keep the SPARK-47320-PR as the only PR for review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Shouldn't selecting df1("a") be ambiguous here? It is not ambiguous in the join condition because df1Joindf2("a") can come from only one side so the df1("a") must come from the other side. But after the join I'm not sure we can follow the same logic."
As I envisage , mentally, for the Join with select as
Project
|
Filter
|
Join
So what is true for filter in terms of attribute refs should also be true for Project.
But I am not basing my view on above ( i.e in terms of ExprIDs).
I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth Also another source of confusion is that join condition is attempted being resolved on un-deduplicated plan. Once the join plan is de-duplicated, and then join condition attempted being resolved, the problem changes from being ambiguous resolution to I suppose both LHS & RHS resolving to same leg, which is then handled the way of resolution by tag Id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select
I agree that there is no ambiguity in Join becuase there we can suppose that the user would like to build a relation between the left and right sides (so one side of the === must come from left and the other from the right side). This is why the following join with df1("a") === df1("a") also works:
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
df4.explain(true)
But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth
As per this PR code change , case like : df1Joindf2.join(df1, df1("a") === df1("a")) is resolved as both LHS and RHS resolving to same Df1 dataset ( which makes the join crosss product) , and then later this situation is handled via brute force through function "resolveSelfJoinCondition"
But if the user has put df1Joindf2.join(df1, df1("a") === df1Joindf2("a")), then it gets handled via this PR change as it is a valid situation.
"But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work."
Based on my understanding of the above example :
The way I interpret it is:
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
The below join condition is resolved by the HACK
val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
where as
if it was
val df4 = df1Joindf2.join(df1, df1("a") === df1Joindf2("a"))
the above is a natural and logical resolution.
df4 is a join of df1Joindf2 and df1 ( we consider only top level join)
so in a select on df4., IMO
df4.select(df1("a) , df1Joindf2("a)), there is no ambiguity as one is being taken from df1 and other from df1joinDf2.
Moreover, the Join Condition in itself should not effect the output attributes of the Join Plan, irrespective of how the join condition is interpreted ( via hack or new code path)
I understand that from point of view of ExprId it can be viewed as ambiguity. though that ambiguity goes away when viewed from datasetId .
What changes were proposed in this pull request?
There seems to be a regression from Spark 3.5 to 4.0 caused by https://issues.apache.org/jira/browse/SPARK-43838 / #41347 as the following code no longer succeed:
Please note that if we dig deeper then it turns out that if we omit the extra project from
dfthen the following is a actually a regression from Spark 2 to Spark 3:The root cause seems to be
DeduplicateRelationsas it changesdf("a")(a#7) comming from the right side when it runs on the join:and so later a
Projectnode containingdf("a")above the join, that is added by added by the.select()API, can't be resolved.This is because
DeduplicateRelationsalways keeps the attributes of the first occurance of a node (Project [_1#2 AS a#7, _2#3 AS b#8]in this case) and creates new instances for other occurances. The rule doesn't (and can't) take into account if a top level attribute can actually come from a node or not.If
spark.sql.analyzer.failAmbiguousSelfJoinis enabled then theDetectAmbiguousSelfJoincatches the issue asIf it is not enabled then
a#7can't be resolved error is thrown.The PR attemps to fix the issue in following way
If the projection fields contain AttributeReference which are not found in the incoming AttributeSet, and the AttributeRef metadata contains the DatasetId info, then the AttributeRef is converted into a new UnresolvedAttributeWithTag and the original attributeRef is passed as paramter .
In the ColumnResolutionHelper, to resolve the UnresolvedAttributeRefWithTag, a new resolution logic is used:
The dataSetId from the original attribute ref's metadata is extracted.
The first BinaryNode contained in the LogicalPlan containing this unresolved attribute, is found.
Then its right leg & left lag's unary nodes are checked for the presennce of DatasetID of attribute ref, using TreeNodeTag("__datasetid").
If both the legs contain datasetId or neither contains, then resolution exception is thrown
Else the leg which contains datasetId is used to resolve.
Why are the changes needed?
To fix the bug as exposed by the unit tests in the PR
Does this PR introduce any user-facing change?
No
How was this patch tested?
Precheckin run.
Was this patch authored or co-authored using generative AI tooling?
No