
Commit acfa46a

SPARK-1770: Load balance elements when repartitioning.
This patch adds better balancing when performing a repartition of an RDD. Previously the elements in the RDD were hash partitioned, so if the RDD was skewed, certain partitions ended up very large. This commit load balances elements across the repartitioned RDD splits. The balancing is not perfect: a given output partition can have up to N more elements than the average if there are N input partitions. However, some randomization is used to minimize the probability that this happens.
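To see the effect, consider a hypothetical spark-shell session (the values, partition counts, and outputs below are illustrative, not taken from this patch):

    // Skewed input: 1000 copies of the same value spread over 10 input partitions.
    val skewed = sc.parallelize(Seq.fill(1000)(42), 10)
    skewed.repartition(4).glom().map(_.length).collect()
    // Before this patch: every copy hashes to the same bucket, e.g. Array(0, 0, 1000, 0).
    // After this patch: Array(250, 250, 250, 250). The balance is exact here, since each
    // input partition's 100 elements divide evenly across 4 buckets; in general a
    // bucket exceeds the average by at most 10 (the number of input partitions).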
Parent: 905173d

2 files changed: 44 additions, 2 deletions

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 11 additions & 2 deletions
@@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
       : RDD[T] = {
     if (shuffle) {
+      /** Distributes elements evenly across output partitions, starting from a random partition. */
+      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+        var position = (new Random(index)).nextInt(numPartitions)
+        items.map { t =>
+          position = (position + 1) % numPartitions
+          (position, t)
+        }
+      }
+
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, T)](mapPartitionsWithIndex(distributePartition),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }
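Two details are worth noting. Because the synthetic Int key already lies in [0, numPartitions), HashPartitioner maps each key to itself, so consecutive positions land in consecutive output buckets; .values then discards the synthetic key after the shuffle (the old code used .keys because the payload was the key of a (T, Null) pair). The seeded, per-partition random start is what keeps the imbalance bounded. Below is a minimal self-contained sketch of the assignment logic in plain Scala, with no Spark dependency; assignRoundRobin is an illustrative name, not part of this patch:

    import scala.util.Random

    def assignRoundRobin(index: Int, items: Seq[Int], numPartitions: Int): Seq[(Int, Int)] = {
      var position = new Random(index).nextInt(numPartitions) // seeded, per-partition random start
      items.map { t =>
        position = (position + 1) % numPartitions             // cycle through output buckets
        (position, t)
      }
    }

    // 10 items over 4 buckets: every bucket receives 2 or 3 items. Which buckets get 3
    // depends on the seeded start, so the overflow lands on different buckets for
    // different input partitions and averages out across the whole shuffle.
    val counts = assignRoundRobin(0, 1 to 10, 4)
      .groupBy(_._1)
      .map { case (bucket, xs) => bucket -> xs.size }
    println(counts) // e.g. Map(0 -> 3, 1 -> 2, 2 -> 2, 3 -> 3), depending on the seed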

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 33 additions & 0 deletions
@@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }

+  test("repartitioned RDDs perform load balancing") {
+    // Coalesce partitions
+    val input = Array.fill(1000)(1)
+    val initialPartitions = 10
+    val data = sc.parallelize(input, initialPartitions)
+
+    val repartitioned1 = data.repartition(2)
+    assert(repartitioned1.partitions.size == 2)
+    val partitions1 = repartitioned1.glom().collect()
+    // some noise in balancing is allowed due to randomization
+    assert(math.abs(partitions1(0).length - 500) < initialPartitions)
+    assert(math.abs(partitions1(1).length - 500) < initialPartitions)
+    assert(repartitioned1.collect() === input)
+
+    def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
+      val data = sc.parallelize(input, initialPartitions)
+      val repartitioned = data.repartition(finalPartitions)
+      assert(repartitioned.partitions.size == finalPartitions)
+      val partitions = repartitioned.glom().collect()
+      // assert all elements are present
+      assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
+      // assert no bucket is overloaded
+      for (partition <- partitions) {
+        val avg = input.size / finalPartitions
+        val maxPossible = avg + initialPartitions
+        assert(partition.length <= maxPossible)
+      }
+    }
+
+    testSplitPartitions(Array.fill(100)(1), 10, 20)
+    testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
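As a worked instance of the maxPossible bound, take the second testSplitPartitions call above: 20000 elements into finalPartitions = 100 gives avg = 20000 / 100 = 200. Round-robin assignment means each of the initialPartitions = 20 input partitions places at most one element beyond its even share into any single output partition, so maxPossible = 200 + 20 = 220.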
