Skip to content

Conversation

@ahshahid
Copy link

@ahshahid ahshahid commented Mar 1, 2024

What changes were proposed in this pull request?

There seems to be a regression from Spark 3.5 to 4.0 caused by https://issues.apache.org/jira/browse/SPARK-43838 / #41347 as the following code no longer succeed:

val df = Seq((1, 2)).toDF("a", "b")
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()

Please note that if we dig deeper then it turns out that if we omit the extra project from df then the following is a actually a regression from Spark 2 to Spark 3:

val schema = StructType.fromDDL("a int, b int")
val rows = Seq(Row(1, 2))
val rdd = sparkContext.parallelize(rows)
val df = spark.createDataFrame(rdd, schema)
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()

The root cause seems to be DeduplicateRelations as it changes df("a") (a#7) comming from the right side when it runs on the join:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
!'Join Inner, '`=`(bb#12, b#8)              'Join Inner, '`=`(bb#12, b#18)
 :- Project [a#7 AS aa#11, b#8 AS bb#12]    :- Project [a#7 AS aa#11, b#8 AS bb#12]
 :  +- Project [_1#2 AS a#7, _2#3 AS b#8]   :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
 :     +- LocalRelation [_1#2, _2#3]        :     +- LocalRelation [_1#2, _2#3]
!+- Project [_1#2 AS a#7, _2#3 AS b#8]      +- Project [_1#15 AS a#17, _2#16 AS b#18]
!   +- LocalRelation [_1#2, _2#3]              +- LocalRelation [_1#15, _2#16]

and so later a Project node containing df("a") above the join, that is added by added by the .select() API, can't be resolved.
This is because DeduplicateRelations always keeps the attributes of the first occurance of a node (Project [_1#2 AS a#7, _2#3 AS b#8] in this case) and creates new instances for other occurances. The rule doesn't (and can't) take into account if a top level attribute can actually come from a node or not.
If spark.sql.analyzer.failAmbiguousSelfJoin is enabled then the DetectAmbiguousSelfJoin catches the issue as

Column a#7 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.

If it is not enabled then a#7 can't be resolved error is thrown.

The PR attemps to fix the issue in following way

  1. If the projection fields contain AttributeReference which are not found in the incoming AttributeSet, and the AttributeRef metadata contains the DatasetId info, then the AttributeRef is converted into a new UnresolvedAttributeWithTag and the original attributeRef is passed as paramter .

  2. In the ColumnResolutionHelper, to resolve the UnresolvedAttributeRefWithTag, a new resolution logic is used:
    The dataSetId from the original attribute ref's metadata is extracted.

  3. The first BinaryNode contained in the LogicalPlan containing this unresolved attribute, is found.
    Then its right leg & left lag's unary nodes are checked for the presennce of DatasetID of attribute ref, using TreeNodeTag("__datasetid").
    If both the legs contain datasetId or neither contains, then resolution exception is thrown
    Else the leg which contains datasetId is used to resolve.

Why are the changes needed?

To fix the bug as exposed by the unit tests in the PR

Does this PR introduce any user-facing change?

No

How was this patch tested?

Precheckin run.

Was this patch authored or co-authored using generative AI tooling?

No

…ing joins once the plan is de-duplicated. The fix involves using Dataset ID associated with the plans & attributes to attempt correct resolution
@github-actions github-actions bot added the SQL label Mar 1, 2024
@ahshahid ahshahid changed the title SPARK-47217. bug fix for exception thrown in reused dataframes involv… [SPARK-45959][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is disambiguous Mar 6, 2024
@ahshahid ahshahid changed the title [SPARK-45959][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is disambiguous [SPARK-45959][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is Unambiguous Mar 6, 2024
@ahshahid
Copy link
Author

ahshahid commented Mar 6, 2024

While fixing the issue, found that tests in DataFrameSelfJoinSuite, where the two legs have distinct datasetId , the tests are expecting AnalysisException due to ambiguity in the join condition, which IMO is not correct as it is possible to dis-ambiguate using resolution via Dataset ID.
This PR fixes the above and have test modifications accordingly.
The same applies to some tests in DataFrameAsofJoinSuite.

@ahshahid ahshahid changed the title [SPARK-45959][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is Unambiguous [SPARK-47217][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is Unambiguous Mar 8, 2024
@peter-toth peter-toth changed the title [SPARK-47217][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is Unambiguous [SPARK-47217][SQL] Fix ambiguity check in self joins Mar 11, 2024
Row(1, 1) :: Nil)
}

test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please use : instead of . in test names?

Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)

val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
df1("a")).queryExecution.analyzed.asInstanceOf[Project]
Copy link
Contributor

@peter-toth peter-toth Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't selecting df1("a") be ambiguous here? It is not ambiguous in the join condition because df1Joindf2("a") can come from only one side so the df1("a") must come from the other side. But after the join I'm not sure we can follow the same logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth @cloud-fan I have opened a new PR SPARK-47320-PR.
It contains more information and also an existing master's test which fails when join order reversed.
Currently this PR is identical to SPARK-47320-PR ..I will try to see if this PR can be splitted , else will close it and keep the SPARK-47320-PR as the only PR for review.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Shouldn't selecting df1("a") be ambiguous here? It is not ambiguous in the join condition because df1Joindf2("a") can come from only one side so the df1("a") must come from the other side. But after the join I'm not sure we can follow the same logic."

As I envisage , mentally, for the Join with select as
Project
|
Filter
|
Join

So what is true for filter in terms of attribute refs should also be true for Project.

But I am not basing my view on above ( i.e in terms of ExprIDs).
I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth Also another source of confusion is that join condition is attempted being resolved on un-deduplicated plan. Once the join plan is de-duplicated, and then join condition attempted being resolved, the problem changes from being ambiguous resolution to I suppose both LHS & RHS resolving to same leg, which is then handled the way of resolution by tag Id.

Copy link
Contributor

@peter-toth peter-toth Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select

I agree that there is no ambiguity in Join becuase there we can suppose that the user would like to build a relation between the left and right sides (so one side of the === must come from left and the other from the right side). This is why the following join with df1("a") === df1("a") also works:

val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
df4.explain(true)

But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work.

Copy link
Author

@ahshahid ahshahid Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth
As per this PR code change , case like : df1Joindf2.join(df1, df1("a") === df1("a")) is resolved as both LHS and RHS resolving to same Df1 dataset ( which makes the join crosss product) , and then later this situation is handled via brute force through function "resolveSelfJoinCondition"
But if the user has put df1Joindf2.join(df1, df1("a") === df1Joindf2("a")), then it gets handled via this PR change as it is a valid situation.

"But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work."

Based on my understanding of the above example :
The way I interpret it is:
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))

The below join condition is resolved by the HACK
val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))

where as
if it was
val df4 = df1Joindf2.join(df1, df1("a") === df1Joindf2("a"))
the above is a natural and logical resolution.

df4 is a join of df1Joindf2 and df1 ( we consider only top level join)
so in a select on df4., IMO
df4.select(df1("a) , df1Joindf2("a)), there is no ambiguity as one is being taken from df1 and other from df1joinDf2.

Moreover, the Join Condition in itself should not effect the output attributes of the Join Plan, irrespective of how the join condition is interpreted ( via hack or new code path)

I understand that from point of view of ExprId it can be viewed as ambiguity. though that ambiguity goes away when viewed from datasetId .

@ahshahid ahshahid closed this Mar 13, 2024
@ahshahid
Copy link
Author

The PR #45446 handles the issue comprehensively and this PR was a subset of the changes contained in PR #45446 , so closing this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants