
Commit 576483b

Move inputMetrics instantiation outside of compute()
1 parent: a3f1ca1


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 5 additions & 2 deletions
@@ -185,6 +185,11 @@ class HadoopRDD[K, V](
     array
   }

+  // Task input metrics are added to for each execution of compute(). This is not instantiated
+  // inside compute() for the CoalescedRDD case which calls compute() multiple times for a single
+  // task. See SPARK-2630
+  private val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {

@@ -202,8 +207,6 @@
       val key: K = reader.createKey()
       val value: V = reader.createValue()

-      // Set the task input metrics.
-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       try {
         /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
          * always at record boundaries, so tasks may need to read into other splits to complete
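Why the move matters: CoalescedRDD funnels several parent partitions through one task, so a single task may invoke compute() several times. When each call built its own InputMetrics, every call discarded the byte counts accumulated by the previous one (SPARK-2630); hoisting the val to a field gives all calls within a task one shared accumulator. The following minimal, self-contained Scala sketch illustrates the two shapes. SimpleInputMetrics, PerComputeMetrics, PerTaskMetrics, and Demo are hypothetical stand-ins for illustration, not Spark's actual classes.

// Simplified stand-in for Spark's InputMetrics: just a byte counter.
class SimpleInputMetrics {
  var bytesRead: Long = 0L
}

// Buggy shape: a fresh metrics object per compute() call. If the caller only
// keeps the most recent object, the bytes from earlier calls are lost.
class PerComputeMetrics {
  var lastMetrics: SimpleInputMetrics = _
  def compute(splitBytes: Long): Unit = {
    val inputMetrics = new SimpleInputMetrics  // recreated on every call
    inputMetrics.bytesRead += splitBytes
    lastMetrics = inputMetrics                 // earlier calls' bytes discarded
  }
}

// Fixed shape, mirroring the commit: one metrics object per instance, shared
// by every compute() call, so reads accumulate across calls.
class PerTaskMetrics {
  private val inputMetrics = new SimpleInputMetrics
  def compute(splitBytes: Long): Unit = {
    inputMetrics.bytesRead += splitBytes
  }
  def metrics: SimpleInputMetrics = inputMetrics
}

object Demo extends App {
  // Three compute() calls stand in for three coalesced parent partitions.
  val buggy = new PerComputeMetrics
  Seq(100L, 200L, 300L).foreach(buggy.compute)
  println(s"per-compute metrics: ${buggy.lastMetrics.bytesRead} bytes")

  val fixed = new PerTaskMetrics
  Seq(100L, 200L, 300L).foreach(fixed.compute)
  println(s"per-task metrics:    ${fixed.metrics.bytesRead} bytes")
}

Running Demo prints 300 bytes for the per-compute shape (only the last call survives) and 600 bytes for the per-task shape, which is the accumulation the commit aims to preserve for coalesced partitions.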
