
Commit 294163e

ericl authored and cloud-fan committed
[SPARK-18679][SQL] Fix regression in file listing performance for non-catalog tables
## What changes were proposed in this pull request?

In Spark 2.1, ListingFileCatalog was significantly refactored (and renamed to InMemoryFileIndex). This introduced a regression where parallelism could only be introduced at the very top of the tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory. This PR simplifies and fixes the parallel recursive listing code to allow parallelism to be introduced at any level during recursive descent (though note that once we decide to list a sub-tree in parallel, the sub-tree is listed in serial on executors).

cc mallman cloud-fan

## How was this patch tested?

Checked metrics in unit tests.

Author: Eric Liang <[email protected]>

Closes apache#16112 from ericl/spark-18679.
1 parent 2159bf8 commit 294163e
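
For orientation before the diffs: the patch collapses the old serial/parallel split into a single recursive entry point that decides per level whether to fan out. A minimal, self-contained sketch of that control flow using plain `java.io.File` (the real code uses Hadoop's `FileSystem` and ships wide levels to executors as a Spark job; the names, the `children` helper, and the hard-coded threshold here are illustrative, not Spark's API):

```scala
import java.io.File

// Illustrative stand-in for spark.sql.sources.parallelPartitionDiscovery.threshold.
val threshold = 32

// Null-safe directory listing (File.listFiles returns null on errors).
def children(d: File): Seq[File] =
  Option(d.listFiles()).map(_.toSeq).getOrElse(Seq.empty)

// Serial recursion, standing in for what executors do once a sub-tree
// has been shipped to them (no further Spark jobs launch there).
def serialList(dirs: Seq[File]): Seq[File] = dirs.flatMap { d =>
  val (subdirs, files) = children(d).partition(_.isDirectory)
  files ++ serialList(subdirs)
}

// The adaptive entry point: at every level of the descent, a wide
// fan-out is handed off wholesale (in Spark, as one parallel job),
// while a narrow one recurses serially, leaving the door open for a
// deeper, wider level to still go parallel.
def bulkList(dirs: Seq[File], parallelList: Seq[File] => Seq[File]): Seq[File] = {
  if (dirs.size >= threshold) parallelList(dirs)
  else dirs.flatMap { d =>
    val (subdirs, files) = children(d).partition(_.isDirectory)
    files ++ bulkList(subdirs, parallelList)
  }
}

// Usage (path is hypothetical): bulkList(Seq(new File("/tmp/topLevelDir")), serialList)
```

The key property is that `bulkList` is re-entered during serial descent, which is exactly what lets parallelism kick in below a single top-level directory.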

File tree

3 files changed: +106 -34 lines changed

core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala

Lines changed: 8 additions & 0 deletions

```diff
@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
    */
   val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
 
+  /**
+   * Tracks the total number of Spark jobs launched for parallel file listing.
+   */
+  val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+    MetricRegistry.name("parallelListingJobCount"))
+
   /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
@@ -98,11 +104,13 @@ object HiveCatalogMetrics extends Source {
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
     METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+    METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
   def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
   def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+  def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
 }
```
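
These counters are plain Dropwizard (Codahale) `Counter`s, which is why the `reset()` method above is written as `dec(getCount())`: a Codahale counter has no reset method, so subtracting its current count is the idiomatic way to zero it. A small standalone illustration (the registry and metric name here are illustrative, not Spark's internals):

```scala
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()
val jobs = registry.counter(MetricRegistry.name("parallelListingJobCount"))

jobs.inc(3)
assert(jobs.getCount() == 3)

// Counter exposes inc/dec but no reset(); this is the same zeroing
// trick HiveCatalogMetrics.reset() uses above.
jobs.dec(jobs.getCount())
assert(jobs.getCount() == 0)
```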

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 45 additions & 34 deletions

```diff
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
         pathsToFetch += path
       }
     }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
       blockLocations: Array[SerializableBlockLocation])
 
   /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+   * on the number of paths to list.
+   *
+   * This may only be called on the driver.
+   *
+   * @return for each input path, the set of discovered files for the path
    */
-  private def listLeafFilesInParallel(
+  private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
+      filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+    // Short-circuits parallel listing when serial listing is likely to be faster.
+    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      return paths.map { path =>
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+      }
+    }
+
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@ object PartitioningAwareFileIndex extends Logging {
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
+      .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+        pathStrings.map(new Path(_)).toSeq.map { path =>
+          (path, listLeafFiles(path, hadoopConf, filter, None))
+        }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -374,11 +370,20 @@ object PartitioningAwareFileIndex extends Logging {
   }
 
   /**
-   * List a single path, provided as a FileStatus, in serial.
+   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+   * function may launch Spark jobs to parallelize listing.
+   *
+   * If sessionOpt is None, this may be called on executors.
+   *
+   * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+  private def listLeafFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
+    val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
@@ -393,9 +398,15 @@ object PartitioningAwareFileIndex extends Logging {
     }
 
     val allLeafStatuses = {
-      val (dirs, files) = statuses.partition(_.isDirectory)
-      val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-      if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
+        case Some(session) =>
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+        case _ =>
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+      }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
    }
 
     allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
```
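
The adaptive switch above is driven by the SQLConf entry behind `parallelPartitionDiscoveryThreshold`, whose key is `spark.sql.sources.parallelPartitionDiscovery.threshold` (default 32 in this line of Spark). A minimal sketch of exercising it; the master, app name, and path are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[4]")                 // illustrative
  .appName("listing-threshold-demo")  // illustrative
  // Any level of the directory tree with at least this many entries to
  // list is fanned out to executors as one Spark job.
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
  .getOrCreate()

// Before this patch, reading a single top-level directory could never
// parallelize listing, because only the root paths counted toward the
// threshold; now the recursive descent can go parallel at any level.
val df = spark.read.parquet("/tmp/topLevelDir")  // hypothetical path
```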
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 53 additions & 0 deletions

```diff
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        val topLevelDirs = (1 to scale).map { i =>
+          val tmp = new File(dir, s"foo=$i.txt")
+          tmp.mkdir()
+          new Path(tmp.getCanonicalPath)
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        for (i <- 1 to scale) {
+          new File(dir, s"foo=$i.txt").mkdir()
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+      withTempDir { dir =>
+        for (i <- 1 to 2) {
+          val subdirA = new File(dir, s"a=$i")
+          subdirA.mkdir()
+          for (j <- 1 to 2) {
+            val subdirB = new File(subdirA, s"b=$j")
+            subdirB.mkdir()
+            for (k <- 1 to scale) {
+              new File(subdirB, s"foo=$k.txt").mkdir()
+            }
+          }
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex - file filtering") {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
     assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
```
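
A note on the expected counts in the last test, following the `bulkListLeafFiles` logic in the previous file: at scale 50 the tree has 2 x 2 = 4 `b=$j` directories, each containing 50 children. The descent through the root and `a=$i` levels stays serial (fan-out of 2), but each `b=$j` directory's 50 children cross the default `parallelPartitionDiscoveryThreshold` (32), so four parallel listing jobs are launched in total. At scale 10, no level is wide enough and everything lists serially, hence 0.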

0 commit comments