Commit cdbc3e8

SPARK-5199. Input metrics should show up for InputFormats that return CombineFileSplits
1 parent 1420931 commit cdbc3e8

3 files changed: 12 additions, 13 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 6 additions & 8 deletions

@@ -133,10 +133,10 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
+  private[spark] def getFSBytesReadOnThreadCallback(conf: Configuration)
     : Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics(conf)
       val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
@@ -156,10 +156,10 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
+  private[spark] def getFSBytesWrittenOnThreadCallback(conf: Configuration)
     : Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics(conf)
       val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
       val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesWritten = f()
@@ -172,10 +172,8 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+  private def getFileSystemThreadStatistics(conf: Configuration): Seq[AnyRef] = {
+    val stats = FileSystem.getAllStatistics()
     stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
   }

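Why dropping the per-path scheme filter is safe: as of Hadoop 2.5 (HADOOP-10688), FileSystem.Statistics tracks per-thread counters, so summing across every registered filesystem still counts only bytes read on the current task's thread. Below is a minimal standalone sketch of the callback pattern used in this file, assuming Hadoop 2.5+; the helper name is illustrative, not Spark's actual API:

import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

def bytesReadOnThreadCallback(): Option[() => Long] =
  try {
    // Resolve the Hadoop 2.5+ methods reflectively so older versions degrade to None.
    val getThreadStats = classOf[FileSystem.Statistics].getMethod("getThreadStatistics")
    val getBytesRead = getThreadStats.getReturnType.getMethod("getBytesRead")
    // Sum the current thread's bytes read across all registered filesystems,
    // with no per-scheme filtering -- this is what lets CombineFileSplits count.
    val f = () => FileSystem.getAllStatistics.asScala
      .map(stats => getBytesRead.invoke(getThreadStats.invoke(stats)).asInstanceOf[Long])
      .sum
    val baseline = f()            // snapshot before the task starts reading
    Some(() => f() - baseline)    // the callback reports only this task's delta
  } catch {
    case _: NoSuchMethodException => None  // pre-2.5 Hadoop: no thread statistics
  }
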
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 2 deletions

@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.JobID
 import org.apache.hadoop.mapred.TaskAttemptID
 import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -220,8 +221,8 @@ class HadoopRDD[K, V](
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
         split.inputSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(jobConf)
           case _ => None
         }
       )

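The arms above must be type patterns (_: FileSplit), since FileSplit and CombineFileSplit are classes, not extractors; note also that CombineFileSplit exposes getPaths() (plural) rather than a single getPath(), which is why the path argument to the callback had to go. A standalone illustration against the old (mapred) API, with a hypothetical path:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileSplit, InputSplit}
import org.apache.hadoop.mapred.lib.CombineFileSplit

// Both file-backed split flavors enable the filesystem-level callback;
// anything else (e.g. a database-backed split) falls through to None upstream.
def isFileBacked(split: InputSplit): Boolean = split match {
  case _: FileSplit | _: CombineFileSplit => true
  case _ => false
}

// Hypothetical single-file split: one path, so the old per-scheme filter worked here.
val single = new FileSplit(new Path("/tmp/part-00000"), 0L, 128L, Array.empty[String])
assert(isFileBacked(single))
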
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
@@ -116,8 +116,8 @@ class NewHadoopRDD[K, V](
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
         split.serializableHadoopSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(conf)
           case _ => None
         }
       )

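The new (mapreduce) API takes the same type-pattern fix. A hypothetical CombineFileSplit shows why there is no single path to filter on; the constructor arguments and paths below are illustrative:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// One combined split over two files; the second array gives per-file lengths.
val combined = new CombineFileSplit(
  Array(new Path("hdfs:///data/a.txt"), new Path("file:///data/b.txt")),
  Array(10L, 20L))

// getPaths() may span several files (potentially even several schemes), so
// filesystem statistics cannot be filtered by one split path.
combined.getPaths.foreach(p => println(p.toUri.getScheme))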