From 935f72bd780735cd90dec1bf172988ec351fb644 Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Sun, 16 Dec 2018 22:51:27 -0800 Subject: [PATCH] Fix build after SPARK-26352 was merged --- .../spark/sql/catalyst/optimizer/joins.scala | 17 ++++++++++++++++- .../catalyst/optimizer/JoinReorderSuite.scala | 19 +++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 0b6471289a471..2feb4720f9f92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -100,7 +100,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { createOrderedJoin(input, conditions) } - if (p.sameOutput(reordered)) { + if (sameOutput(p, reordered)) { reordered } else { // Reordering the joins have changed the order of the columns. @@ -108,6 +108,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { Project(p.output, reordered) } } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index c94a8b9e318f6..38a70f0691dd4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -291,8 +291,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { val optimized = Optimize.execute(analyzed) val expected = groundTruthBestPlan.analyze - assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect - assert(analyzed.sameOutput(optimized)) + assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect + assert(sameOutput(analyzed, optimized)) compareJoinOrder(optimized, expected) } @@ -300,4 +300,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = { plans.map(_.output).reduce(_ ++ _) } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } }