From be06b3749bd9834581a31cbb901cff5ba48056a4 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Sat, 15 Feb 2020 23:46:02 +0800 Subject: [PATCH 1/5] Adjust Join exec abstraction --- .../spark/sql/execution/SparkStrategies.scala | 6 ++- .../sql/execution/joins/BaseJoinExec.scala | 45 +++++++++++++++++++ .../joins/BroadcastHashJoinExec.scala | 4 +- .../joins/BroadcastNestedLoopJoinExec.scala | 9 +++- .../joins/CartesianProductExec.scala | 17 ++----- .../spark/sql/execution/joins/HashJoin.scala | 10 +---- .../joins/ShuffledHashJoinExec.scala | 4 +- .../execution/joins/SortMergeJoinExec.scala | 7 +-- .../sql/execution/joins/InnerJoinSuite.scala | 2 +- 9 files changed, 68 insertions(+), 36 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index bd2684d92a1d2..2eb46fd48eda6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -286,7 +286,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { - Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) + Some(Seq(joins.CartesianProductExec( + planLater(left), planLater(right), joinType, condition))) } else { None } @@ -367,7 +368,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { - Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) + Some(Seq(joins.CartesianProductExec( + planLater(left), planLater(right), joinType, condition))) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala new file mode 100644 index 0000000000000..ae664eb85c964 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils} + +/** + * Holds common logic for join operators + */ +trait BaseJoinExec extends BinaryExecNode { + def joinType: JoinType + def condition: Option[Expression] + + override def simpleStringWithNodeId(): String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName $joinType ($opId)".trim + } + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Join condition : ${joinCondStr} + """.stripMargin + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index fd4a7897c7ad1..08128d8f69dab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} -import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan} +import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{BooleanType, LongType} @@ -44,7 +44,7 @@ case class BroadcastHashJoinExec( condition: Option[Expression], left: SparkPlan, right: SparkPlan) - extends BinaryExecNode with HashJoin with CodegenSupport { + extends HashJoin with CodegenSupport { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala index 5517c0dcdb188..c759f44490a4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.collection.{BitSet, CompactBuffer} @@ -32,7 +32,7 @@ case class BroadcastNestedLoopJoinExec( right: SparkPlan, buildSide: BuildSide, joinType: JoinType, - condition: Option[Expression]) extends BinaryExecNode { + condition: Option[Expression]) extends BaseJoinExec { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -43,6 +43,11 @@ case class BroadcastNestedLoopJoinExec( case BuildLeft => (right, left) } + override def simpleStringWithNodeId(): 
String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName $joinType ${buildSide} ($opId)".trim + } + override def requiredChildDistribution: Seq[Distribution] = buildSide match { case BuildLeft => BroadcastDistribution(IdentityBroadcastMode) :: UnspecifiedDistribution :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 7e2f487fdcc5d..2cc6f64b646e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -22,7 +22,8 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, Predicate, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner -import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, ExternalAppendOnlyUnsafeRowArray, SparkPlan} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.CompletionIterator @@ -60,23 +61,13 @@ class UnsafeCartesianRDD( case class CartesianProductExec( left: SparkPlan, right: SparkPlan, - condition: Option[Expression]) extends BinaryExecNode { + joinType: JoinType, + condition: Option[Expression]) extends BaseJoinExec { override def output: Seq[Attribute] = left.output ++ right.output override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) - override def verboseStringWithOperatorId(): String = { - val joinCondStr = if (condition.isDefined) { - s"${condition.get}" - } else "None" - - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} - """.stripMargin - } - protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index f4796c194cb4f..e636ec01ba285 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -22,20 +22,14 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ExplainUtils, RowIterator, SparkPlan} +import org.apache.spark.sql.execution.{ExplainUtils, RowIterator} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{IntegralType, LongType} -trait HashJoin { - self: SparkPlan => - +trait HashJoin extends BaseJoinExec { def leftKeys: Seq[Expression] def rightKeys: Seq[Expression] - def joinType: JoinType def buildSide: BuildSide - def condition: Option[Expression] - def left: SparkPlan - def right: SparkPlan override def simpleStringWithNodeId(): String = { val opId = ExplainUtils.getOpId(this) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index a8361fd7dd559..755a63e545ef1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -39,7 +39,7 @@ case class ShuffledHashJoinExec( condition: Option[Expression], left: SparkPlan, right: SparkPlan) - extends BinaryExecNode with HashJoin { + extends HashJoin { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 7a08dd1afd3a6..19c248da6709f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -41,7 +41,7 @@ case class SortMergeJoinExec( condition: Option[Expression], left: SparkPlan, right: SparkPlan, - isSkewJoin: Boolean = false) extends BinaryExecNode with CodegenSupport { + isSkewJoin: Boolean = false) extends BaseJoinExec with CodegenSupport { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -52,11 +52,6 @@ case class SortMergeJoinExec( override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator - override def simpleStringWithNodeId(): String = { - val opId = ExplainUtils.getOpId(this) - s"$nodeName $joinType ($opId)".trim - } - override def verboseStringWithOperatorId(): String = { val joinCondStr = if (condition.isDefined) { s"${condition.get}" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 08898f80034e6..8cfc9e43494d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -191,7 +191,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - CartesianProductExec(left, right, Some(condition())), + CartesianProductExec(left, right, Inner, Some(condition())), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } From de2198ed7d7a66a9f2c4c688c46b91aeeacc2dc1 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Mon, 17 Feb 2020 22:34:00 +0800 Subject: [PATCH 2/5] address comment --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- .../spark/sql/execution/joins/CartesianProductExec.scala | 5 +++-- .../apache/spark/sql/execution/joins/InnerJoinSuite.scala | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2eb46fd48eda6..65b7f86da00ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -287,7 +287,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec( - planLater(left), planLater(right), joinType, condition))) + planLater(left), planLater(right), condition))) } else { None } @@ -369,7 +369,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec( - planLater(left), planLater(right), joinType, condition))) + planLater(left), planLater(right), condition))) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 2cc6f64b646e2..52b98d735e282 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -22,7 +22,7 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, Predicate, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.CompletionIterator @@ -61,8 +61,9 @@ class UnsafeCartesianRDD( case class CartesianProductExec( left: SparkPlan, right: SparkPlan, - joinType: JoinType, condition: Option[Expression]) extends BaseJoinExec { + override def joinType: JoinType = Inner + override def output: Seq[Attribute] = left.output ++ right.output override lazy val metrics = Map( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 8cfc9e43494d5..08898f80034e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -191,7 +191,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - CartesianProductExec(left, right, Inner, Some(condition())), + CartesianProductExec(left, right, Some(condition())), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } From e9022a108bdca888b6bad358c25645473d8a98b8 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Fri, 21 Feb 2020 11:38:58 +0800 Subject: [PATCH 3/5] Address comments and fix PySpark test --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++---- .../org/apache/spark/sql/execution/joins/BaseJoinExec.scala 
| 2 ++ .../sql/execution/joins/BroadcastNestedLoopJoinExec.scala | 3 +++ .../spark/sql/execution/joins/CartesianProductExec.scala | 3 +++ .../org/apache/spark/sql/execution/joins/HashJoin.scala | 2 -- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 65b7f86da00ea..bd2684d92a1d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -286,8 +286,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { - Some(Seq(joins.CartesianProductExec( - planLater(left), planLater(right), condition))) + Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) } else { None } @@ -368,8 +367,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { - Some(Seq(joins.CartesianProductExec( - planLater(left), planLater(right), condition))) + Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala index ae664eb85c964..86da58b61d2c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils} trait BaseJoinExec extends BinaryExecNode { def joinType: JoinType def condition: Option[Expression] + def leftKeys: Seq[Expression] + def rightKeys: Seq[Expression] override def simpleStringWithNodeId(): String = { val opId = ExplainUtils.getOpId(this) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala index c759f44490a4e..888e7af7c07ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala @@ -34,6 +34,9 @@ case class BroadcastNestedLoopJoinExec( joinType: JoinType, condition: Option[Expression]) extends BaseJoinExec { + override def leftKeys: Seq[Expression] = Nil + override def rightKeys: Seq[Expression] = Nil + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 52b98d735e282..a71bf94c45034 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -62,7 +62,10 @@ case class CartesianProductExec( left: SparkPlan, right: SparkPlan, condition: Option[Expression]) extends BaseJoinExec { + override def joinType: JoinType = Inner + override def leftKeys: Seq[Expression] = Nil + override def rightKeys: Seq[Expression] = Nil 
override def output: Seq[Attribute] = left.output ++ right.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index e636ec01ba285..686883a6fb915 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -27,8 +27,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{IntegralType, LongType} trait HashJoin extends BaseJoinExec { - def leftKeys: Seq[Expression] - def rightKeys: Seq[Expression] def buildSide: BuildSide override def simpleStringWithNodeId(): String = { From 79d2ac2ecbc78b5dccee0cbb3aebf33de15cf2cd Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Sat, 22 Feb 2020 01:37:20 +0800 Subject: [PATCH 4/5] minor fix for rebase --- .../org/apache/spark/sql/execution/joins/BaseJoinExec.scala | 2 +- .../scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala index 86da58b61d2c9..f223118088f0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala @@ -41,7 +41,7 @@ trait BaseJoinExec extends BinaryExecNode { } else "None" s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Join condition : ${joinCondStr} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} """.stripMargin } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 686883a6fb915..ca178defea8a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -38,7 +38,6 @@ trait HashJoin extends BaseJoinExec { val joinCondStr = if (condition.isDefined) { s"${condition.get}" } else "None" - s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} |${ExplainUtils.generateFieldString("Left keys", leftKeys)} From 17af74fbf9af7d80aba5c343b7c8cb4fd36fa14b Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 25 Feb 2020 00:38:36 +0800 Subject: [PATCH 5/5] address comment --- .../sql/execution/joins/BaseJoinExec.scala | 17 +++++++++++++---- .../spark/sql/execution/joins/HashJoin.scala | 12 ------------ .../sql/execution/joins/SortMergeJoinExec.scala | 12 ------------ 3 files changed, 13 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala index f223118088f0f..86b31eb0d0c7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala @@ -39,9 +39,18 @@ trait BaseJoinExec extends BinaryExecNode { val joinCondStr = if (condition.isDefined) { s"${condition.get}" } else "None" - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Join condition", 
joinCondStr)} - """.stripMargin + if (leftKeys.nonEmpty || rightKeys.nonEmpty) { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |${ExplainUtils.generateFieldString("Left keys", leftKeys)} + |${ExplainUtils.generateFieldString("Right keys", rightKeys)} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} + """.stripMargin + } else { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} + """.stripMargin + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index ca178defea8a8..7f90a51c1f234 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -34,18 +34,6 @@ trait HashJoin extends BaseJoinExec { s"$nodeName $joinType ${buildSide} ($opId)".trim } - override def verboseStringWithOperatorId(): String = { - val joinCondStr = if (condition.isDefined) { - s"${condition.get}" - } else "None" - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Left keys", leftKeys)} - |${ExplainUtils.generateFieldString("Right keys", rightKeys)} - |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} - """.stripMargin - } - override def output: Seq[Attribute] = { joinType match { case _: InnerLike => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 19c248da6709f..2c57956de5bca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -52,18 +52,6 @@ case class SortMergeJoinExec( override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator - override def verboseStringWithOperatorId(): String = { - val joinCondStr = if (condition.isDefined) { - s"${condition.get}" - } else "None" - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Left keys", leftKeys)} - |${ExplainUtils.generateFieldString("Right keys", rightKeys)} - |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} - """.stripMargin - } - override def output: Seq[Attribute] = { joinType match { case _: InnerLike =>
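
Taken together, the five patches converge on a single BaseJoinExec trait that owns joinType, condition, and the leftKeys/rightKeys sequences, plus one shared implementation of the EXPLAIN formatting; CartesianProductExec fixes joinType to Inner and reports empty key lists instead of taking joinType as a constructor parameter (the approach patch 2/5 reverted to). The sketch below is a minimal, standalone Scala model of that final shape, not Spark's actual SparkPlan/BinaryExecNode hierarchy: JoinType, Expression, and the simplified trait and operator here are illustrative stand-ins only.

// Minimal standalone sketch of the abstraction this patch series converges
// on. JoinType, Expression, and the classes below are simplified stand-ins,
// not Spark's actual classes; they model only the trait shape and the shared
// EXPLAIN formatting.
object BaseJoinExecSketch {

  sealed trait JoinType
  case object Inner extends JoinType

  final case class Expression(sql: String)

  // Mirrors BaseJoinExec: every join operator exposes its type, its optional
  // condition, and its (possibly empty) equi-join keys, and inherits one
  // shared implementation of the verbose EXPLAIN string.
  trait BaseJoin {
    def nodeName: String
    def joinType: JoinType
    def condition: Option[Expression]
    def leftKeys: Seq[Expression]
    def rightKeys: Seq[Expression]

    // Key fields are printed only when keys exist, matching the final form
    // of verboseStringWithOperatorId in patch 5/5.
    def verboseString: String = {
      val joinCondStr = condition.map(_.sql).getOrElse("None")
      if (leftKeys.nonEmpty || rightKeys.nonEmpty) {
        s"""|$nodeName $joinType
            |Left keys: ${leftKeys.map(_.sql).mkString(", ")}
            |Right keys: ${rightKeys.map(_.sql).mkString(", ")}
            |Join condition: $joinCondStr""".stripMargin
      } else {
        s"""|$nodeName $joinType
            |Join condition: $joinCondStr""".stripMargin
      }
    }
  }

  // Mirrors the final CartesianProductExec: a cartesian product is always an
  // inner join with no equi-join keys, so joinType is fixed here rather than
  // carried as a constructor parameter.
  final case class CartesianProduct(condition: Option[Expression]) extends BaseJoin {
    val nodeName: String = "CartesianProduct"
    def joinType: JoinType = Inner
    def leftKeys: Seq[Expression] = Nil
    def rightKeys: Seq[Expression] = Nil
  }

  def main(args: Array[String]): Unit = {
    // Hits the keyless branch: prints "CartesianProduct Inner" plus the
    // condition line only.
    println(CartesianProduct(Some(Expression("a.id = b.id"))).verboseString)
  }
}

This models the design choice the series settles on: operators with equi-join keys (hash and sort-merge joins) get the key fields in their EXPLAIN output for free from the shared branch, while keyless operators (cartesian product, broadcast nested loop join) fall through to the condition-only branch, which is what lets patch 5/5 delete the duplicated verboseStringWithOperatorId overrides from HashJoin and SortMergeJoinExec.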