Skip to content

Commit 8a54582

Browse files
committed
[SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist
## What changes were proposed in this pull request? This PR proposes to follow up apache#15153 and complete SPARK-17599. `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example see the exception message below: ``` Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174) at 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161) at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196) at 
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206) at com.hwx.StreamTest$.main(StreamTest.scala:97) at com.hwx.StreamTest.main(StreamTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... ``` So, this PR catches the `FileNotFoundException`, skips the missing file, and logs a warning instead of failing. ## How was this patch tested? It's hard to write a test. Manually tested multiple times. Author: hyukjinkwon <[email protected]> Closes apache#21408 from HyukjinKwon/missing-files.
1 parent e108f84 commit 8a54582

File tree

1 file changed

+24
-8
lines changed

1 file changed

+24
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,12 @@ object InMemoryFileIndex extends Logging {
294294
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
295295
}
296296

297-
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
297+
val missingFiles = mutable.ArrayBuffer.empty[String]
298+
val filteredLeafStatuses = allLeafStatuses.filterNot(
299+
status => shouldFilterOut(status.getPath.getName))
300+
val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
298301
case f: LocatedFileStatus =>
299-
f
302+
Some(f)
300303

301304
// NOTE:
302305
//
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
311314
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
312315
// which is very slow on some file systems (RawLocalFileSystem, which launches a
313316
// subprocess and parses the stdout).
314-
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
315-
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
316-
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
317-
if (f.isSymlink) {
318-
lfs.setSymlink(f.getSymlink)
317+
try {
318+
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
319+
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
320+
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
321+
if (f.isSymlink) {
322+
lfs.setSymlink(f.getSymlink)
323+
}
324+
Some(lfs)
325+
} catch {
326+
case _: FileNotFoundException =>
327+
missingFiles += f.getPath.toString
328+
None
319329
}
320-
lfs
321330
}
331+
332+
if (missingFiles.nonEmpty) {
333+
logWarning(
334+
s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
335+
}
336+
337+
resolvedLeafStatuses
322338
}
323339

324340
/** Checks if we should filter out this path name. */

0 commit comments

Comments
 (0)