Commit c1e995a
[SPARK-35350][SQL] Add code-gen for left semi sort merge join
### What changes were proposed in this pull request?
As title. This PR is to add code-gen support for LEFT SEMI sort merge join. The main change is to add `semiJoin` code path in `SortMergeJoinExec.doProduce()` and introduce `onlyBufferFirstMatchedRow` in `SortMergeJoinExec.genScanner()`. The latter is for left semi sort merge join without condition. For this kind of query, we don't need to buffer all matched rows, but only the first one (this is same as non-code-gen code path).
Example query:
```
val df1 = spark.range(10).select($"id".as("k1"))
val df2 = spark.range(4).select($"id".as("k2"))
val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_semi")
```
Example of generated code for the query:
```
== Subtree 5 / 5 (maxMethodCodeSize:302; maxConstantPoolSize:156(0.24% used); numInnerClasses:0) ==
*(5) Project [id#0L AS k1#2L]
+- *(5) SortMergeJoin [id#0L], [k2#6L], LeftSemi
:- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 5), ENSURE_REQUIREMENTS, [id=#27]
: +- *(1) Range (0, 10, step=1, splits=2)
+- *(4) Sort [k2#6L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(k2#6L, 5), ENSURE_REQUIREMENTS, [id=#33]
+- *(3) Project [id#4L AS k2#6L]
+- *(3) Range (0, 4, step=1, splits=2)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage5(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=5
/* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator smj_streamedInput_0;
/* 010 */ private scala.collection.Iterator smj_bufferedInput_0;
/* 011 */ private InternalRow smj_streamedRow_0;
/* 012 */ private InternalRow smj_bufferedRow_0;
/* 013 */ private long smj_value_2;
/* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches_0;
/* 015 */ private long smj_value_3;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 017 */
/* 018 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ smj_streamedInput_0 = inputs[0];
/* 026 */ smj_bufferedInput_0 = inputs[1];
/* 027 */
/* 028 */ smj_matches_0 = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(1, 2147483647);
/* 029 */ smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */ smj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */ }
/* 033 */
/* 034 */ private boolean findNextJoinRows(
/* 035 */ scala.collection.Iterator streamedIter,
/* 036 */ scala.collection.Iterator bufferedIter) {
/* 037 */ smj_streamedRow_0 = null;
/* 038 */ int comp = 0;
/* 039 */ while (smj_streamedRow_0 == null) {
/* 040 */ if (!streamedIter.hasNext()) return false;
/* 041 */ smj_streamedRow_0 = (InternalRow) streamedIter.next();
/* 042 */ long smj_value_0 = smj_streamedRow_0.getLong(0);
/* 043 */ if (false) {
/* 044 */ smj_streamedRow_0 = null;
/* 045 */ continue;
/* 046 */
/* 047 */ }
/* 048 */ if (!smj_matches_0.isEmpty()) {
/* 049 */ comp = 0;
/* 050 */ if (comp == 0) {
/* 051 */ comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0);
/* 052 */ }
/* 053 */
/* 054 */ if (comp == 0) {
/* 055 */ return true;
/* 056 */ }
/* 057 */ smj_matches_0.clear();
/* 058 */ }
/* 059 */
/* 060 */ do {
/* 061 */ if (smj_bufferedRow_0 == null) {
/* 062 */ if (!bufferedIter.hasNext()) {
/* 063 */ smj_value_3 = smj_value_0;
/* 064 */ return !smj_matches_0.isEmpty();
/* 065 */ }
/* 066 */ smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
/* 067 */ long smj_value_1 = smj_bufferedRow_0.getLong(0);
/* 068 */ if (false) {
/* 069 */ smj_bufferedRow_0 = null;
/* 070 */ continue;
/* 071 */ }
/* 072 */ smj_value_2 = smj_value_1;
/* 073 */ }
/* 074 */
/* 075 */ comp = 0;
/* 076 */ if (comp == 0) {
/* 077 */ comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0);
/* 078 */ }
/* 079 */
/* 080 */ if (comp > 0) {
/* 081 */ smj_bufferedRow_0 = null;
/* 082 */ } else if (comp < 0) {
/* 083 */ if (!smj_matches_0.isEmpty()) {
/* 084 */ smj_value_3 = smj_value_0;
/* 085 */ return true;
/* 086 */ } else {
/* 087 */ smj_streamedRow_0 = null;
/* 088 */ }
/* 089 */ } else {
/* 090 */ if (smj_matches_0.isEmpty()) {
/* 091 */ smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
/* 092 */ }
/* 093 */
/* 094 */ smj_bufferedRow_0 = null;
/* 095 */ }
/* 096 */ } while (smj_streamedRow_0 != null);
/* 097 */ }
/* 098 */ return false; // unreachable
/* 099 */ }
/* 100 */
/* 101 */ protected void processNext() throws java.io.IOException {
/* 102 */ while (findNextJoinRows(smj_streamedInput_0, smj_bufferedInput_0)) {
/* 103 */ long smj_value_4 = -1L;
/* 104 */ smj_value_4 = smj_streamedRow_0.getLong(0);
/* 105 */ scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
/* 106 */ boolean smj_hasOutputRow_0 = false;
/* 107 */
/* 108 */ while (!smj_hasOutputRow_0 && smj_iterator_0.hasNext()) {
/* 109 */ InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next();
/* 110 */
/* 111 */ smj_hasOutputRow_0 = true;
/* 112 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 113 */
/* 114 */ // common sub-expressions
/* 115 */
/* 116 */ smj_mutableStateArray_0[1].reset();
/* 117 */
/* 118 */ smj_mutableStateArray_0[1].write(0, smj_value_4);
/* 119 */ append((smj_mutableStateArray_0[1].getRow()).copy());
/* 120 */
/* 121 */ }
/* 122 */ if (shouldStop()) return;
/* 123 */ }
/* 124 */ ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources();
/* 125 */ }
/* 126 */
/* 127 */ }
```
### Why are the changes needed?
Improve query CPU performance. Test with one query:
```
def sortMergeJoin(): Unit = {
val N = 2 << 20
codegenBenchmark("left semi sort merge join", N) {
val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"), "left_semi")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.noop()
}
}
```
Seeing 30% of run-time improvement:
```
Running benchmark: left semi sort merge join
Running case: left semi sort merge join code-gen off
Stopped after 2 iterations, 1369 ms
Running case: left semi sort merge join code-gen on
Stopped after 5 iterations, 2743 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
left semi sort merge join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
left semi sort merge join code-gen off 676 685 13 3.1 322.2 1.0X
left semi sort merge join code-gen on 524 549 32 4.0 249.7 1.3X
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit test in `WholeStageCodegenSuite.scala` and `ExistenceJoinSuite.scala`.
Closes #32528 from c21/smj-left-semi.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>1 parent 5181543 commit c1e995a
File tree
47 files changed
+1797
-1587
lines changed- sql/core/src
- main/scala/org/apache/spark/sql/execution/joins
- test
- resources/tpcds-plan-stability
- approved-plans-modified/q10.sf100
- approved-plans-v1_4
- q10.sf100
- q14a.sf100
- q14b.sf100
- q16.sf100
- q16
- q23a.sf100
- q23a
- q23b.sf100
- q23b
- q35.sf100
- q38.sf100
- q69.sf100
- q94.sf100
- q94
- q95.sf100
- q95
- approved-plans-v2_7
- q10a.sf100
- q14.sf100
- q14a.sf100
- q35.sf100
- q35a.sf100
- scala/org/apache/spark/sql/execution
- joins
Some content is hidden
Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
47 files changed
+1797
-1587
lines changedLines changed: 150 additions & 62 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
105 | 105 | | |
106 | 106 | | |
107 | 107 | | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
108 | 114 | | |
109 | | - | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
110 | 120 | | |
111 | 121 | | |
112 | 122 | | |
| |||
236 | 246 | | |
237 | 247 | | |
238 | 248 | | |
239 | | - | |
| 249 | + | |
240 | 250 | | |
241 | 251 | | |
242 | 252 | | |
| |||
273 | 283 | | |
274 | 284 | | |
275 | 285 | | |
276 | | - | |
| 286 | + | |
277 | 287 | | |
278 | 288 | | |
279 | 289 | | |
| |||
317 | 327 | | |
318 | 328 | | |
319 | 329 | | |
320 | | - | |
| 330 | + | |
321 | 331 | | |
322 | 332 | | |
323 | 333 | | |
| |||
354 | 364 | | |
355 | 365 | | |
356 | 366 | | |
357 | | - | |
| 367 | + | |
358 | 368 | | |
359 | 369 | | |
360 | 370 | | |
| |||
365 | 375 | | |
366 | 376 | | |
367 | 377 | | |
368 | | - | |
| 378 | + | |
369 | 379 | | |
370 | 380 | | |
371 | 381 | | |
| |||
435 | 445 | | |
436 | 446 | | |
437 | 447 | | |
438 | | - | |
| 448 | + | |
439 | 449 | | |
440 | 450 | | |
441 | 451 | | |
| |||
457 | 467 | | |
458 | 468 | | |
459 | 469 | | |
460 | | - | |
| 470 | + | |
461 | 471 | | |
462 | 472 | | |
463 | 473 | | |
| |||
468 | 478 | | |
469 | 479 | | |
470 | 480 | | |
| 481 | + | |
| 482 | + | |
| 483 | + | |
| 484 | + | |
| 485 | + | |
| 486 | + | |
| 487 | + | |
| 488 | + | |
| 489 | + | |
| 490 | + | |
| 491 | + | |
471 | 492 | | |
472 | 493 | | |
473 | 494 | | |
| |||
483 | 504 | | |
484 | 505 | | |
485 | 506 | | |
486 | | - | |
487 | | - | |
| 507 | + | |
| 508 | + | |
| 509 | + | |
488 | 510 | | |
489 | 511 | | |
490 | 512 | | |
491 | 513 | | |
492 | 514 | | |
493 | 515 | | |
494 | | - | |
| 516 | + | |
495 | 517 | | |
496 | | - | |
| 518 | + | |
497 | 519 | | |
498 | 520 | | |
499 | 521 | | |
| |||
543 | 565 | | |
544 | 566 | | |
545 | 567 | | |
546 | | - | |
| 568 | + | |
547 | 569 | | |
548 | 570 | | |
549 | 571 | | |
| |||
639 | 661 | | |
640 | 662 | | |
641 | 663 | | |
| 664 | + | |
| 665 | + | |
642 | 666 | | |
643 | 667 | | |
644 | 668 | | |
645 | 669 | | |
646 | 670 | | |
647 | | - | |
| 671 | + | |
648 | 672 | | |
649 | 673 | | |
650 | 674 | | |
651 | 675 | | |
652 | 676 | | |
653 | | - | |
654 | | - | |
| 677 | + | |
| 678 | + | |
| 679 | + | |
655 | 680 | | |
656 | 681 | | |
657 | 682 | | |
| |||
674 | 699 | | |
675 | 700 | | |
676 | 701 | | |
677 | | - | |
| 702 | + | |
678 | 703 | | |
679 | 704 | | |
680 | 705 | | |
681 | 706 | | |
682 | | - | |
683 | | - | |
684 | | - | |
685 | | - | |
| 707 | + | |
686 | 708 | | |
687 | | - | |
688 | | - | |
689 | | - | |
690 | | - | |
691 | | - | |
692 | | - | |
693 | | - | |
694 | | - | |
695 | | - | |
696 | | - | |
697 | | - | |
698 | | - | |
699 | | - | |
700 | | - | |
701 | | - | |
702 | | - | |
703 | | - | |
| 709 | + | |
| 710 | + | |
| 711 | + | |
| 712 | + | |
| 713 | + | |
704 | 714 | | |
705 | | - | |
706 | | - | |
707 | | - | |
708 | | - | |
709 | | - | |
710 | | - | |
711 | | - | |
712 | | - | |
713 | | - | |
714 | | - | |
715 | | - | |
716 | | - | |
717 | | - | |
718 | | - | |
719 | | - | |
720 | | - | |
721 | | - | |
722 | | - | |
723 | | - | |
| 715 | + | |
| 716 | + | |
724 | 717 | | |
725 | | - | |
| 718 | + | |
| 719 | + | |
| 720 | + | |
726 | 721 | | |
727 | 722 | | |
728 | | - | |
729 | | - | |
| 723 | + | |
| 724 | + | |
| 725 | + | |
| 726 | + | |
| 727 | + | |
| 728 | + | |
| 729 | + | |
| 730 | + | |
| 731 | + | |
730 | 732 | | |
731 | 733 | | |
732 | 734 | | |
733 | 735 | | |
734 | 736 | | |
735 | 737 | | |
| 738 | + | |
| 739 | + | |
| 740 | + | |
| 741 | + | |
| 742 | + | |
| 743 | + | |
| 744 | + | |
| 745 | + | |
| 746 | + | |
| 747 | + | |
| 748 | + | |
| 749 | + | |
| 750 | + | |
| 751 | + | |
| 752 | + | |
| 753 | + | |
| 754 | + | |
| 755 | + | |
| 756 | + | |
| 757 | + | |
| 758 | + | |
| 759 | + | |
| 760 | + | |
| 761 | + | |
| 762 | + | |
| 763 | + | |
| 764 | + | |
| 765 | + | |
| 766 | + | |
| 767 | + | |
| 768 | + | |
| 769 | + | |
| 770 | + | |
| 771 | + | |
| 772 | + | |
| 773 | + | |
| 774 | + | |
| 775 | + | |
| 776 | + | |
| 777 | + | |
| 778 | + | |
| 779 | + | |
| 780 | + | |
| 781 | + | |
| 782 | + | |
| 783 | + | |
| 784 | + | |
| 785 | + | |
| 786 | + | |
| 787 | + | |
| 788 | + | |
| 789 | + | |
| 790 | + | |
| 791 | + | |
| 792 | + | |
| 793 | + | |
| 794 | + | |
| 795 | + | |
| 796 | + | |
| 797 | + | |
| 798 | + | |
| 799 | + | |
| 800 | + | |
| 801 | + | |
| 802 | + | |
| 803 | + | |
| 804 | + | |
| 805 | + | |
| 806 | + | |
| 807 | + | |
| 808 | + | |
| 809 | + | |
| 810 | + | |
| 811 | + | |
| 812 | + | |
| 813 | + | |
| 814 | + | |
| 815 | + | |
| 816 | + | |
| 817 | + | |
| 818 | + | |
| 819 | + | |
| 820 | + | |
| 821 | + | |
| 822 | + | |
| 823 | + | |
| 824 | + | |
736 | 825 | | |
737 | 826 | | |
738 | 827 | | |
| |||
783 | 872 | | |
784 | 873 | | |
785 | 874 | | |
786 | | - | |
787 | | - | |
| 875 | + | |
788 | 876 | | |
789 | 877 | | |
790 | 878 | | |
| |||
0 commit comments