Skip to content

Commit 0bf1a74

Browse files
sujithjay authored and mridulm committed
[SPARK-22465][CORE] Add a safety-check to RDD defaultPartitioner
## What changes were proposed in this pull request? In choosing a Partitioner to use for a cogroup-like operation between a number of RDDs, the default behaviour was that, if some of the RDDs already have a partitioner, we choose the one amongst them with the maximum number of partitions. This behaviour, in some cases, could hit the 2G limit (SPARK-6235). To illustrate one such scenario, consider two RDDs: rdd1: with smaller data and a smaller number of partitions, along with a Partitioner. rdd2: with much larger data and a larger number of partitions, without a Partitioner. The cogroup of these two RDDs could hit the 2G limit, as a larger amount of data is shuffled into a smaller number of partitions. This PR introduces a safety-check wherein the Partitioner is chosen only if either of the following conditions is met: 1. if the number of partitions of the RDD associated with the Partitioner is greater than or equal to the max number of upstream partitions; or 2. if the number of partitions of the RDD associated with the Partitioner is less than and within a single order of magnitude of the max number of upstream partitions. ## How was this patch tested? Unit tests in PartitioningSuite and PairRDDFunctionsSuite. Author: sujithjay <[email protected]> Closes #20002 from sujithjay/SPARK-22465.
1 parent 1219d7a commit 0bf1a74

File tree

3 files changed

+78
-3
lines changed

3 files changed

+78
-3
lines changed

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
2121

2222
import scala.collection.mutable
2323
import scala.collection.mutable.ArrayBuffer
24+
import scala.math.log10
2425
import scala.reflect.ClassTag
2526
import scala.util.hashing.byteswap32
2627

@@ -42,7 +43,9 @@ object Partitioner {
4243
/**
4344
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
4445
*
45-
* If any of the RDDs already has a partitioner, choose that one.
46+
* If any of the RDDs already has a partitioner, and the number of partitions of the
47+
* partitioner is either greater than or equal to, or is less than and within a single order of
48+
* magnitude of the max number of upstream partitions, choose that one.
4649
*
4750
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
4851
* spark.default.parallelism is set, then we'll use the value from SparkContext
@@ -57,8 +60,15 @@ object Partitioner {
5760
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
5861
val rdds = (Seq(rdd) ++ others)
5962
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
60-
if (hasPartitioner.nonEmpty) {
61-
hasPartitioner.maxBy(_.partitions.length).partitioner.get
63+
64+
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
65+
Some(hasPartitioner.maxBy(_.partitions.length))
66+
} else {
67+
None
68+
}
69+
70+
if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
71+
hasMaxPartitioner.get.partitioner.get
6272
} else {
6373
if (rdd.context.conf.contains("spark.default.parallelism")) {
6474
new HashPartitioner(rdd.context.defaultParallelism)
@@ -67,6 +77,22 @@ object Partitioner {
6777
}
6878
}
6979
}
80+
81+
/**
82+
* Returns true if the number of partitions of the RDD is either greater
83+
* than or equal to, or is less than and within a single order of magnitude of the
84+
* max number of upstream partitions;
85+
* otherwise, returns false
86+
*/
87+
// Decides whether the partitioner of the candidate RDD may be reused.
// `hasMaxPartitioner` is the candidate RDD, if any; `rdds` are all RDDs
// taking part in the operation.
private def isEligiblePartitioner(
88+
hasMaxPartitioner: Option[RDD[_]],
89+
rdds: Seq[RDD[_]]): Boolean = {
90+
// No candidate RDD at all: nothing to reuse.
if (hasMaxPartitioner.isEmpty) {
91+
return false
92+
}
93+
// Largest partition count among all RDDs passed in.
val maxPartitions = rdds.map(_.partitions.length).max
94+
// log10(max) - log10(n) < 1 holds exactly when max < 10 * n, i.e. the candidate's
// partition count is less than one order of magnitude below the maximum.
// `.get` cannot fail here because the empty case returned above.
log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
95+
}
7096
}
7197

7298
/**

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,33 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
259259
val partitioner = new RangePartitioner(22, rdd)
260260
assert(partitioner.numPartitions === 3)
261261
}
262+
263+
test("defaultPartitioner") {
264+
// rdd1: 150 requested partitions; no partitionBy is applied to it here.
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
265+
// rdd2: hash-partitioned into 10 partitions.
val rdd2 = sc
266+
.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
267+
.partitionBy(new HashPartitioner(10))
268+
// rdd3: hash-partitioned into 100 partitions.
val rdd3 = sc
269+
.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
270+
.partitionBy(new HashPartitioner(100))
271+
// rdd4: hash-partitioned into 9 partitions.
val rdd4 = sc
272+
.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
273+
.partitionBy(new HashPartitioner(9))
274+
// rdd5: 11 requested partitions; no partitionBy is applied to it here.
val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
275+
276+
val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
277+
val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
278+
val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
279+
val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
280+
val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
281+
282+
// 10 vs. max 150: more than an order of magnitude apart, so rdd2's partitioner is
// expected to be rejected and the result to match rdd1's partition count.
// NOTE(review): presumably relies on spark.default.parallelism being unset — confirm.
assert(partitioner1.numPartitions == rdd1.getNumPartitions)
283+
// Both have partitioners; rdd3 has the most partitions (100) and equals the max.
assert(partitioner2.numPartitions == rdd3.getNumPartitions)
284+
// 100 vs. max 150: within one order of magnitude, so rdd3's partitioner is kept.
assert(partitioner3.numPartitions == rdd3.getNumPartitions)
285+
// Candidate is rdd3 (100, the larger of the two partitioned RDDs) vs. max 150: kept.
assert(partitioner4.numPartitions == rdd3.getNumPartitions)
286+
// 9 vs. max 11: within one order of magnitude, so rdd4's partitioner is kept.
assert(partitioner5.numPartitions == rdd4.getNumPartitions)
287+
288+
}
262289
}
263290

264291

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,28 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
310310
assert(joined.size > 0)
311311
}
312312

313+
// See SPARK-22465
314+
test("cogroup between multiple RDD " +
315+
"with an order of magnitude difference in number of partitions") {
316+
// rdd1: 1000 requested partitions; no partitionBy is applied to it here.
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
317+
// rdd2: hash-partitioned into 10 partitions.
val rdd2 = sc
318+
.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
319+
.partitionBy(new HashPartitioner(10))
320+
// NOTE(review): presumably cogroup chooses its partitioner via
// Partitioner.defaultPartitioner — confirm.
val joined = rdd1.cogroup(rdd2)
321+
// 10 vs. 1000 partitions differ by two orders of magnitude, so the result is
// expected to take rdd1's partition count rather than reuse rdd2's partitioner.
assert(joined.getNumPartitions == rdd1.getNumPartitions)
322+
}
323+
324+
// See SPARK-22465
325+
test("cogroup between multiple RDD" +
326+
" with number of partitions similar in order of magnitude") {
327+
// rdd1: 20 requested partitions; no partitionBy is applied to it here.
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
328+
// rdd2: hash-partitioned into 10 partitions.
val rdd2 = sc
329+
.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
330+
.partitionBy(new HashPartitioner(10))
331+
// NOTE(review): presumably cogroup chooses its partitioner via
// Partitioner.defaultPartitioner — confirm.
val joined = rdd1.cogroup(rdd2)
332+
// 10 vs. 20 partitions are within one order of magnitude, so the result is
// expected to reuse rdd2's partitioner and therefore its partition count.
assert(joined.getNumPartitions == rdd2.getNumPartitions)
333+
}
334+
313335
test("rightOuterJoin") {
314336
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
315337
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))

0 commit comments

Comments
 (0)