-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-20848][SQL] Shutdown the pool after reading parquet files #18073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging { | |
partFiles: Seq[FileStatus], | ||
ignoreCorruptFiles: Boolean): Seq[Footer] = { | ||
val parFiles = partFiles.par | ||
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) | ||
val pool = new ForkJoinPool(8) | ||
parFiles.tasksupport = new ForkJoinTaskSupport(pool) | ||
parFiles.flatMap { currentFile => | ||
try { | ||
// Skips row group information since we only need the schema. | ||
|
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging { | |
} else { | ||
throw new IOException(s"Could not read footer for file: $currentFile", e) | ||
} | ||
} finally { | ||
pool.shutdown() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why we terminate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not doing it outside? For example? val parFiles = partFiles.par
val pool = new ForkJoinPool(8)
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
try {
parFiles.flatMap { currentFile =>
...
}.seq
} finally {
pool.shutdown()
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect this will fail some test, but it didn't... When you fix this error, could you call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was shutdowning it outside at the beginning of this PR. I changed to current way after @srowen's suggestion. I was thinking it can be wrong initially. But seems it is fine and I think the tasks are all invoked at the beginning and no more tasks are submitted later, so to shutdown inside is ok. I can go to submit a follow-up if you still think we need to change it. Thank you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't check the details. But I guess the implementation will submit tasks one by one. Then it's possible that when the first task is shutting down the pool, some tasks has not yet been submitted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. We should take a safer approach. Let me submit a follow-up for this. Thanks @zsxwing. |
||
} | ||
}.seq | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will it be better to share one global thread pool? Creating a lot of thread pools may not increase the concurrency
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main concern is that if we share a thread pool for parquet reading, we may limit the concurrency as @srowen pointed out in the JIRA.
If we have multiple parquet reading in parallel, they will share one pool. Currently they own their pools.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if using a shared one will change current behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok let's keep the previous behavior