Skip to content

Commit b11685d

Browse files
wakunGitHub Enterprise
authored andcommitted
[CARMEL-6055] Backport code-gen code for SortMergeJoin (#987)
* [CARMEL-6055] Backport code-gen code for SortMergeJoin PR list:   * Cheng Su 2021/12/30, 12:12 PM [SPARK-37726][SQL] Add spill size metrics for sort merge join * Cheng Su 2021/11/19, 12:36 PM [SPARK-37370][SQL] Add SQL configs to control newly added join code-gen in 3.3 * Cheng Su 2021/11/17, 9:48 PM [SPARK-37316][SQL] Add code-gen for existence sort merge join * Cheng Su 2021/11/17, 10:44 AM [SPARK-37341][SQL] Avoid unnecessary buffer and copy in full outer sort merge join * Cheng Su 2021/11/15, 7:34 PM [SPARK-35352][SQL] Add code-gen for full outer sort merge join * Cheng Su 2021/11/3, 11:18 AM [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join * Cheng Su 2021/6/2, 2:01 PM [SPARK-35604][SQL] Fix condition check for FULL OUTER sort merge join * Cheng Su 2021/5/27, 12:59 PM [SPARK-35351][SQL][FOLLOWUP] Avoid using `loaded` variable for LEFT ANTI SMJ code-gen * Cheng Su 2021/5/18, 3:56 PM [SPARK-35351][SQL] Add code-gen for left anti sort merge join * Cheng Su 2021/5/17, 1:49 AM [SPARK-35363][SQL][FOLLOWUP] Use fresh name for findNextJoinRows instead of hardcoding it * Cheng Su 2021/5/13, 8:52 PM [SPARK-35350][SQL] Add code-gen for left semi sort merge join * Cheng Su 2021/5/12, 10:10 PM [SPARK-35349][SQL] Add code-gen for left/right outer sort merge join * Cheng Su 2021/5/11, 10:21 AM [SPARK-35363][SQL] Refactor sort merge join code-gen be agnostic to join type * [CARMEL-6055] Backport code-gen code for SortMergeJoin * Backport pr for BroadcastNestedLoopJoin * Fix UT * Fix UT * Backport [CARMEL-3719] OOM in sort merge join * Backport [CARMEL-3719] OOM in sort merge join * fix ut * Enable code-gen for full outer sort merge join by default
1 parent a1ec6fb commit b11685d

File tree

13 files changed

+1788
-372
lines changed

13 files changed

+1788
-372
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,6 +1750,54 @@ object SQLConf {
17501750
.booleanConf
17511751
.createWithDefault(true)
17521752

1753+
val ENABLE_SORT_AGGREGATE_CODEGEN =
1754+
buildConf("spark.sql.codegen.aggregate.sortAggregate.enabled")
1755+
.internal()
1756+
.doc("When true, enable code-gen for sort aggregate.")
1757+
.version("3.3.0")
1758+
.booleanConf
1759+
.createWithDefault(true)
1760+
1761+
val ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN =
1762+
buildConf("spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled")
1763+
.internal()
1764+
.doc("When true, enable code-gen for FULL OUTER shuffled hash join.")
1765+
.version("3.3.0")
1766+
.booleanConf
1767+
.createWithDefault(true)
1768+
1769+
val ENABLE_OUTER_SORT_MERGE_JOIN_CODEGEN =
1770+
buildConf("spark.sql.codegen.join.outerSortMergeJoin.enabled")
1771+
.internal()
1772+
.doc("When true, enable code-gen for LEFT OUTER, RIGHT OUTER sort merge join")
1773+
.version("3.3.0")
1774+
.booleanConf
1775+
.createWithDefault(true)
1776+
1777+
val ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN =
1778+
buildConf("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled")
1779+
.internal()
1780+
.doc("When true, enable code-gen for FULL OUTER sort merge join.")
1781+
.version("3.3.0")
1782+
.booleanConf
1783+
.createWithDefault(true)
1784+
1785+
val ENABLE_EXISTENCE_SORT_MERGE_JOIN_CODEGEN =
1786+
buildConf("spark.sql.codegen.join.existenceSortMergeJoin.enabled")
1787+
.internal()
1788+
.doc("When true, enable code-gen for Existence sort merge join.")
1789+
.version("3.3.0")
1790+
.booleanConf
1791+
.createWithDefault(true)
1792+
1793+
val ENABLE_SEMI_OR_ANTI_MERGE_JOIN_CODEGEN =
1794+
buildConf("spark.sql.codegen.join.semiOrAntiSortMergeJoin.enabled")
1795+
.internal()
1796+
.doc("When true, enable code-gen for semi or anti sort merge join")
1797+
.version("3.3.0")
1798+
.booleanConf
1799+
.createWithDefault(true)
1800+
17531801
val MAX_NESTED_VIEW_DEPTH =
17541802
buildConf("spark.sql.view.maxNestedViewDepth")
17551803
.internal()

sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
7474
}
7575

7676
private var spillableArray: UnsafeExternalSorter = _
77+
private var totalSpillBytes: Long = 0
7778
private var numRows = 0
7879

7980
// A counter to keep track of total modifications done to this array since its creation.
@@ -86,6 +87,17 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
8687

8788
def isEmpty: Boolean = numRows == 0
8889

90+
/**
91+
* Total number of bytes that has been spilled into disk so far.
92+
*/
93+
def spillSize: Long = {
94+
if (spillableArray != null) {
95+
totalSpillBytes + spillableArray.getSpillSize
96+
} else {
97+
totalSpillBytes
98+
}
99+
}
100+
89101
/**
90102
* Clears up resources (eg. memory) held by the backing storage
91103
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
3434
import org.apache.spark.sql.catalyst.rules.Rule
3535
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
3636
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
37-
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
37+
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
3838
import org.apache.spark.sql.execution.metric.SQLMetrics
3939
import org.apache.spark.sql.internal.SQLConf
4040
import org.apache.spark.sql.types._
@@ -52,6 +52,7 @@ trait CodegenSupport extends SparkPlan {
5252
case _: BroadcastHashJoinExec => "bhj"
5353
case _: ShuffledHashJoinExec => "shj"
5454
case _: SortMergeJoinExec => "smj"
55+
case _: BroadcastNestedLoopJoinExec => "bnlj"
5556
case _: RDDScanExec => "rdd"
5657
case _: DataSourceScanExec => "scan"
5758
case _: InMemoryTableScanExec => "memoryScan"

0 commit comments

Comments
 (0)