From e4940b9e511a314144db042dfaa6b0f2f4a4a45d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 23 May 2017 12:54:21 +0000
Subject: [PATCH 1/3] Shutdown the pool after reading parquet files.

---
 .../datasources/parquet/ParquetFileFormat.scala    |  7 +++++--
 .../parquet/ParquetFileFormatSuite.scala           | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c62b912c..cd43555e59686 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -479,8 +479,9 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-    parFiles.flatMap { currentFile =>
+    val readParquetTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.tasksupport = readParquetTaskSupport
+    val footers = parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
         // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
@@ -497,6 +498,8 @@ object ParquetFileFormat extends Logging {
         }
       }
     }.seq
+    readParquetTaskSupport.forkJoinPool.shutdown()
+    footers
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
index ccb34355f1bac..036b79b583d8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -26,6 +26,22 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
+  test("Number of threads doesn't grow extremely after parquet file reading") {
+    withTempDir { dir =>
+      val file = dir.toString + "/file"
+      spark.range(1).toDF("a").coalesce(1).write.parquet(file)
+      spark.read.parquet(file)
+      val numThreadBefore = Thread.activeCount
+      (1 to 100).map { _ =>
+        spark.read.parquet(file)
+      }
+      val numThreadAfter = Thread.activeCount
+      // Hard to test a correct thread number,
+      // but it shouldn't increase more than a reasonable number.
+      assert(numThreadAfter - numThreadBefore < 20)
+    }
+  }
+
   test("read parquet footers in parallel") {
     def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
       withTempDir { dir =>
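The pattern PATCH 1/3 introduces is: give the parallel collection a dedicated ForkJoinPool, force the computation, then shut the pool down so its eight worker threads do not outlive the call (before this change, every invocation leaked a fresh pool). Below is a minimal, self-contained sketch of that pattern, not Spark's code: it assumes Scala 2.11-era parallel collections (matching Spark at the time), and readOneFooter is a hypothetical stand-in for ParquetFileReader.readFooter.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object FooterReadingSketch {
      // Hypothetical stand-in for the real per-file work
      // (ParquetFileReader.readFooter in the patch).
      def readOneFooter(path: String): String = path

      def readAllFooters(paths: Seq[String]): Seq[String] = {
        val parPaths = paths.par
        // A fresh 8-thread pool per call, as in the patch; without an
        // explicit shutdown, each call leaves eight idle threads behind.
        val pool = new ForkJoinPool(8)
        parPaths.tasksupport = new ForkJoinTaskSupport(pool)
        // Parallel collections are strict, so the map completes here ...
        val footers = parPaths.map(readOneFooter).seq
        // ... which leaves the pool idle and safe to shut down.
        pool.shutdown()
        footers
      }
    }

One caveat this first version leaves open: if the per-file work throws, pool.shutdown() is never reached and the pool leaks anyway, which is what the next commit's move to try/finally addresses.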
From 14e09aafb8ee0cc9f5f1e95bc41b19eeab6fc7d4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 24 May 2017 02:33:18 +0000
Subject: [PATCH 2/3] Address comment.

---
 .../datasources/parquet/ParquetFileFormat.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index cd43555e59686..29ed8906137cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -479,9 +479,9 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val readParquetTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-    parFiles.tasksupport = readParquetTaskSupport
-    val footers = parFiles.flatMap { currentFile =>
+    val pool = new ForkJoinPool(8)
+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
+    parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
         // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
@@ -496,10 +496,10 @@ object ParquetFileFormat extends Logging {
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()
       }
     }.seq
-    readParquetTaskSupport.forkJoinPool.shutdown()
-    footers
   }
 
   /**

From 7e57595d2750b65002cda6a5ee94dbcf3f5f2f64 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 24 May 2017 08:49:53 +0000
Subject: [PATCH 3/3] Remove hacky test.

---
 .../parquet/ParquetFileFormatSuite.scala | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
index 036b79b583d8f..ccb34355f1bac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -26,22 +26,6 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
-  test("Number of threads doesn't grow extremely after parquet file reading") {
-    withTempDir { dir =>
-      val file = dir.toString + "/file"
-      spark.range(1).toDF("a").coalesce(1).write.parquet(file)
-      spark.read.parquet(file)
-      val numThreadBefore = Thread.activeCount
-      (1 to 100).map { _ =>
-        spark.read.parquet(file)
-      }
-      val numThreadAfter = Thread.activeCount
-      // Hard to test a correct thread number,
-      // but it shouldn't increase more than a reasonable number.
-      assert(numThreadAfter - numThreadBefore < 20)
-    }
-  }
-
   test("read parquet footers in parallel") {
     def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
       withTempDir { dir =>
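A note on the placement in PATCH 2/3: the new finally clause sits inside the per-file closure, so pool.shutdown() runs on a pool worker thread as soon as the first file finishes, not once after the whole traversal. ForkJoinPool.shutdown() only rejects new submissions and lets already-submitted tasks drain, but the more defensive arrangement, and roughly the one Spark later adopted in a follow-up, wraps the entire parallel operation in try/finally. A sketch under the same assumptions as above (illustrative names, not the patch's code):

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object FreshPoolSketch {
      // Maps f over items in a dedicated pool, guaranteeing the pool is
      // shut down exactly once, on the calling thread, even if f throws.
      def mapInFreshPool[A, B](items: Seq[A], parallelism: Int)(f: A => B): Seq[B] = {
        val parItems = items.par
        val pool = new ForkJoinPool(parallelism)
        parItems.tasksupport = new ForkJoinTaskSupport(pool)
        try {
          parItems.map(f).seq  // strict: all tasks finish before this returns
        } finally {
          pool.shutdown()
        }
      }
    }

A call such as mapInFreshPool(paths, 8)(readOneFooter) keeps the process's thread count flat across repeated invocations, which is the property the hacky test removed in PATCH 3/3 tried to assert.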