
Commit 109eeb3

overload getReader
1 parent 62cc46c commit 109eeb3

7 files changed (+20, -12 lines changed)

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -145,8 +145,7 @@ class CoGroupedRDD[K: ClassTag](
         val metrics = context.taskMetrics().createTempShuffleReadMetrics()
         val it = SparkEnv.get.shuffleManager
           .getReader(
-            shuffleDependency.shuffleHandle, 0, Int.MaxValue,
-            split.index, split.index + 1, context, metrics)
+            shuffleDependency.shuffleHandle, split.index, split.index + 1, context, metrics)
           .read()
         rddIterators += ((it, depNum))
     }

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -103,8 +103,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
     val metrics = context.taskMetrics().createTempShuffleReadMetrics()
     SparkEnv.get.shuffleManager.getReader(
-      dep.shuffleHandle, 0, Int.MaxValue,
-      split.index, split.index + 1, context, metrics)
+      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
       .read()
       .asInstanceOf[Iterator[(K, C)]]
   }

core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala

Lines changed: 0 additions & 2 deletions
@@ -105,8 +105,6 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         val iter = SparkEnv.get.shuffleManager
           .getReader(
             shuffleDependency.shuffleHandle,
-            0,
-            Int.MaxValue,
             partition.index,
             partition.index + 1,
             context,

core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala

Lines changed: 16 additions & 0 deletions
@@ -43,6 +43,22 @@ private[spark] trait ShuffleManager {
       context: TaskContext,
       metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
 
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from all map outputs of the shuffle.
+   *
+   * Called on executors by reduce tasks.
+   */
+  final def getReader[K, C](
+      handle: ShuffleHandle,
+      startPartition: Int,
+      endPartition: Int,
+      context: TaskContext,
+      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
+    getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
+  }
+
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
    * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
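
For context on the call-site changes above and below: the new overload is sugar over the existing seven-argument getReader, fixing the map-output range to (0, Int.MaxValue), i.e. "read from all map outputs". A minimal sketch of the equivalence, assuming handle, split, context, and metrics are in scope as they are in the RDD compute methods in this commit:

// Before this commit: call sites passed the full map-output range themselves.
val before = SparkEnv.get.shuffleManager.getReader[Int, Int](
  handle, 0, Int.MaxValue, split.index, split.index + 1, context, metrics)

// After: the five-argument overload supplies 0 and Int.MaxValue internally,
// so both readers cover the same map outputs and reduce partition range.
val after = SparkEnv.get.shuffleManager.getReader[Int, Int](
  handle, split.index, split.index + 1, context, metrics)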

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -411,8 +411,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     val taskContext = new TaskContextImpl(
       1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem)
     val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
-    val reader = manager.getReader[Int, Int](shuffleHandle, 0, Int.MaxValue,
-      0, 1, taskContext, metrics)
+    val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, taskContext, metrics)
     TaskContext.unset()
     val readData = reader.read().toIndexedSeq
     assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)

core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala

Lines changed: 1 addition & 2 deletions
@@ -106,8 +106,7 @@ class CustomShuffledRDD[K, V, C](
     val part = p.asInstanceOf[CustomShuffledRDDPartition]
     val metrics = context.taskMetrics().createTempShuffleReadMetrics()
     SparkEnv.get.shuffleManager.getReader(
-      dependency.shuffleHandle, 0, Int.MaxValue, part.startIndexInParent,
-      part.endIndexInParent, context, metrics)
+      dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context, metrics)
       .read()
       .asInstanceOf[Iterator[(K, C)]]
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala

Lines changed: 0 additions & 2 deletions
@@ -185,8 +185,6 @@ class ShuffledRowRDD(
       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
         SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
-          0,
-          Int.MaxValue,
           startReducerIndex,
           endReducerIndex,
           context,
