diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 99a88c13456d..76f3e6533f55 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
@@ -167,7 +169,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   /**
    * Total bytes read.
    */
-  var bytesRead: Long = 0L
+  var bytesRead: AtomicLong = new AtomicLong(0)
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 036dcc49664e..4505d74d2903 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -185,6 +185,11 @@ class HadoopRDD[K, V](
     array
   }
 
+  // Task input metrics are added to for each execution of compute(). This is not instantiated
+  // inside compute() for the CoalescedRDD case which calls compute() multiple times for a single
+  // task. See SPARK-2630
+  private val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {
@@ -202,13 +207,11 @@ class HadoopRDD[K, V](
       val key: K = reader.createKey()
       val value: V = reader.createValue()
 
-      // Set the task input metrics.
-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       try {
         /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
          * always at record boundaries, so tasks may need to read into other splits to complete
          * a record. */
-        inputMetrics.bytesRead = split.inputSplit.value.getLength()
+        inputMetrics.bytesRead.addAndGet(split.inputSplit.value.getLength())
       } catch {
         case e: java.io.IOException =>
           logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4c84b3f62354..dd163710e81f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -122,7 +122,7 @@ class NewHadoopRDD[K, V](
         /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
          * always at record boundaries, so tasks may need to read into other splits to complete
          * a record. */
-        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+        inputMetrics.bytesRead.addAndGet(split.serializableHadoopSplit.value.getLength())
       } catch {
         case e: Exception =>
           logWarning("Unable to get input split size in order to set task input bytes", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d1bee3d2c033..766c3eb52074 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -50,7 +50,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.bytesRead = bytes
+  inputMetrics.bytesRead.addAndGet(bytes)
 }
 
 private[spark] class BlockManager(
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 61eb111cd910..8ea433c7077a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -71,7 +71,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
     if (metrics != null) {
       metrics.inputMetrics.foreach { inputMetrics =>
         executorToInputBytes(eid) =
-          executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+          executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead.get()
       }
       metrics.shuffleReadMetrics.foreach { shuffleRead =>
         executorToShuffleRead(eid) =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861f59e5..832ae3868d5e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -209,8 +209,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       execSummary.shuffleRead += shuffleReadDelta
 
       val inputBytesDelta =
-        (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
-        - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+        (taskMetrics.inputMetrics.map(_.bytesRead.get()).getOrElse(0L)
+        - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead.get()).getOrElse(0L))
       stageData.inputBytes += inputBytesDelta
       execSummary.inputBytes += inputBytesDelta
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index db01be596e07..f4016fc95768 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -170,7 +170,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         Distribution(data).get.getQuantiles().map(d =>
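A minimal standalone sketch (not part of the patch) of the behaviour this diff is after: with the old plain var and assignment, a task whose compute() runs more than once (the CoalescedRDD case noted above, SPARK-2630) keeps only the last split's length, whereas an AtomicLong counter accumulates every contribution and readers call get(). The names AdditiveBytesReadSketch and Metrics below are hypothetical stand-ins, not Spark classes.

import java.util.concurrent.atomic.AtomicLong

// Illustrative sketch only, not Spark code: the names here are hypothetical stand-ins.
object AdditiveBytesReadSketch {

  // Stand-in for InputMetrics after this patch: bytesRead is an additive counter.
  final class Metrics {
    val bytesRead = new AtomicLong(0L)
  }

  def main(args: Array[String]): Unit = {
    val metrics = new Metrics

    // Two compute() executions for the same task each contribute their split length.
    // With the previous "var bytesRead: Long" and "=", the second value would overwrite
    // the first; addAndGet accumulates both.
    val splitLengths = Seq(64L * 1024 * 1024, 32L * 1024 * 1024)
    splitLengths.foreach(len => metrics.bytesRead.addAndGet(len))

    // Readers (the UI listeners touched in this patch) now call get() instead of reading a var.
    println(s"total bytes read = ${metrics.bytesRead.get()}") // 100663296
  }
}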