@cloud-fan
Contributor

What changes were proposed in this pull request?

#22326 mistakenly treated every Python UDF in a join condition as unevaluable. In fact, only Python UDFs that refer to attributes from both sides of the join are unevaluable.

This PR fixes this mistake.
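
The distinction can be illustrated with a small, Spark-free model. This is only a sketch: `can_evaluate` and `is_unevaluable_in_join` are hypothetical names, and plain Python sets stand in for the attributes a UDF references and the attributes each join side outputs.

```python
def can_evaluate(references, side_output):
    """An expression can run on one side if that side outputs every attribute it uses."""
    return set(references) <= set(side_output)


def is_unevaluable_in_join(udf_references, left_output, right_output):
    """A UDF is unevaluable in the join condition only if neither side alone can evaluate it."""
    return (not can_evaluate(udf_references, left_output)
            and not can_evaluate(udf_references, right_output))


# Hypothetical schemas: the left side outputs a and b, the right side outputs c and d.
left_output = {"a", "b"}
right_output = {"c", "d"}

# A UDF over a left-side attribute only can be evaluated on the left side.
left_only = is_unevaluable_in_join({"a"}, left_output, right_output)

# A UDF over attributes from both sides cannot be evaluated on either side alone.
both_sides = is_unevaluable_in_join({"a", "c"}, left_output, right_output)
```

In this model only `both_sides` is flagged, which matches the behavior this PR describes.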

How was this patch tested?

a new test

@cloud-fan
Contributor Author

cloud-fan commented Nov 27, 2018

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99324 has finished for PR 23153 at commit cb195cf.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

f = udf(lambda a: str(a), StringType())
# The join condition can't be pushed down, as it refers to attributes from both sides.
# The Python UDF only refers to attributes from one side, so it's evaluable.
df = left.join(right, f("a") == col("b").cast("string"), how = "left_outer")
Member

style nit: how="left_outer"

 override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-  case j @ Join(_, _, joinType, condition)
-      if condition.isDefined && hasPythonUDF(condition.get) =>
+  case j @ Join(_, _, joinType, Some(cond)) if hasUnevaluablePythonUDF(cond, j) =>
Member

Following the rule change, we need to modify the suites in PullOutPythonUDFInJoinConditionSuite: they should also construct the dummy Python UDF from attributes of both sides.

@xuanyuanking
Member

xuanyuanking commented Nov 27, 2018

Sorry for the mistake, and thanks to Wenchen for the fix.

the suites should also construct the dummy python udf from both sides.

I did this locally; the suites can be fixed simply by:

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
index d3867f2b6b..a0f8ae2fc7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
@@ -40,13 +40,18 @@ class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
         CheckCartesianProducts) :: Nil
   }

-  val testRelationLeft = LocalRelation('a.int, 'b.int)
-  val testRelationRight = LocalRelation('c.int, 'd.int)
+  val attrA = 'a.int
+  val attrB = 'b.int
+  val attrC = 'c.int
+  val attrD = 'd.int
+
+  val testRelationLeft = LocalRelation(attrA, attrB)
+  val testRelationRight = LocalRelation(attrC, attrD)

   // Dummy python UDF for testing. Unable to execute.
   val pythonUDF = PythonUDF("pythonUDF", null,
     BooleanType,
-    Seq.empty,
+    Seq(attrA, attrC),
     PythonEvalType.SQL_BATCHED_UDF,
     udfDeterministic = true)

@@ -118,7 +123,7 @@ class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
   test("pull out whole complex condition with multiple python udf") {
     val pythonUDF1 = PythonUDF("pythonUDF1", null,
       BooleanType,
-      Seq.empty,
+      Seq(attrA, attrC),
       PythonEvalType.SQL_BATCHED_UDF,
       udfDeterministic = true)
     val condition = (pythonUDF || 'a.attr === 'c.attr) && pythonUDF1
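
A likely reason the dummy UDF needs children from both sides: assuming evaluability is a subset check of an expression's references against one side's output (an assumption about what the `canEvaluate` call in this PR does), a UDF built with `Seq.empty` references nothing, so it passes the check on either side and the rule would no longer fire. A plain-Python sketch with hypothetical names:

```python
def can_evaluate(references, side_output):
    # Hypothetical stand-in for the subset check assumed above.
    return frozenset(references) <= frozenset(side_output)


def rule_fires(udf_children, left_output, right_output):
    # The rule should pull the UDF out only when neither side can evaluate it.
    return (not can_evaluate(udf_children, left_output)
            and not can_evaluate(udf_children, right_output))


left_output = {"a", "b"}   # mirrors LocalRelation(attrA, attrB)
right_output = {"c", "d"}  # mirrors LocalRelation(attrC, attrD)

old_dummy_children = []          # mirrors Seq.empty
new_dummy_children = ["a", "c"]  # mirrors Seq(attrA, attrC)

# The empty set is a subset of any output set, so the old dummy counts as evaluable.
old_fires = rule_fires(old_dummy_children, left_output, right_output)
new_fires = rule_fires(new_dummy_children, left_output, right_output)
```

Under that assumption, only the dummy UDF with children from both sides still exercises the rule.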

@mgaido91
Contributor

The change itself seems fine to me. As @xuanyuanking mentioned, though, we should update the existing tests. What about adding a test in the new suite that checks the plans instead of an end-to-end test?

@SparkQA

SparkQA commented Nov 28, 2018

Test build #99356 has finished for PR 23153 at commit 7b985d8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Nov 28, 2018

Test build #99358 has finished for PR 23153 at commit 7b985d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@xuanyuanking xuanyuanking left a comment

LGTM

@cloud-fan
Contributor Author

thanks, merging to master/2.4!

@asfgit asfgit closed this in affe809 Nov 28, 2018
@mgaido91
Contributor

a late LGTM as well, thanks @cloud-fan for the patch and thanks @xuanyuanking for the review.

asfgit pushed a commit that referenced this pull request Nov 28, 2018
…dition

#22326 mistakenly treated every Python UDF in a join condition as unevaluable. In fact, only Python UDFs that refer to attributes from both sides of the join are unevaluable.

This PR fixes this mistake.

a new test

Closes #23153 from cloud-fan/join.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit affe809)
Signed-off-by: Wenchen Fan <[email protected]>

private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
  expr.find { e =>
    PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right)
Member

We might need a comment to explain why we only pull out the Scalar PythonUDF.

Contributor Author

Only scalar UDFs can appear in a join condition, so changing it to e.isInstanceOf[PythonUDF] would be equivalent.
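
To make the per-node nature of this check concrete, here is a toy expression tree in plain Python. It is a sketch, not Catalyst: `Expr`, its `find` method, and `has_unevaluable_python_udf` are hypothetical stand-ins, and the `is_python_udf` flag plays the role of the `PythonUDF` test discussed above.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Expr:
    """Toy expression node; `attrs` holds attributes this node references directly."""
    name: str
    children: List["Expr"] = field(default_factory=list)
    attrs: frozenset = frozenset()
    is_python_udf: bool = False

    def references(self) -> frozenset:
        # Union of this node's attributes and those of all descendants.
        refs = set(self.attrs)
        for child in self.children:
            refs |= child.references()
        return frozenset(refs)

    def find(self, predicate: Callable[["Expr"], bool]) -> Optional["Expr"]:
        # Pre-order search: return the first node matching the predicate, else None.
        if predicate(self):
            return self
        for child in self.children:
            found = child.find(predicate)
            if found is not None:
                return found
        return None


def has_unevaluable_python_udf(cond, left_output, right_output):
    def unevaluable(e):
        refs = e.references()
        return (e.is_python_udf
                and not refs <= left_output
                and not refs <= right_output)
    return cond.find(unevaluable) is not None


left_output = frozenset({"a", "b"})
right_output = frozenset({"c", "d"})
a = Expr("a", attrs=frozenset({"a"}))
c = Expr("c", attrs=frozenset({"c"}))

# f(a) == c: the condition spans both sides, but the UDF node only touches the left.
one_side_udf = Expr("==", children=[Expr("f", children=[a], is_python_udf=True), c])

# g(a, c): the UDF node itself needs attributes from both sides.
two_side_udf = Expr("g", children=[a, c], is_python_udf=True)

one_side_result = has_unevaluable_python_udf(one_side_udf, left_output, right_output)
two_side_result = has_unevaluable_python_udf(two_side_udf, left_output, right_output)
```

The first condition is not flagged even though it mixes attributes from both sides, because the predicate is applied to each UDF node rather than to the condition as a whole.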

Member

@HyukjinKwon HyukjinKwon left a comment

late LGTM as well
