@EnricoMi (Contributor) commented Nov 16, 2022

What changes were proposed in this pull request?

Rule PushDownLeftSemiAntiJoin should not push an anti-join below an Aggregate when the translated join conditions (cf. aliasMap) become ambiguous with respect to both join sides.

Why are the changes needed?

This example fails with distinct() and succeeds without it, even though both variants are semantically equivalent here (the input has no duplicates):

val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, "id", "left_anti").collect()
assert(result.length == 1)

With distinct(), rule PushDownLeftSemiAntiJoin creates a join condition (id#774 + 1) = id#774, which can never be true. This effectively removes the anti-join.
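
A toy model in plain Scala (illustrative only, not Catalyst code) of what happens: the Aggregate's aliasMap defines id#776 as (id#774 + 1), and rewriting the join condition through that map produces the contradiction.

// Hypothetical string-level model of the aliasMap substitution.
val aliasMap = Map("id#776" -> "(id#774 + 1)")
val pushed = aliasMap.foldLeft("id#776 = id#774") {
  case (cond, (attr, expr)) => cond.replace(attr, expr)
}
// Compares an attribute with itself plus one: false for every row.
assert(pushed == "(id#774 + 1) = id#774")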

Before this PR:
The anti-join is fully removed from the plan.

*(2) HashAggregate(keys=[id#4], functions=[], output=[id#6])
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#4, 200), ENSURE_REQUIREMENTS, [plan_id=19]
         +- *(1) HashAggregate(keys=[id#4], functions=[], output=[id#4])
            +- *(1) LocalTableScan [id#4]

This is caused by PushDownLeftSemiAntiJoin adding join condition (id#774 + 1) = id#774:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#776 = id#774)                  'Aggregate [id#774], [(id#774 + 1) AS id#776]
!:- Aggregate [id#774], [(id#774 + 1) AS id#776]   +- 'Join LeftAnti, ((id#774 + 1) = id#774)
!:  +- LocalRelation [id#774]                         :- LocalRelation [id#774]
!+- Aggregate [id#774], [id#774]                      +- Aggregate [id#774], [id#774]
!   +- LocalRelation [id#774]                            +- LocalRelation [id#774]

After this PR:
The join condition id#776 = id#774 is still translated into (id#774 + 1) = id#774, but it is now recognized as ambiguous with respect to both sides of the prospective join and hence is not pushed down. The rule then no longer applies.
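
A minimal sketch of the idea (isAmbiguous is a hypothetical name, not the exact code in this PR): a translated condition counts as ambiguous when some of its references occur in the output of both prospective join sides, so the predicate could bind to either side after push-down.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Ambiguous: the condition references attributes that both sides produce.
def isAmbiguous(cond: Expression, left: LogicalPlan, right: LogicalPlan): Boolean =
  cond.references.intersect(left.outputSet).intersect(right.outputSet).nonEmpty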

The final plan contains the anti-join:

*(4) BroadcastHashJoin [id#53], [id#51], LeftAnti, BuildRight, false
:- *(4) HashAggregate(keys=[id#51], functions=[], output=[id#53])
:  +- AQEShuffleRead coalesced
:     +- ShuffleQueryStage 0
:        +- Exchange hashpartitioning(id#51, 200), ENSURE_REQUIREMENTS, [plan_id=382]
:           +- *(1) HashAggregate(keys=[id#51], functions=[], output=[id#51])
:              +- *(1) LocalTableScan [id#51]
+- BroadcastQueryStage 3
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=424]
      +- *(3) HashAggregate(keys=[id#51], functions=[], output=[id#51])
         +- AQEShuffleRead coalesced
            +- ShuffleQueryStage 2
               +- ReusedExchange [id#51], Exchange hashpartitioning(id#51, 200), ENSURE_REQUIREMENTS, [plan_id=382]

Does this PR introduce any user-facing change?

Yes, it fixes a correctness issue: the anti-join is no longer silently removed from the plan.

How was this patch tested?

Unit test.

The github-actions bot added the SQL label Nov 16, 2022
@EnricoMi (Contributor, Author) commented Nov 16, 2022

I did not manage to test this in LeftSemiAntiJoinPushDownSuite, which would be preferable.

My approach is:

  test("Aggregate: LeftAnti join no pushdown on ambiguity") {
    val relation = testRelation
      .groupBy($"b")($"b", sum($"c").as("sum"))
    val relationPlusOne = relation.select(($"b" + 1).as("b"))

    val originalQuery = relationPlusOne
      .join(relation, joinType = LeftAnti, usingColumns = Seq("b"))

    val optimized = Optimize.execute(originalQuery.analyze)
    comparePlans(optimized, originalQuery.analyze)
  }

This creates the plan:

Project [b#7]
+- Join LeftAnti, (b#7 = b#12)
   :- Project [(b#1 + 1) AS b#7]
   :  +- Aggregate [b#1], [b#1, sum(c#2) AS sum#6L]
   :     +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- Aggregate [b#12], [b#12, sum(c#13) AS sum#6L]
      +- LocalRelation <empty>, [a#11, b#12, c#13]

while this plan would be required to expose the bug (both Aggregate plans must have identical references):

Project [b#7]
+- Join LeftAnti, (b#7 = b#1)
   :- Project [(b#1 + 1) AS b#7]
   :  +- Aggregate [b#1], [b#1, sum(c#2) AS sum#6L]
   :     +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- Aggregate [b#1], [b#1, sum(c#2) AS sum#6L]
      +- LocalRelation <empty>, [a#0, b#1, c#2]
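
One way to force identical expression IDs could be to construct the Join node manually from a single analyzed child, bypassing the analyzer's deduplication. This is an untested sketch, reusing the suite's DSL and Optimize, with Join, JoinHint, LeftAnti, Project and Alias imported from Catalyst:

// Untested sketch: reuse one analyzed Aggregate on both join sides so the
// expression IDs collide, then optimize without re-analyzing the Join.
val agg = testRelation.groupBy($"b")($"b", sum($"c").as("sum")).analyze
val left = Project(Seq(Alias(agg.output.head + 1, "b")()), agg)
val join = Join(left, agg, LeftAnti,
  Some(left.output.head === agg.output.head), JoinHint.NONE)
// After the fix, the rule must leave this join untouched.
comparePlans(Optimize.execute(join), join)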

@EnricoMi (Contributor, Author) commented:

Note: the Window, Union and UnaryNode cases in PushDownLeftSemiAntiJoin might be affected as well and should also be covered by tests in LeftSemiAntiJoinPushDownSuite.

@EnricoMi changed the title from "[SPARK-41162][SQL] Do not push down join predicate that are ambiguous to both sides" to "[SPARK-41162][SQL] Do not push down join predicates that are ambiguous" Nov 16, 2022
@EnricoMi changed the title from "[SPARK-41162][SQL] Do not push down join predicates that are ambiguous" to "[SPARK-41162][SQL] Do not push down anti-join predicates that become ambiguous" Nov 16, 2022
@EnricoMi force-pushed the branch-leftanti-rule-fix branch from 6599f96 to 08f6ea3 on November 17, 2022 07:15
@EnricoMi (Contributor, Author) commented:

@wangyum @cloud-fan I would appreciate your suggestions on how to test this bug in LeftSemiAntiJoinPushDownSuite (see #38676 (comment)).

@wangyum (Member) commented Nov 18, 2022

@EnricoMi @cloud-fan Could we fix DeduplicateRelations? It did not generate different expression IDs for all conflicting attributes:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
 Join LeftSemi                         Join LeftSemi
 :- Project [(id#4 + 1) AS id#6]       :- Project [(id#4 + 1) AS id#6]
 :  +- Deduplicate [id#4]              :  +- Deduplicate [id#4]
 :     +- Project [value#1 AS id#4]    :     +- Project [value#1 AS id#4]
 :        +- LocalRelation [value#1]   :        +- LocalRelation [value#1]
 +- Deduplicate [id#4]                 +- Deduplicate [id#4]
!   +- Project [value#1 AS id#4]          +- Project [value#8 AS id#4]
!      +- LocalRelation [value#1]            +- LocalRelation [value#8]

@EnricoMi (Contributor, Author) commented:

Could we fix DeduplicateRelations?

Interesting, that sounds like a better solution. I'll look into it.

@EnricoMi (Contributor, Author) commented Nov 18, 2022

The problem is that DeduplicateRelations only considers duplicates between the left output and the right output, not duplicates between the left references and the right output. I have sketched a fix for Join and LateralJoin, including a proper test.
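
Roughly, the sketched fix looks like this (a simplified, hypothetical helper, not the code in this PR; it only inspects the immediate left node's references and re-keys the conflicting attributes throughout the right subtree):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

def dedupRight(j: Join): LogicalPlan = {
  // Attributes referenced (not just output) on the left that the right child
  // also outputs: these are the conflicts DeduplicateRelations misses today.
  val conflicting = j.left.references.intersect(j.right.outputSet)
  if (conflicting.isEmpty) j
  else {
    // Give each conflicting attribute a fresh expression ID everywhere it
    // occurs in the right subtree (definitions and references alike).
    val rewrite = AttributeMap(conflicting.toSeq.map(a => a -> a.newInstance()))
    val newRight = j.right.transformUp {
      case plan => plan.transformExpressions {
        case a: Attribute => rewrite.getOrElse(a, a)
      }
    }
    // The enclosing join condition must afterwards be re-resolved against the
    // new right output, as the analyzer does for UsingJoin.
    j.copy(right = newRight)
  }
}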

There is now a second run of rule DeduplicateRelations:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
 'Join UsingJoin(Inner,List(a))                 'Join UsingJoin(Inner,List(a))
 :- Project [(a#1 + 1) AS a#2]                  :- Project [(a#1 + 1) AS a#2]
 :  +- Deduplicate                              :  +- Deduplicate
 :     +- Project [value#0 AS a#1]              :     +- Project [value#0 AS a#1]
 :        +- LocalRelation <empty>, [value#0]   :        +- LocalRelation <empty>, [value#0]
 +- Deduplicate                                 +- Deduplicate
!   +- Project [value#3 AS a#1]                    +- Project [value#3 AS a#4]
       +- LocalRelation <empty>, [value#3]            +- LocalRelation <empty>, [value#3]

It is now safe to apply rule PushDownLeftSemiAntiJoin.

This could potentially be done for all operators specifically handled in DeduplicateRelations.apply(), i.e. AsOfJoin, Intersect, Except, Union and MergeIntoTable.

Deduplicating attributes that are already referenced elsewhere breaks the plan, because those references become dangling:

After applying rule org.apache.spark.sql.catalyst.optimizer.RewriteLateralSubquery in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken.

https://github.com/G-Research/spark/actions/runs/3498935957/jobs/5862050045

@EnricoMi force-pushed the branch-leftanti-rule-fix branch from 0948536 to c7eaaa2 on November 18, 2022 17:26
Review comment on the following diff hunk:

  condition: Option[Expression]) extends UnaryNode {

-  require(Seq(Inner, LeftOuter, Cross).contains(joinType),
+  require(Seq(Inner, LeftOuter, Cross).contains(joinType match {

@EnricoMi (Contributor, Author) commented:

This change is just needed by sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DeduplicateRelationsSuite.scala:

val originalQuery = left.lateralJoin(right, UsingJoin(Inner, Seq("a")))

@AmplabJenkins commented:

Can one of the admins verify this patch?

@EnricoMi (Contributor, Author) commented:

@wangyum @cloud-fan I am not sure if this is the right approach to fix DeduplicateRelations. Please advise.

The problem is that DeduplicateRelations only considers duplicates between the left output and the right output, but this situation is caused by duplicates between the left references and the right output.

@EnricoMi (Contributor, Author) commented Dec 2, 2022

@wangyum @cloud-fan what do you think about my approach? Do you have a suggestion for a better strategy?

@EnricoMi (Contributor, Author) commented:

@wangyum @cloud-fan do you consider this issue a correctness bug?

@shardulm94 (Contributor) commented:

I tried looking into this a bit.

@EnricoMi @cloud-fan Could we fix DeduplicateRelations? It did not generate different expression IDs for all conflicting attributes:

As @EnricoMi said, DeduplicateRelations only considers the output attributes of the left and right sides, which do not conflict here. Also, the Project case in PushDownLeftSemiAntiJoin calls this method, which seems to check for the self-join case based on conflicting expression IDs. This makes me believe the duplicate expression IDs are expected here, and hence DeduplicateRelations may not be at fault.

Similar to the Project case, should we add a check like canPushThroughCondition(Seq(agg.child), joinCond, rightOp) to ensure that it is safe to push the join below an Aggregate node too?
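
For context, that guard is roughly the following (paraphrased, not a verbatim copy of the rule's private helper): it rejects the push-down when the join condition references attributes that the right join side also outputs, because after substitution those references would form a self-comparison on identical expression IDs.

import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def canPushThroughCondition(
    plans: Seq[LogicalPlan],
    condition: Option[Expression],
    rightOp: LogicalPlan): Boolean = {
  // Attributes produced below the operator the join would be pushed through.
  val attributes = AttributeSet(plans.flatMap(_.output))
  // Safe only if no condition reference is both one of those attributes and
  // part of the right side's output.
  condition.forall(_.references.intersect(rightOp.outputSet).intersect(attributes).isEmpty)
}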

@EnricoMi (Contributor, Author) commented:

@shardulm94 you are right: canPushThroughCondition already guards Project and Union against this situation, so that should be the natural way to fix this for Aggregate as well. It is the smallest possible change that fixes this issue.

I have created #39131 to keep that fix separate from this PR, which tries to fix the issue through DeduplicateRelations. I will close this PR if the other one makes it.

Thanks for the pointer!

@EnricoMi (Contributor, Author) commented Jan 4, 2023

Closed in favour of #39131.

@EnricoMi closed this Jan 4, 2023