
Commit 49b0411

gengliangwang authored and cloud-fan committed
[SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
## What changes were proposed in this pull request?

In #23130, all empty files are excluded from target file splits in `FileSourceScanExec`. In File source V2, we should keep the same behavior. This PR suggests filtering out empty files when listing files in `PartitioningAwareFileIndex`, so that the upper level doesn't need to handle them.

## How was this patch tested?

Unit test

Closes #24227 from gengliangwang/ignoreEmptyFile.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent f1fe805 commit 49b0411
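
The change hinges on one predicate applied while listing: keep a file only if it is on a data path and has nonzero length. A minimal standalone sketch of that idea follows; the `isDataPath` parameter is a stand-in for the index's own private helper, not the real signature.

import org.apache.hadoop.fs.{FileStatus, Path}

// Sketch only: reduce a raw directory listing to non-empty data files
// before any split planning happens. `isDataPath` stands in for the
// private helper in PartitioningAwareFileIndex that skips metadata files.
def nonEmptyDataFiles(
    listing: Seq[FileStatus],
    isDataPath: Path => Boolean): Seq[FileStatus] = {
  listing.filter(f => isDataPath(f.getPath) && f.getLen > 0)
}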

File tree: 3 files changed, +16 −11 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 2 additions & 2 deletions
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.filter(_.getLen > 0).map { f =>
+        p.files.map { f =>
           PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
         }
       }.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.filter(_.getLen > 0).flatMap { file =>
+      partition.files.flatMap { file =>
         // getPath() is very expensive so we only want to call it once in this block:
         val filePath = file.getPath
         val isSplitable = relation.fileFormat.isSplitable(
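
With filtering moved into the index, the two removed `filter(_.getLen > 0)` calls are redundant rather than lost: every `FileStatus` reaching `FileSourceScanExec` already has nonzero length. A hedged sketch of that invariant, with `PartitionDir` as a simplified stand-in for Spark's `PartitionDirectory`:

import org.apache.hadoop.fs.FileStatus

// Simplified stand-in for org.apache.spark.sql.execution.datasources.PartitionDirectory.
case class PartitionDir(files: Seq[FileStatus])

// Split planning can assume non-empty inputs; the assert documents the
// invariant now established upstream by listFiles.
def planFileLengths(partitions: Seq[PartitionDir]): Seq[Long] = {
  partitions.flatMap { p =>
    p.files.map { f =>
      assert(f.getLen > 0, "listing should have dropped empty files")
      f.getLen
    }
  }
}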

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 5 additions & 2 deletions
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
 
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    def isNonEmptyFile(f: FileStatus): Boolean = {
+      isDataPath(f.getPath) && f.getLen > 0
+    }
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filter(f => isDataPath(f.getPath))
+              existingDir.filter(isNonEmptyFile)
 
             case None =>
               // Directory does not exist, or has no children files
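
Defining `isNonEmptyFile` as a local function keeps the two branches of `listFiles` (the non-partitioned `allFiles()` path and the partitioned `leafDirToChildrenFiles` lookup) filtering identically. A small sketch of why the named predicate is equivalent to the inline lambdas it replaces, with hypothetical inputs and `isDataPath` again a stand-in:

import org.apache.hadoop.fs.{FileStatus, Path}

def demoFilter(files: Seq[FileStatus], isDataPath: Path => Boolean): Unit = {
  def isNonEmptyFile(f: FileStatus): Boolean =
    isDataPath(f.getPath) && f.getLen > 0

  // Old form: data-path check only, repeated inline at each call site.
  val oldStyle = files.filter(f => isDataPath(f.getPath))
  // New form: one shared predicate that also drops zero-length files.
  val newStyle = files.filter(isNonEmptyFile)

  // The new result is the old result minus empty files.
  assert(newStyle == oldStyle.filter(_.getLen > 0))
}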

sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala

Lines changed: 9 additions & 7 deletions
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
   }
 
   test("skip empty files in non bucketed read") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
-      val readback = spark.read.option("wholetext", true).text(path)
-
-      assert(readback.rdd.getNumPartitions === 1)
+    Seq("csv", "text").foreach { format =>
+      withTempDir { dir =>
+        val path = dir.getCanonicalPath
+        Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+        Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+        val readBack = spark.read.option("wholetext", true).format(format).load(path)
+
+        assert(readBack.rdd.getNumPartitions === 1)
+      }
     }
   }
 }
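
For an interactive check, the updated test translates almost directly to spark-shell. A hedged sketch, assuming the `spark` session that spark-shell provides; directory and file names are illustrative:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

val dir = Files.createTempDirectory("skipEmptyFiles").toString
Files.write(Paths.get(dir, "empty"), Array.empty[Byte])
Files.write(Paths.get(dir, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))

// With empty files dropped at listing time, only the non-empty file
// yields a partition, whichever format reads the directory.
Seq("csv", "text").foreach { format =>
  val readBack = spark.read.option("wholetext", true).format(format).load(dir)
  assert(readBack.rdd.getNumPartitions == 1)
}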
