16 changes: 6 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val threadStats = getFileSystemThreadStatistics()
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
@@ -156,10 +155,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val threadStats = getFileSystemThreadStatistics()
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
@@ -172,10 +170,8 @@
}
}

private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
val stats = FileSystem.getAllStatistics()
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

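Note on the new zero-argument form above: because the scheme filter was removed from getFileSystemThreadStatistics(), the callback now sums thread-local statistics across every FileSystem the thread has touched. A minimal consumption sketch (illustrative only, not part of this diff; the tail of the method is collapsed above, but it appears to return a function that reports bytes relative to a baseline captured at creation):

  // Capture the callback before any reads happen on this thread.
  val bytesReadCallback: Option[() => Long] =
    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

  // ... perform reads on this thread, e.g. through a RecordReader ...

  // Bytes read since the callback was created, or 0 if the Hadoop version
  // (pre-2.5) does not expose per-thread FileSystem statistics.
  val bytesRead: Long = bytesReadCallback.map(_()).getOrElse(0L)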
@@ -19,7 +19,6 @@ package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
@@ -218,13 +219,13 @@ class HadoopRDD[K, V](

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
)
}
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
@@ -254,7 +255,8 @@
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
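Aside on the close() path above: CombineFileSplit now takes the same split-size fallback as FileSplit when the FS-statistics callback is unavailable. Recapped as a pattern match (sketch only; the actual metrics update performed in the collapsed lines is not reproduced here):

  val inputSplit = split.inputSplit.value
  val fallbackBytesRead: Option[Long] = inputSplit match {
    // Only file-backed splits have an on-disk length worth reporting.
    case _: FileSplit | _: CombineFileSplit if bytesReadCallback.isEmpty =>
      Some(inputSplit.getLength)  // may be inaccurate, e.g. for compressed input
    case _ => None
  }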
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
@@ -34,7 +34,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
@@ -114,13 +114,13 @@ class NewHadoopRDD[K, V](

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.serializableHadoopSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
)
}
inputMetrics.setBytesReadCallback(bytesReadCallback)

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
@@ -163,7 +163,8 @@
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
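The new-API path mirrors HadoopRDD. To see the effect from application code, a listener like the one InputOutputMetricsSuite uses below can sum bytesRead per task (sketch; the listener and metrics names follow the usage already shown in the test suite):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  @volatile var totalBytesRead = 0L
  sc.addSparkListener(new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
      // inputMetrics is only populated for tasks that actually read input.
      taskEnd.taskMetrics.inputMetrics.foreach(m => totalBytesRead += m.bytesRead)
    }
  })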
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -990,7 +990,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
@@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1086,11 +1086,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}

private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
: (OutputMetrics, Option[() => Long]) = {
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
.map(new Path(_))
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
if (bytesWrittenCallback.isDefined) {
context.taskMetrics.outputMetrics = Some(outputMetrics)
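The write side now mirrors the read side: initHadoopOutputMetrics no longer needs the output directory from the job configuration to resolve a FileSystem scheme. A sketch of the call pattern inside a write task (illustrative; the OutputMetrics update in the collapsed lines is not reproduced, and the comment marks where it would go):

  val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
  // ... write records through the RecordWriter ...
  bytesWrittenCallback.foreach { fn =>
    val bytesWrittenOnThread = fn()  // delta since the callback was created
    // record bytesWrittenOnThread on outputMetrics before the task finishes
  }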
@@ -26,7 +26,16 @@ import org.scalatest.FunSuite
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}

import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
@@ -202,7 +211,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")

if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
val taskBytesWritten = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -225,4 +234,88 @@
}
}
}

test("input metrics with old CombineFileInputFormat") {
val bytesRead = runAndReturnBytesRead {
sc.hadoopFile(tmpFilePath, classOf[OldCombineTextInputFormat], classOf[LongWritable],
classOf[Text], 2).count()
}
assert(bytesRead >= tmpFile.length())
}

test("input metrics with new CombineFileInputFormat") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewCombineTextInputFormat], classOf[LongWritable],
classOf[Text], new Configuration()).count()
}
assert(bytesRead >= tmpFile.length())
}
}

/**
* Hadoop 2 has a version of this, but we can't use it for backwards compatibility
*/
class OldCombineTextInputFormat extends OldCombineFileInputFormat[LongWritable, Text] {
override def getRecordReader(split: OldInputSplit, conf: JobConf, reporter: Reporter)
: OldRecordReader[LongWritable, Text] = {
new OldCombineFileRecordReader[LongWritable, Text](conf,
split.asInstanceOf[OldCombineFileSplit], reporter, classOf[OldCombineTextRecordReaderWrapper]
.asInstanceOf[Class[OldRecordReader[LongWritable, Text]]])
}
}

class OldCombineTextRecordReaderWrapper(
split: OldCombineFileSplit,
conf: Configuration,
reporter: Reporter,
idx: Integer) extends OldRecordReader[LongWritable, Text] {

val fileSplit = new OldFileSplit(split.getPath(idx),
split.getOffset(idx),
split.getLength(idx),
split.getLocations())

val delegate: OldLineRecordReader = new OldTextInputFormat().getRecordReader(fileSplit,
conf.asInstanceOf[JobConf], reporter).asInstanceOf[OldLineRecordReader]

override def next(key: LongWritable, value: Text): Boolean = delegate.next(key, value)
override def createKey(): LongWritable = delegate.createKey()
override def createValue(): Text = delegate.createValue()
override def getPos(): Long = delegate.getPos
override def close(): Unit = delegate.close()
override def getProgress(): Float = delegate.getProgress
}

/**
* Hadoop 2 has a version of this, but we can't use it for backwards compatibility
*/
class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable,Text] {
def createRecordReader(split: NewInputSplit, context: TaskAttemptContext)
: NewRecordReader[LongWritable, Text] = {
new NewCombineFileRecordReader[LongWritable,Text](split.asInstanceOf[NewCombineFileSplit],
context, classOf[NewCombineTextRecordReaderWrapper])
}
}

class NewCombineTextRecordReaderWrapper(
split: NewCombineFileSplit,
context: TaskAttemptContext,
idx: Integer) extends NewRecordReader[LongWritable, Text] {

val fileSplit = new NewFileSplit(split.getPath(idx),
split.getOffset(idx),
split.getLength(idx),
split.getLocations())

val delegate = new NewTextInputFormat().createRecordReader(fileSplit, context)

override def initialize(split: NewInputSplit, context: TaskAttemptContext): Unit = {
delegate.initialize(fileSplit, context)
}

override def nextKeyValue(): Boolean = delegate.nextKeyValue()
override def getCurrentKey(): LongWritable = delegate.getCurrentKey
override def getCurrentValue(): Text = delegate.getCurrentValue
override def getProgress(): Float = delegate.getProgress
override def close(): Unit = delegate.close()
}
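
For completeness: on Hadoop 2 the same combine-split read path can be exercised without the wrapper classes above, since the distribution ships a concrete combine-text format (sketch; assumes org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat is on the classpath, which is exactly the dependency the suite avoids for Hadoop 1 compatibility):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

  // Many small files end up packed into a few CombineFileSplits; with this
  // change the tasks' input metrics report the bytes read instead of zero.
  val records = sc.newAPIHadoopFile("/path/to/small/files",
    classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text],
    new Configuration()).count()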