Conversation

@dbtsai
Member

@dbtsai dbtsai commented Feb 28, 2020

What changes were proposed in this pull request?

  1. `DataSourceStrategy.scala` is extended to create `org.apache.spark.sql.sources.Filter` from nested expressions.
  2. Translation from nested `org.apache.spark.sql.sources.Filter` to `org.apache.parquet.filter2.predicate.FilterPredicate` is implemented to support nested predicate pushdown for Parquet.
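As a rough illustration of the two bullets above (toy types, not Spark's actual classes), the core idea is that a nested field reference is flattened into a dot-joined path, with any name part that itself contains a dot back-quoted so it is not mistaken for a nesting separator:

```scala
// Toy model (hypothetical names; Spark's real classes live in
// org.apache.spark.sql.sources): a nested reference such as
// person.address.city becomes a source Filter whose attribute is the
// dot-joined field path.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter

// A nested field reference is a sequence of name parts.
case class FieldRef(parts: Seq[String]) {
  // Back-quote any part that itself contains a dot so it is not
  // mistaken for a nesting separator.
  def quoted: String =
    parts.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")
}

// e.g. EqualTo(FieldRef(Seq("person", "address", "city")).quoted, "SF")
// carries the attribute "person.address.city".
```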

Why are the changes needed?

Better performance for handling nested predicate pushdown.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New tests are added.

@SparkQA

SparkQA commented Feb 28, 2020

Test build #119057 has finished for PR 27728 at commit 79c2b29.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class NestedFilterApi

@dbtsai
Member Author

dbtsai commented Feb 28, 2020

@HyukjinKwon this PR also fixes the old problem that we didn't allow pushing down a column name containing a dot.

@dongjoon-hyun
Member

Thank you, @dbtsai !

@SparkQA

SparkQA commented Feb 28, 2020

Test build #119061 has finished for PR 27728 at commit 4b27c82.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2020

Test build #119059 has finished for PR 27728 at commit 400aebe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2020

Test build #119060 has finished for PR 27728 at commit 3a2fb39.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@holdenk holdenk left a comment

Two minor comments from a quick skim, thanks for working on this :)

Contributor

Minor: would this not be 3.1.0?

Member

+1, seems to be 3.1.0

Member Author

Thanks. Going to fix it.

Contributor

I like this simplification 👍

@SparkQA

SparkQA commented Feb 28, 2020

Test build #119102 has finished for PR 27728 at commit 0ae261d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

What if there is a column name containing `.`?

Member

How about passing the schema or top-level output attributes as a parameter, and checking whether the filter is on a nested column?

Member Author

This is a quick implementation. I plan to write a parser for nested column names.

For example, a.b is a nested field, while

`a.b`

is a single field name containing a dot. WDYT?
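A toy splitter illustrating the distinction (hypothetical helper; Spark ultimately reuses `CatalystSqlParser.parseMultipartIdentifier` rather than anything like this, and this sketch ignores escaped backticks inside quoted names):

```scala
import scala.collection.mutable.ArrayBuffer

// Splits "a.b" into Seq("a", "b") but keeps "`a.b`" as the single part Seq("a.b").
object MultipartIdent {
  def parse(name: String): Seq[String] = {
    val parts = ArrayBuffer.empty[String]
    val sb = new StringBuilder
    var inQuotes = false
    for (c <- name) c match {
      case '`'              => inQuotes = !inQuotes // toggle back-quoted section
      case '.' if !inQuotes =>                      // unquoted dot separates parts
        parts += sb.toString; sb.clear()
      case other            => sb += other
    }
    parts += sb.toString
    parts.toSeq
  }
}
```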

Member

SGTM

Member

Nit: I think we can add a test case with more than two levels of nested columns here.

Member Author

I added a test case for a three-level nested field.

@dbtsai dbtsai changed the title [SPARK-17636][SQL] Nested Field Predicate Pushdown for Parquet [SPARK-17636][SQL] Nested Column Predicate Pushdown for Parquet Feb 28, 2020


Member

So within each segment of the path, a dot is allowed?

Member Author

Yes. But this is a private API in Parquet.

Member

Based on the previous comment, do we still need to prevent `a.b`.c.d?

Member Author

No, we don't have to. It should work.

@SparkQA

SparkQA commented Feb 29, 2020

Test build #119107 has finished for PR 27728 at commit 2da3737.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 29, 2020

Test build #119104 has finished for PR 27728 at commit 6972b54.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 29, 2020

Test build #119106 has finished for PR 27728 at commit a1b0c32.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class NestedFilterApi

@SparkQA

SparkQA commented Feb 29, 2020

Test build #119109 has finished for PR 27728 at commit c706fe5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 29, 2020

Test build #119108 has finished for PR 27728 at commit 784d372.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class NestedFilterApi

Contributor

Typo: should be unquote

Member Author

Fixed. Thanks.

Contributor

You can copy how this is done in LogicalExpressions.

val parser = new CatalystSqlParser(SQLConf.get)
val nameParts = parser.parseMultipartIdentifier(name)

You can also use the parseReference method if you want to get a FieldReference.

Member Author

Thanks for the tip. I was planning to implement this myself.

Contributor

@rdblue rdblue Mar 2, 2020

This shouldn't create a new parser each time it is used. Can you create a private field for the parser?

Contributor

Also, is there a better place for this? If there is a common need for a parser for names, then maybe the SparkSqlParser object should expose methods to parse names and have an internal instance?

Member Author

How about

  private lazy val catalystSqlParser = new CatalystSqlParser(SQLConf.get)

  def unquote(name: String): Seq[String] = {
    catalystSqlParser.parseMultipartIdentifier(name)
  }

for now until we find a good place to have an internal instance of catalystSqlParser?

The concern is that if we keep it in a singleton, what happens if the SQLConf changes?
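The trade-off being discussed can be sketched with stand-in types (hypothetical `Conf`/`Parser`, not Spark's): a `lazy val` captures the conf at first access, so a singleton would pin whatever conf was active then, while re-creating the parser per call always observes the current conf:

```scala
// Stand-ins for SQLConf and CatalystSqlParser (hypothetical, for illustration;
// the toy parser splits on dots only and ignores backticks).
final case class Conf(caseSensitive: Boolean)
final class Parser(val conf: Conf) {
  def parseMultipartIdentifier(name: String): Seq[String] = name.split('.').toSeq
}

object ParserHolder {
  var currentConf: Conf = Conf(caseSensitive = false) // stand-in for SQLConf.get

  // Captured once, at first access: later conf changes are not observed.
  lazy val cached: Parser = new Parser(currentConf)

  // Re-created per call: always sees the current conf, at some allocation cost.
  def fresh(): Parser = new Parser(currentConf)
}
```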

Contributor

Minor: other places (e.g. Collection#contains) normally use contains instead of contain.

@rdblue
Contributor

rdblue commented Mar 2, 2020

Overall, this looks good to me. Only a couple minor points.

@SparkQA

SparkQA commented Mar 26, 2020

Test build #120422 has finished for PR 27728 at commit 5fd97c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in cb0db21 Mar 27, 2020
cloud-fan pushed a commit that referenced this pull request Mar 27, 2020
…2] Nested Column Predicate Pushdown for Parquet

### What changes were proposed in this pull request?
1. `DataSourceStrategy.scala` is extended to create `org.apache.spark.sql.sources.Filter` from nested expressions.
2. Translation from nested `org.apache.spark.sql.sources.Filter` to `org.apache.parquet.filter2.predicate.FilterPredicate` is implemented to support nested predicate pushdown for Parquet.

### Why are the changes needed?
Better performance for handling nested predicate pushdown.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New tests are added.

Closes #27728 from dbtsai/SPARK-17636.

Authored-by: DB Tsai <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit cb0db21)
Signed-off-by: Wenchen Fan <[email protected]>
@dongjoon-hyun
Member

dongjoon-hyun commented Mar 27, 2020

Hi, @cloud-fan .
Could you fix the build failure in branch-3.0? It's a conf version issue that has occurred multiple times before.

[ERROR] [Error] /home/runner/work/spark/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:1867:
value version is not a member of org.apache.spark.internal.config.ConfigBuilder

@dongjoon-hyun
Member

@HyukjinKwon
Member

Thanks @dongjoon-hyun.

@dbtsai
Member Author

dbtsai commented Mar 28, 2020

Thank you, @cloud-fan @HyukjinKwon @rdblue @viirya @dongjoon-hyun and @holdenk, etc for reviewing!!!

@dbtsai
Member Author

dbtsai commented Mar 28, 2020

Ping @MaxGekk, are you interested in working on the ORC version of this, since you've been working in this area a bit recently?

@gatorsmile
Member

@dbtsai Great work! Could you share some perf numbers?

@dbtsai
Member Author

dbtsai commented Apr 1, 2020

It really depends on the data and whether we can skip most of the row groups given a predicate on a nested field. For our production jobs, we see a 20x speed-up in wall-clock time and 8x less data being read. I will create a synthetic benchmark for this feature. Thanks!

@gatorsmile
Member

Do you know the perf numbers for the worst case (e.g., when no rows can be filtered out)?

@HyukjinKwon
Member

HyukjinKwon commented Apr 1, 2020

It's true that it depends on the nature of the data and workloads, of course, but I think it makes sense to at least have a benchmark, including (roughly) the best and worst cases.

@dbtsai
Member Author

dbtsai commented Apr 1, 2020

For the worst case, we don't see extra overhead on the Parquet side. I agree that we should have a benchmark suite covering the best case (filtering out everything in a row group) and the worst case (nothing filtered out in a row group).

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
case _ => None
}
helper(e)
helper(e).map(_.quoted)
Member

Sorry for my late review. This sounds like an API-breaking change.

Member

Can we limit this change to some specific data sources? For example, parquet only?

Member

I am afraid it might break released third-party connectors that may not be able to handle quoted column names.

Member

@HyukjinKwon HyukjinKwon Apr 20, 2020

Yes, this was pointed out and discussed at #27728 (comment) and #27728 (comment).
@HeartSaVioR is working on it at this JIRA SPARK-31365. I turned the JIRA to be a blocker for Spark 3.0.

Contributor

I'm sorry, I've been working on my own (prioritized) task and I'm afraid I can't pick this up soon. Please take it over if anyone has an idea for implementing this.

Member

@viirya Are you interested in this follow up?

Member

I can try looking at this this week. If anyone picks it up before me, that's fine too.

Member

Thank you for fixing this 3.0 blocker.

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
def helper(e: Expression): Option[Seq[String]] = e match {
case a: Attribute =>
if (nestedPredicatePushdownEnabled || !a.name.contains(".")) {
Member

Could you explain what this condition means?

cloud-fan pushed a commit that referenced this pull request May 6, 2020
### What changes were proposed in this pull request?

This patch proposes to replace `NESTED_PREDICATE_PUSHDOWN_ENABLED` with `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST` which can configure which v1 data sources are enabled with nested predicate pushdown.

### Why are the changes needed?

We added a nested predicate pushdown feature configured by `NESTED_PREDICATE_PUSHDOWN_ENABLED`. However, this config is all-or-nothing and applies to all data sources.

In order not to introduce an API-breaking change after enabling nested predicate pushdown, we'd like to enable nested predicate pushdown per data source. Please also refer to the comments #27728 (comment).

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added/Modified unit tests.

Closes #28366 from viirya/SPARK-31365.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
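For context, the per-source switch introduced by this follow-up can be used roughly as below. The conf key shown is an assumption based on SPARK-31365 (the SQL conf that `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST` maps to); verify it against SQLConf in your Spark version:

```scala
// Hedged usage sketch (assumes a running SparkSession named `spark`, and that
// NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST maps to the key below).
spark.conf.set(
  "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources",
  "parquet") // enable nested predicate pushdown for the Parquet v1 source only
```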
cloud-fan pushed a commit that referenced this pull request May 6, 2020
(cherry picked from commit 4952f1a)
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jul 1, 2020
…potential conflicts in dev

### What changes were proposed in this pull request?

This PR proposes to partially revert the tests and some code at #27728, without touching any behaviours.

Most of the changes in tests go back to before #27728, by combining `withNestedDataFrame` and `withParquetDataFrame`.

Basically, it addresses the comments #27728 (comment), and my own comment in another PR at #28761 (comment)

### Why are the changes needed?

For maintenance purposes and to avoid potential conflicts during backports, and also in case other code matches this.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested.

Closes #28955 from HyukjinKwon/SPARK-25556-followup.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jul 1, 2020
(cherry picked from commit 8194d9e)
Signed-off-by: HyukjinKwon <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Aug 7, 2020
### What changes were proposed in this pull request?

We added nested column predicate pushdown for Parquet in #27728. This patch extends the feature support to ORC.

### Why are the changes needed?

Extending the feature to ORC for feature parity. Better performance for handling nested predicate pushdown.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #28761 from viirya/SPARK-25557.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
LuciferYang added a commit that referenced this pull request Aug 2, 2023
… `Filter`

### What changes were proposed in this pull request?
This PR aims to remove the `private[sql]` function `containsNestedColumn` from `org.apache.spark.sql.sources.Filter`.
This function was introduced by #27728 to avoid nested predicate pushdown for ORC.
After #28761, ORC also supports nested column predicate pushdown, so this function has become unused.

### Why are the changes needed?
Remove the unused `private[sql]` function `containsNestedColumn`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #42239 from LuciferYang/SPARK-44607.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>