Skip to content

Conversation

@cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented May 3, 2018

What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

ParquetFilters is used in the file scan function, which is executed in executor side, so we can't call conf.parquetFilterPushDownDate there.

How was this patch tested?

it's tested in #21190

@cloud-fan
Copy link
Contributor Author

cc @gatorsmile

@SparkQA
Copy link

SparkQA commented May 3, 2018

Test build #90096 has finished for PR 21224 at commit c58baad.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 3, 2018

Test build #90110 has finished for PR 21224 at commit d7dc8a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

@cloud-fan, the change seems fine but would there be any clever trick to test this? Seems we could very likely do the similar thing by mistake.

Copy link
Contributor

@jiangxb1987 jiangxb1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed the PR description. So, this will be tested with TaskContext and thread local. Sure, we can talk about it there.

LGTM too.

sparkSession.sessionState.conf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
Copy link
Member

@dongjoon-hyun dongjoon-hyun May 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass pushed instead of declaring new pushDownDate?
The following can be handled at line 345 here, not inside (file: PartitionedFile) => {}

       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
          .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
          .reduceOption(FilterApi.and)
       } else {
         None
       }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no we can't, see #21086

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Thank you, @cloud-fan !

@cloud-fan
Copy link
Contributor Author

thanks, merging to master!

@HyukjinKwon
Copy link
Member

and branch 2-3 too ..?

@asfgit asfgit closed this in 0c23e25 May 4, 2018
@cloud-fan
Copy link
Contributor Author

I realized #21086 is only in master, so this bug doesn't exist in 2.3

zzcclp pushed a commit to zzcclp/spark that referenced this pull request May 11, 2018
… on the driver

## What changes were proposed in this pull request?

This is a followup of apache#20136 . apache#20136 didn't really work because in the test, we are using local backend, which shares the driver side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work.

This PR changes the check to `TaskContext.get != null`, and move the check to `SQLConf.get`, and fix all the places that violate this check:
* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, we can't access `conf.offHeapColumnVectorEnabled` there. apache#21223 merged
* `DataType#sameType` may be executed in executor side, for things like json schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed in executor side, so we can't can't call `conf.parquetFilterPushDownDate` there. apache#21224 merged
* `WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there. apache#21225 merged
* `JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. apache#21226 merged

## How was this patch tested?

existing test

Author: Wenchen Fan <[email protected]>

Closes apache#21190 from cloud-fan/minor.
robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 24, 2018
… on the driver

## What changes were proposed in this pull request?

This is a followup of apache#20136 . apache#20136 didn't really work because in the test, we are using local backend, which shares the driver side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work.

This PR changes the check to `TaskContext.get != null`, and move the check to `SQLConf.get`, and fix all the places that violate this check:
* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, we can't access `conf.offHeapColumnVectorEnabled` there. apache#21223 merged
* `DataType#sameType` may be executed in executor side, for things like json schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed in executor side, so we can't can't call `conf.parquetFilterPushDownDate` there. apache#21224 merged
* `WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there. apache#21225 merged
* `JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. apache#21226 merged

## How was this patch tested?

existing test

Author: Wenchen Fan <[email protected]>

Closes apache#21190 from cloud-fan/minor.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants