Skip to content

Commit 4bf5cec

Browse files
committed
filter empty files on constructing selectedPartitions
1 parent 664fe9b commit 4bf5cec

File tree

3 files changed

+8
-5
lines changed

3 files changed

+8
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
382382
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
383383
val filesGroupedToBuckets =
384384
selectedPartitions.flatMap { p =>
385-
p.files.filter(_.getLen > 0).map { f =>
385+
p.files.map { f =>
386386
PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
387387
}
388388
}.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
426426
s"open cost is considered as scanning $openCostInBytes bytes.")
427427

428428
val splitFiles = selectedPartitions.flatMap { partition =>
429-
partition.files.filter(_.getLen > 0).flatMap { file =>
429+
partition.files.flatMap { file =>
430430
// getPath() is very expensive so we only want to call it once in this block:
431431
val filePath = file.getPath
432432
val isSplitable = relation.fileFormat.isSplitable(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
5858

5959
override def listFiles(
6060
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
61+
def isNonEmptyFile(f: FileStatus): Boolean = {
62+
isDataPath(f.getPath) && f.getLen > 0
63+
}
61 64
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
62-
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
65+
PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
63 66
} else {
64 67
prunePartitions(partitionFilters, partitionSpec()).map {
65 68
case PartitionPath(values, path) =>
66 69
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
67 70
case Some(existingDir) =>
68 71
// Directory has children files in it, return them
69-
existingDir.filter(f => isDataPath(f.getPath))
72+
existingDir.filter(isNonEmptyFile)
70 73

71 74
case None =>
72 75
// Directory does not exist, or has no children files

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ abstract class FileScan(
4141
val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
4242
val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
4343
val splitFiles = selectedPartitions.flatMap { partition =>
44-
partition.files.filter(_.getLen > 0).flatMap { file =>
44+
partition.files.flatMap { file =>
4545
val filePath = file.getPath
4646
PartitionedFileUtil.splitFiles(
4747
sparkSession = sparkSession,

0 commit comments

Comments
 (0)