diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 99a88c13456d..76f3e6533f55 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.executor

+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.annotation.DeveloperApi
@@ -167,7 +169,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   /**
    * Total bytes read.
    */
-  var bytesRead: Long = 0L
+  var bytesRead: AtomicLong = new AtomicLong(0)
 }

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 036dcc49664e..4505d74d2903 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -185,6 +185,11 @@ class HadoopRDD[K, V](
     array
   }

+  // Task input metrics are added to for each execution of compute(). This is not instantiated
+  // inside compute() for the CoalescedRDD case which calls compute() multiple times for a single
+  // task. See SPARK-2630
+  private val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {

@@ -202,13 +207,11 @@ class HadoopRDD[K, V](
       val key: K = reader.createKey()
       val value: V = reader.createValue()

-      // Set the task input metrics.
-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       try {
         /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
          * always at record boundaries, so tasks may need to read into other splits to complete
          * a record. */
-        inputMetrics.bytesRead = split.inputSplit.value.getLength()
+        inputMetrics.bytesRead.addAndGet(split.inputSplit.value.getLength())
       } catch {
         case e: java.io.IOException =>
           logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4c84b3f62354..dd163710e81f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -122,7 +122,7 @@ class NewHadoopRDD[K, V](
         /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
          * always at record boundaries, so tasks may need to read into other splits to complete
          * a record. */
-        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+        inputMetrics.bytesRead.addAndGet(split.serializableHadoopSplit.value.getLength())
       } catch {
         case e: Exception =>
           logWarning("Unable to get input split size in order to set task input bytes", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d1bee3d2c033..766c3eb52074 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -50,7 +50,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.bytesRead = bytes
+  inputMetrics.bytesRead.addAndGet(bytes)
 }

 private[spark] class BlockManager(
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 61eb111cd910..8ea433c7077a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -71,7 +71,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
       if (metrics != null) {
         metrics.inputMetrics.foreach { inputMetrics =>
           executorToInputBytes(eid) =
-            executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+            executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead.get()
         }
         metrics.shuffleReadMetrics.foreach { shuffleRead =>
           executorToShuffleRead(eid) =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861f59e5..832ae3868d5e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -209,8 +209,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         execSummary.shuffleRead += shuffleReadDelta

         val inputBytesDelta =
-          (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
-          - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+          (taskMetrics.inputMetrics.map(_.bytesRead.get()).getOrElse(0L)
+          - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead.get()).getOrElse(0L))
         stageData.inputBytes += inputBytesDelta
         execSummary.inputBytes += inputBytesDelta

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index db01be596e07..f4016fc95768 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -170,7 +170,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>)

       val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
-        metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+        metrics.get.inputMetrics.map(_.bytesRead.get()).getOrElse(0L).toDouble
       }
       val inputQuantiles = <td>Input</td> +: getQuantileCols(inputSizes)

@@ -247,7 +247,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val maybeInput = metrics.flatMap(_.inputMetrics)
       val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
       val inputReadable = maybeInput
-        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+        .map(m =>
+          s"${Utils.bytesToString(m.bytesRead.get())} (${m.readMethod.toString.toLowerCase()})")
         .getOrElse("")

       val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b0754e3ce10d..9e933f42a6a2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -270,7 +270,7 @@ private[spark] object JsonProtocol {

   def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
     ("Data Read Method" -> inputMetrics.readMethod.toString) ~
-    ("Bytes Read" -> inputMetrics.bytesRead)
+    ("Bytes Read" -> inputMetrics.bytesRead.get())
   }

   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -609,7 +609,7 @@ private[spark] object JsonProtocol {
   def inputMetricsFromJson(json: JValue): InputMetrics = {
     val metrics = new InputMetrics(
       DataReadMethod.withName((json \ "Data Read Method").extract[String]))
-    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+    metrics.bytesRead.set((json \ "Bytes Read").extract[Long])
     metrics
   }

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e251660dae5d..4f34f411a257 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -438,19 +438,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+    assert(list1Get.get.inputMetrics.bytesRead.get() === list1SizeEstimate)
     assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+    assert(list2MemoryGet.get.inputMetrics.bytesRead.get() === list2SizeEstimate)
     assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     System.out.println(list2DiskGet)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+    assert(list2DiskGet.get.inputMetrics.bytesRead.get() > 0)
     assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
   }

diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 3370dd4156c3..5bad3e49e38f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -158,7 +158,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskMetrics.memoryBytesSpilled = base + 6
     val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
     taskMetrics.inputMetrics = Some(inputMetrics)
-    inputMetrics.bytesRead = base + 7
+    inputMetrics.bytesRead.set(base + 7)
     taskMetrics
   }

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2b45d8b69585..cfb893355a45 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -370,7 +370,7 @@ class JsonProtocolSuite extends FunSuite {

   private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
     assert(metrics1.readMethod === metrics2.readMethod)
-    assert(metrics1.bytesRead === metrics2.bytesRead)
+    assert(metrics1.bytesRead.get() === metrics2.bytesRead.get())
   }

   private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
@@ -564,7 +564,7 @@ class JsonProtocolSuite extends FunSuite {

     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      inputMetrics.bytesRead = d + e + f
+      inputMetrics.bytesRead.set(d + e + f)
       t.inputMetrics = Some(inputMetrics)
     } else {
       val sr = new ShuffleReadMetrics