adrian-wang
Contributor

Thanks for the initial work from @Ishiihara in #3173

This PR introduces a new join method, sort merge join. It first ensures that rows with the same key value are in the same partition, and that inside each partition the rows are sorted by key. We can then run down both sides together and find matching rows by merging. This way we don't have to store a whole hash table for one side, as hash join does, so memory usage is lower. This PR would also benefit from #3438, which makes the sorting phase much more efficient.
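
To make the description above concrete, here is a minimal, language-neutral sketch of the idea in Python. It is not the code in this PR: the function names (`hash_partition`, `merge_join`, `sort_merge_join`) and the in-memory lists are assumptions made purely for illustration, whereas the real operator runs inside Spark SQL's physical plan. The sketch shows the two guarantees the join relies on (equal keys share a partition, and each partition is sorted by key) and the merge step that walks both sides together.

```python
def hash_partition(rows, key, num_partitions):
    """Route each row to a partition by hashing its join key,
    so rows with equal keys always land in the same partition."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


def merge_join(left, right, left_key, right_key):
    """Inner equi-join of two lists that are already sorted by join key."""
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left_key(left[i]), right_key(right[j])
        if lk < rk:
            i += 1          # left key too small, advance left
        elif lk > rk:
            j += 1          # right key too small, advance right
        else:
            # Locate the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right_key(right[j_end]) == lk:
                j_end += 1
            # Pair every left row holding this key with that run.
            while i < len(left) and left_key(left[i]) == lk:
                for r in right[j:j_end]:
                    yield (left[i], r)
                i += 1
            j = j_end


def sort_merge_join(left, right, left_key, right_key, num_partitions=4):
    """Hypothetical driver: partition both sides, sort each partition, merge."""
    out = []
    lparts = hash_partition(left, left_key, num_partitions)
    rparts = hash_partition(right, right_key, num_partitions)
    for lp, rp in zip(lparts, rparts):
        lp.sort(key=left_key)
        rp.sort(key=right_key)
        out.extend(merge_join(lp, rp, left_key, right_key))
    return out


# Example usage with (key, value) tuples; the data is made up.
orders = [(3, "c"), (1, "a"), (2, "b"), (1, "a2")]
users = [(1, "x"), (3, "z"), (1, "y"), (4, "w")]
joined = sort_merge_join(orders, users, lambda r: r[0], lambda r: r[0])
```

In this sketch the merge step only ever looks at the current run of equal keys on one side, whereas a hash join would first build a hash table over an entire side.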

We introduce a new configuration, "spark.sql.planner.sortMergeJoin", to switch between this join (true) and ShuffledHashJoin (false). We probably want its default value to be false at first.

SparkQA commented Mar 26, 2015

Test build #29224 has started for PR 5208 at commit b87df90.

  • This patch merges cleanly.

SparkQA commented Mar 26, 2015

Test build #29224 has finished for PR 5208 at commit b87df90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29224/

adrian-wang force-pushed the smj branch 2 times, most recently from 9220280 to cb1e18d on March 30, 2015 04:15

SparkQA commented Mar 30, 2015

Test build #29382 has started for PR 5208 at commit cb1e18d.

SparkQA commented Mar 30, 2015

Test build #29382 has finished for PR 5208 at commit cb1e18d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29382/

SparkQA commented Mar 30, 2015

Test build #29383 has started for PR 5208 at commit 6df9f01.

SparkQA commented Mar 30, 2015

Test build #29383 has finished for PR 5208 at commit 6df9f01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29383/

SparkQA commented Apr 1, 2015

Test build #29530 has started for PR 5208 at commit d7bfe07.

SparkQA commented Apr 1, 2015

Test build #29530 has finished for PR 5208 at commit d7bfe07.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29530/

SparkQA commented Apr 1, 2015

Test build #29532 has started for PR 5208 at commit c34c96e.

SparkQA commented Apr 1, 2015

Test build #29532 has finished for PR 5208 at commit c34c96e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29532/

SparkQA commented Apr 1, 2015

Test build #29533 has started for PR 5208 at commit f5f81db.

SparkQA commented Apr 1, 2015

Test build #29533 has finished for PR 5208 at commit f5f81db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29533/

@adrian-wang
Contributor Author

I am not getting this error locally... what's wrong?

@adrian-wang
Contributor Author

This exception only occurs on current master; I didn't get it locally because I was working against a March 26 master. It could be a bug introduced in the meantime.

cc @chenghao-intel

@chenghao-intel
Contributor

From the log, it seems the output fields of the PhysicalRDD changed their order. Can you rebase against the latest code and try again locally?

== Physical Plan ==
Project [b#2957,a#2959]
 SortMergeJoin [a#2956], [b#2960], Inner
  Exchange (HashSortedPartitioning [a#2956], 200)
   PhysicalRDD [b#2957,a#2956], MapPartitionsRDD[1584] at map at FilteredScanSuite.scala:85
  Exchange (HashSortedPartitioning [b#2960], 200)
   PhysicalRDD [a#2959,b#2960], MapPartitionsRDD[1587] at map at FilteredScanSuite.scala:85

@adrian-wang
Contributor Author

Yes, after rebasing I can see this exception.

SparkQA commented Apr 2, 2015

Test build #29584 has started for PR 5208 at commit 7a869c5.

SparkQA commented Apr 2, 2015

Test build #29584 has finished for PR 5208 at commit 7a869c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29584/

@adrian-wang
Contributor Author

Contributor

Let's make it false by default; SMJ should be an experimental feature.

Contributor Author

OK, just use true for Jenkins testing.

Contributor

As we do in SparkStrategies, use execution.ExternalSort when sqlContext.conf.externalSortEnabled is true.

SparkQA commented Apr 15, 2015

Test build #30309 has started for PR 5208 at commit f515cd2.

Contributor

Use execution.ExternalSort when sqlContext.conf.externalSortEnabled is true.

SparkQA commented Apr 15, 2015

Test build #30315 has started for PR 5208 at commit f91a2ae.

SparkQA commented Apr 15, 2015

Test build #30309 has finished for PR 5208 at commit f515cd2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch adds the following new dependencies:
    • snappy-java-1.1.1.7.jar
  • This patch removes the following dependencies:
    • snappy-java-1.1.1.6.jar

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30309/

Contributor Author

maybe this line is redundant?

Contributor Author

Oh, I see... it's for RangePartitioner.

SparkQA commented Apr 15, 2015

Test build #30315 has finished for PR 5208 at commit f91a2ae.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch adds the following new dependencies:
    • commons-math3-3.4.1.jar
    • snappy-java-1.1.1.7.jar
  • This patch removes the following dependencies:
    • commons-math3-3.1.1.jar
    • snappy-java-1.1.1.6.jar

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30315/

SparkQA commented Apr 15, 2015

Test build #30319 has started for PR 5208 at commit 5049d88.

SparkQA commented Apr 15, 2015

Test build #30319 has finished for PR 5208 at commit 5049d88.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30319/

SparkQA commented Apr 15, 2015

Test build #30321 has started for PR 5208 at commit 2493b9f.

SparkQA commented Apr 15, 2015

Test build #30304 has finished for PR 5208 at commit ec8061b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30304/

SparkQA commented Apr 15, 2015

Test build #30321 has finished for PR 5208 at commit 2493b9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30321/

@marmbrus
Contributor

I manually fixed the conflicts while merging to master. Thanks! I'm excited to test out the performance of this new feature :)

asfgit closed this in 585638e on Apr 15, 2015
@adrian-wang
Contributor Author

Thanks!

@justmytwospence

Is this feature limited to equi-joins?

@adrian-wang
Contributor Author

@justmytwospence yes.
