@@ -26,7 +26,16 @@ import org.scalatest.FunSuite
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
+  LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
+  TextInputFormat => OldTextInputFormat}
+import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
+  CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
+import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
+  TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
+  CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
+  FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}

 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -202,7 +211,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     val fs = FileSystem.getLocal(new Configuration())
     val outPath = new Path(fs.getWorkingDirectory, "outdir")

-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
       val taskBytesWritten = new ArrayBuffer[Long]()
       sc.addSparkListener(new SparkListener() {
         override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -225,4 +234,88 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
       }
     }
   }
+
+  test("input metrics with old CombineFileInputFormat") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.hadoopFile(tmpFilePath, classOf[OldCombineTextInputFormat], classOf[LongWritable],
+        classOf[Text], 2).count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+
+  test("input metrics with new CombineFileInputFormat") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewCombineTextInputFormat], classOf[LongWritable],
+        classOf[Text], new Configuration()).count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it for backwards compatibility
+ */
+class OldCombineTextInputFormat extends OldCombineFileInputFormat[LongWritable, Text] {
+  override def getRecordReader(split: OldInputSplit, conf: JobConf, reporter: Reporter)
+    : OldRecordReader[LongWritable, Text] = {
+    new OldCombineFileRecordReader[LongWritable, Text](conf,
+      split.asInstanceOf[OldCombineFileSplit], reporter, classOf[OldCombineTextRecordReaderWrapper]
+        .asInstanceOf[Class[OldRecordReader[LongWritable, Text]]])
+  }
+}
+
+class OldCombineTextRecordReaderWrapper(
+    split: OldCombineFileSplit,
+    conf: Configuration,
+    reporter: Reporter,
+    idx: Integer) extends OldRecordReader[LongWritable, Text] {
+
+  val fileSplit = new OldFileSplit(split.getPath(idx),
+    split.getOffset(idx),
+    split.getLength(idx),
+    split.getLocations())
+
+  val delegate: OldLineRecordReader = new OldTextInputFormat().getRecordReader(fileSplit,
+    conf.asInstanceOf[JobConf], reporter).asInstanceOf[OldLineRecordReader]
+
+  override def next(key: LongWritable, value: Text): Boolean = delegate.next(key, value)
+  override def createKey(): LongWritable = delegate.createKey()
+  override def createValue(): Text = delegate.createValue()
+  override def getPos(): Long = delegate.getPos
+  override def close(): Unit = delegate.close()
+  override def getProgress(): Float = delegate.getProgress
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it for backwards compatibility
+ */
+class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable, Text] {
+  def createRecordReader(split: NewInputSplit, context: TaskAttemptContext)
+    : NewRecordReader[LongWritable, Text] = {
+    new NewCombineFileRecordReader[LongWritable, Text](split.asInstanceOf[NewCombineFileSplit],
+      context, classOf[NewCombineTextRecordReaderWrapper])
+  }
 }
+
+class NewCombineTextRecordReaderWrapper(
+    split: NewCombineFileSplit,
+    context: TaskAttemptContext,
+    idx: Integer) extends NewRecordReader[LongWritable, Text] {
+
+  val fileSplit = new NewFileSplit(split.getPath(idx),
+    split.getOffset(idx),
+    split.getLength(idx),
+    split.getLocations())
+
+  val delegate = new NewTextInputFormat().createRecordReader(fileSplit, context)
+
+  override def initialize(split: NewInputSplit, context: TaskAttemptContext): Unit = {
+    delegate.initialize(fileSplit, context)
+  }
+
+  override def nextKeyValue(): Boolean = delegate.nextKeyValue()
+  override def getCurrentKey(): LongWritable = delegate.getCurrentKey
+  override def getCurrentValue(): Text = delegate.getCurrentValue
+  override def getProgress(): Float = delegate.getProgress
+  override def close(): Unit = delegate.close()
+}
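Note: the two new tests call a runAndReturnBytesRead helper and use the tmpFilePath / tmpFile fixtures, all of which are defined earlier in the suite and are not part of this hunk. The following is a minimal sketch of what such a helper plausibly looks like, assuming the suite collects TaskMetrics through a SparkListener (as the bytes-written test above does); the body is an assumption, not taken from this diff.

// Sketch only: runs the given job and sums the bytesRead reported by each finished task.
// Assumes TaskMetrics.inputMetrics is an Option[InputMetrics], as in Spark 1.x.
private def runAndReturnBytesRead(job: => Unit): Long = {
  val taskBytesRead = new ArrayBuffer[Long]()
  sc.addSparkListener(new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
      taskBytesRead += taskEnd.taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
    }
  })
  job
  // Let queued listener events drain before summing the per-task values.
  sc.listenerBus.waitUntilEmpty(500)
  taskBytesRead.sum
}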