diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95bedc0a5..463dd5bd30739 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
       try {
         finished = !reader.next(key, value)
       } catch {
-        case eof: EOFException =>
-          finished = true
+        case _: EOFException if ignoreCorruptFiles => finished = true
       }
       if (!finished) {
         inputMetrics.incRecordsRead(1)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 46fe1ba93441b..5b5ddd5ce28f2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import java.io.EOFException
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@ class NewHadoopRDD[K, V](
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
+          try {
+            finished = !reader.nextKeyValue
+          } catch {
+            case _: EOFException if ignoreCorruptFiles => finished = true
+          }
           if (finished) {
             // Close and release the reader here; close() will also be called when the task
             // completes, but for tasks that read from many files, it helps to release the
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index fdb00aafc4a48..7e87092d61671 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark
 
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@
     }.collect()
     assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
   }
+
+  test("spark.files.ignoreCorruptFiles should work for both HadoopRDD and NewHadoopRDD") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of the bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+
+      // Spark job should ignore corrupt files by default
+      sc = new SparkContext("local", "test")
+      // Test HadoopRDD
+      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+      // Test NewHadoopRDD
+      assert {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect().isEmpty
+      }
+      sc.stop()
+
+      // Reading a corrupt gzip file should throw EOFException
+      val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
+      sc = new SparkContext("local", "test", conf)
+      // Test HadoopRDD
+      var e = intercept[SparkException] {
+        sc.textFile(inputFile.toURI.toString).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      // Test NewHadoopRDD
+      e = intercept[SparkException] {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+    } finally {
+      inputFile.delete()
+    }
+  }
 }
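For context, a minimal sketch of how the new flag is exercised from an application. This is not part of the patch: the object name and input path are hypothetical, and it assumes the default of true set by this change.

    import org.apache.spark.{SparkConf, SparkContext}

    object IgnoreCorruptFilesDemo {
      def main(args: Array[String]): Unit = {
        // Default behavior after this patch: an EOFException from a truncated
        // gzip part is treated as end-of-file, so the job completes with
        // whatever records were readable.
        val lenientSc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("lenient"))
        lenientSc.textFile("/tmp/possibly-corrupt.gz").count()  // hypothetical path
        lenientSc.stop()

        // Opting out: with the flag off, the EOFException propagates and the
        // job fails with a SparkException wrapping it, as the test asserts.
        val strictConf = new SparkConf()
          .setMaster("local")
          .setAppName("strict")
          .set("spark.files.ignoreCorruptFiles", "false")
        val strictSc = new SparkContext(strictConf)
        strictSc.textFile("/tmp/possibly-corrupt.gz").count()  // throws if corrupt
        strictSc.stop()
      }
    }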