[SPARK-25556][SPARK-17636][SPARK-31026][SPARK-31060][SQL][test-hive1.2] Nested Column Predicate Pushdown for Parquet #27728
Conversation
Test build #119057 has finished for PR 27728 at commit
sql/core/src/main/java/org/apache/parquet/filter2/predicate/NestedFilterApi.java
@HyukjinKwon this PR also fixes the old problem that we don't allow pushing down a column name containing a dot.
Thank you, @dbtsai!
Test build #119061 has finished for PR 27728 at commit
Test build #119059 has finished for PR 27728 at commit
Test build #119060 has finished for PR 27728 at commit
holdenk left a comment
Two minor comments from a quick skim, thanks for working on this :)
Minor: would this not be 3.1.0?
+1, seems to be 3.1.0
Thanks. Going to fix it.
I like this simplification 👍
Test build #119102 has finished for PR 27728 at commit
What if there is a column name containing a `.`?
How about passing the schema or top-level output attributes as a parameter, and using them to judge whether the filter is on a nested column?
This is a quick implementation. I plan to write a parser to parse nested column names. For example, a.b is a nested field while `a.b` is a single field name containing a dot. WDYT?
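The distinction above can be sketched in a few lines. This is a minimal toy, not Spark's actual parser (which goes through `CatalystSqlParser`): it splits a column path into name parts, treating a backtick-quoted segment as one name that may contain dots. Escaped backticks (``` `` ```) are not handled here.

```scala
// Toy sketch: "a.b" -> Seq("a", "b"), but "`a.b`" -> Seq("a.b").
object IdentifierSplit {
  def split(name: String): Seq[String] = {
    val parts = scala.collection.mutable.ArrayBuffer.empty[String]
    val current = new StringBuilder
    var inQuote = false
    name.foreach {
      case '`' => inQuote = !inQuote   // enter or leave a quoted segment
      case '.' if !inQuote =>          // an unquoted dot separates name parts
        parts += current.toString
        current.clear()
      case c => current += c
    }
    parts += current.toString
    parts.toSeq
  }
}
```

So `` `a.b`.c `` parses as the two-part path `Seq("a.b", "c")`, which is why a quoting-aware parser rather than a plain `split(".")` is needed.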
SGTM
Nit: I think we can add a test case with more than two levels of nested columns here.
I added a test case for a three-level nested field.
So at each segment of the path, a dot is allowed?
Yes. But this is a private API in Parquet.
Based on the previous comment, do we still need to prevent `a.b`.c.d?
No, we don't have to. It should work.
Test build #119107 has finished for PR 27728 at commit
Test build #119104 has finished for PR 27728 at commit
Test build #119106 has finished for PR 27728 at commit
Test build #119109 has finished for PR 27728 at commit
Test build #119108 has finished for PR 27728 at commit
Typo: should be unquote
Fixed. Thanks.
You can copy how this is done in LogicalExpressions:

```scala
val parser = new CatalystSqlParser(SQLConf.get)
val nameParts = parser.parseMultipartIdentifier(name)
```

You can also use the `parseReference` method if you want to get a `FieldReference`.
Thanks for the tip. I was planning to implement this myself.
This shouldn't create a new parser each time it is used. Can you create a private field for the parser?
Also, is there a better place for this? If there is a common need for a parser for names, then maybe the SparkSqlParser object should expose methods to parse names and have an internal instance?
How about:

```scala
private lazy val catalystSqlParser = new CatalystSqlParser(SQLConf.get)

def unquote(name: String): Seq[String] = {
  catalystSqlParser.parseMultipartIdentifier(name)
}
```

for now, until we find a good place to have an internal instance of catalystSqlParser? The concern is that if we have it in the singleton, what happens if the SQLConf is changed?
Minor: other places (e.g. Collection#contains) normally use contains instead of contain.
Overall, this looks good to me. Only a couple of minor points.
Test build #120422 has finished for PR 27728 at commit
Thanks, merging to master/3.0!
…2] Nested Column Predicate Pushdown for Parquet

### What changes were proposed in this pull request?
1. `DataSourceStrategy.scala` is extended to create `org.apache.spark.sql.sources.Filter` from nested expressions.
2. Translation from nested `org.apache.spark.sql.sources.Filter` to `org.apache.parquet.filter2.predicate.FilterPredicate` is implemented to support nested predicate pushdown for Parquet.

### Why are the changes needed?
Better performance for handling nested predicate pushdown.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New tests are added.

Closes #27728 from dbtsai/SPARK-17636.
Authored-by: DB Tsai <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit cb0db21)
Signed-off-by: Wenchen Fan <[email protected]>
Hi, @cloud-fan.
Thanks @dongjoon-hyun.
Thank you, @cloud-fan @HyukjinKwon @rdblue @viirya @dongjoon-hyun and @holdenk, etc. for reviewing!!!
Ping @MaxGekk, are you interested in working on an ORC version of this, since you have been working in this area a bit recently?
@dbtsai Great work! Could you show some perf numbers?
It really depends on the data and whether we can skip most of the row groups by having a predicate on a nested field. For our production jobs, we are able to get a 20x speed-up in wall-clock time, with 8x less data being read. I will create a synthetic benchmark for this feature. Thanks,
Do you know the perf number for the worst case (e.g., no row can be filtered out)?
It's true that it depends on the nature of the data or workloads of course, but I think it makes sense to at least have a benchmark, including the (roughly) best and worst cases.
For the worst case, we don't see extra overhead from the Parquet side. I agree that we should have a benchmark suite for the best case (filtering out everything in a row group) and the worst case where nothing is filtered out in a row group.
```diff
   case _ => None
 }
-helper(e)
+helper(e).map(_.quoted)
```
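For readers unfamiliar with `.quoted`: it joins the name parts back into a single column path. A hedged, simplified sketch of what Spark's `MultipartIdentifierHelper.quoted` does (the names below model the behavior, they are not Spark's actual code):

```scala
// Join name parts into one dotted path, backtick-quoting any part that
// contains a dot or a backtick, and doubling embedded backticks.
def quoted(parts: Seq[String]): String =
  parts.map { part =>
    if (part.contains(".") || part.contains("`")) {
      "`" + part.replace("`", "``") + "`"
    } else part
  }.mkString(".")
```

So `Seq("a", "b")` becomes `a.b`, while `Seq("a.b", "c")` becomes `` `a.b`.c `` — which is exactly why sources that only expect plain top-level names can be affected, as discussed below.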
Sorry for my late review. This sounds like an API-breaking change.
Can we limit this change to some specific data sources? For example, parquet only?
I am afraid it might break released external third-party connectors that are not able to handle the quoted column names.
Yes, this was pointed out and discussed at #27728 (comment) and #27728 (comment).
@HeartSaVioR is working on it at this JIRA SPARK-31365. I marked the JIRA as a blocker for Spark 3.0.
I'm sorry, I've been working on my own (prioritized) task and am afraid I can't pick this up soon. Please take this over if anyone has an idea for implementing it.
@viirya Are you interested in this follow up?
I can try looking at this this week. If anyone picks it up before me, that's also fine with me.
Thank you for fixing this 3.0 blocker.
```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

def helper(e: Expression): Option[Seq[String]] = e match {
  case a: Attribute =>
    if (nestedPredicatePushdownEnabled || !a.name.contains(".")) {
```
Could you explain what this condition means?
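A toy model of the condition being asked about (illustrative names only, not Spark's actual code): when nested predicate pushdown is disabled, an attribute whose name contains a dot is not translated at all, because a dotted string would be ambiguous with a nested field path.

```scala
// If nested pushdown is enabled, any attribute name can be translated;
// otherwise only names without dots are safe to push down.
def translate(name: String, nestedPredicatePushdownEnabled: Boolean): Option[String] =
  if (nestedPredicatePushdownEnabled || !name.contains(".")) Some(name)
  else None
```

So with the flag off, a top-level column literally named `a.b` is simply skipped rather than misread as field `b` inside struct `a`.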
### What changes were proposed in this pull request?
This patch proposes to replace `NESTED_PREDICATE_PUSHDOWN_ENABLED` with `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST`, which can configure which v1 data sources have nested predicate pushdown enabled.

### Why are the changes needed?
We added a nested predicate pushdown feature that is configured by `NESTED_PREDICATE_PUSHDOWN_ENABLED`. However, this is an all-or-nothing config that applies to all data sources. In order to not introduce an API-breaking change after enabling nested predicate pushdown, we'd like to set nested predicate pushdown per data source. Please also refer to the comments #27728 (comment).

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added/Modified unit tests.

Closes #28366 from viirya/SPARK-31365.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…potential conflicts in dev

### What changes were proposed in this pull request?
This PR proposes to partially revert the tests and some code from #27728 without touching any behaviours. Most of the test changes go back to before #27728 by combining `withNestedDataFrame` and `withParquetDataFrame`. Basically, it addresses the comments #27728 (comment), and my own comment in another PR at #28761 (comment).

### Why are the changes needed?
For maintenance purposes and to avoid potential conflicts during backports, and also in case other code matches this.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Manually tested.

Closes #28955 from HyukjinKwon/SPARK-25556-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request?
We added nested column predicate pushdown for Parquet in #27728. This patch extends the feature support to ORC.

### Why are the changes needed?
Extending the feature to ORC for feature parity. Better performance for handling nested predicate pushdown.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes #28761 from viirya/SPARK-25557.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
… `Filter`

### What changes were proposed in this pull request?
This PR aims to remove the `private[sql]` function `containsNestedColumn` from `org.apache.spark.sql.sources.Filter`. This function was introduced by #27728 to avoid nested predicate pushdown for ORC. After #28761, ORC also supports nested column predicate pushdown, so this function has become unused.

### Why are the changes needed?
Remove the unused `private[sql]` function `containsNestedColumn`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

Closes #42239 from LuciferYang/SPARK-44607.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
What changes were proposed in this pull request?

1. `DataSourceStrategy.scala` is extended to create `org.apache.spark.sql.sources.Filter` from nested expressions.
2. Translation from nested `org.apache.spark.sql.sources.Filter` to `org.apache.parquet.filter2.predicate.FilterPredicate` is implemented to support nested predicate pushdown for Parquet.

Why are the changes needed?

Better performance for handling nested predicate pushdown.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New tests are added.