
Conversation

@sunchao (Member) commented Nov 19, 2020

What changes were proposed in this pull request?

This adds changes in the following places:

  • Logic to refresh caches referencing the target table in the v2 AppendDataExec, OverwriteByExpressionExec, and OverwritePartitionsDynamicExec, as well as their v1 fallbacks AppendDataExecV1 and OverwriteByExpressionExecV1.
  • Logic to invalidate caches referencing the target table in the v2 ReplaceTableAsSelectExec and its atomic version AtomicReplaceTableAsSelectExec. (Replace-table operations are currently only supported in v2.)

In addition to the above, in order to test the v1 write fallback behavior, I extended InMemoryTableWithV1Fallback to also support batch reads.

Why are the changes needed?

Currently, DataSource v2 does not refresh or invalidate caches referencing the target table when the table content is changed by operations such as append, overwrite, or replace table. This differs from DataSource v1 and could cause data correctness issues if the stale caches are queried later.
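The distinction the PR draws between refreshing (append/overwrite, where the table still exists and the cache can be re-materialized) and invalidating (replace table, where the cached plan may no longer be valid) can be illustrated with a minimal sketch. All class and method names below are illustrative only, not Spark's actual CacheManager API:

```scala
// Minimal model of refresh-vs-invalidate. `load` stands in for
// re-executing the cached plan against the current table contents.
class TableCache(load: String => Seq[Int]) {
  private var cache = Map.empty[String, Seq[Int]]

  def cacheTable(name: String): Unit = cache += name -> load(name)

  def lookup(name: String): Option[Seq[Int]] = cache.get(name)

  // Append/overwrite: the table still exists, so re-materialize the cache.
  def recacheByName(name: String): Unit =
    if (cache.contains(name)) cache += name -> load(name)

  // Replace table: the old cached plan may no longer be valid, so drop it.
  def invalidate(name: String): Unit = cache -= name
}
```

With this model, an append followed by `recacheByName` leaves the cache consistent with the new contents, while a replace simply evicts the entry so the next query recomputes from scratch.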

Does this PR introduce any user-facing change?

Yes. Now when a DataSource v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated after operations such as append, overwrite, or replace table.

How was this patch tested?

Added unit tests for the new code path.

@github-actions github-actions bot added the SQL label Nov 19, 2020
@SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35965/

@SparkQA commented Nov 19, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35965/

@SparkQA commented Nov 19, 2020

Test build #131361 has finished for PR 30429 at commit ef1a45b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment

+1, LGTM.
Could you review this, @rdblue and @cloud-fan ?

@dongjoon-hyun (Member)
Retest this please.

@SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36037/

@SparkQA commented Nov 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36037/

@sunchao (Member, Author) commented Nov 20, 2020

The failing test in GitHub Actions is unrelated; I should have rebased onto the latest master.

@SparkQA commented Nov 20, 2020

Test build #131431 has finished for PR 30429 at commit ef1a45b.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

With the integrated test results:

  • The SparkR test passed in GitHub Actions although it failed on Jenkins.
  • The Scala 2.13 failure is unrelated to this PR; it's already fixed in master by @sunchao.
  • The K8s integration test passed.

I'll merge this. Thanks, @sunchao.

@rdblue (Contributor) commented Nov 24, 2020

Sorry I'm late to help review, but this overall looks good to me.

The one minor issue I have is passing the Spark session and logical plan through the plans as new parameters. I would rather use a callback to refresh instead, so that DataSourceV2Strategy would pass what to do after the commit:

    case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
      val refresh = () => session.sharedState.cacheManager.recacheByPlan(session, r)
      r.table.asWritable match {
        case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
          AppendDataExecV1(v1, writeOptions.asOptions, query, afterWrite = refresh) :: Nil
        case v2 =>
          AppendDataExec(v2, r, writeOptions.asOptions, planLater(query), afterWrite = refresh) :: Nil
      }
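On the exec side of this suggestion, the write node would only need to invoke the callback after a successful commit, with no knowledge of the session or relation. A minimal self-contained sketch of the pattern — `afterWrite` mirrors the snippet above, but the class itself is illustrative and not Spark's actual exec node:

```scala
// Illustrative sketch of the callback pattern: the write node takes an
// opaque `afterWrite` callback instead of a Spark session and relation,
// and runs it only after the commit succeeds.
final case class AppendExecSketch(
    commit: () => Unit,
    afterWrite: () => Unit = () => ()) {

  def run(): Unit = {
    commit()     // perform the actual write
    afterWrite() // e.g. recache the plan, supplied by the planner
  }
}
```

The design win is that the exec node stays decoupled: the planner decides what "refresh" means (recache, invalidate, or nothing), and the same exec works for all cases.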

@cloud-fan (Contributor)

+1, it makes the query plan more general.

@sunchao (Member, Author) commented Nov 24, 2020

Thanks @rdblue and @cloud-fan! Yes, this seems like a nice improvement. I'll address your comment in a follow-up PR.

@dongjoon-hyun (Member)

Thank you, @rdblue , @cloud-fan , @sunchao .

@sunchao deleted the SPARK-33492 branch November 24, 2020 22:42
cloud-fan pushed a commit that referenced this pull request Nov 30, 2020
…n and v2 relation for refreshing cache

### What changes were proposed in this pull request?

This removes the Spark session and `DataSourceV2Relation` from V2 write plans, replacing them with a callback `afterWrite`.

### Why are the changes needed?

Per the discussion in #30429, it's better not to pass the Spark session and `DataSourceV2Relation` through Spark plans. Instead we can use a callback, which makes the interface cleaner.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #30491 from sunchao/SPARK-33492-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jan 8, 2021
### What changes were proposed in this pull request?

This changes `ReplaceTableExec`/`AtomicReplaceTableExec`, and uncaches the target table before it is dropped. In addition, this includes some refactoring by moving the `uncacheTable` method to `DataSourceV2Strategy` so that we don't need to pass a Spark session to the v2 exec.

### Why are the changes needed?

Similar to SPARK-33492 (#30429). When a table is refreshed, the associated cache should be invalidated to avoid potential incorrect results.

### Does this PR introduce _any_ user-facing change?

Yes. Now when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is replaced.

### How was this patch tested?

Added a new unit test.

Closes #31081 from sunchao/SPARK-34039.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
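The ordering constraint in the commit above — uncache the target table before it is dropped and recreated — can be modeled with a short sketch. If the cache entry outlived the drop, a lookup in the intervening window could serve stale rows; uncaching first closes that window. All names here are illustrative, not Spark's catalog API:

```scala
// Sketch of uncache-before-drop ordering for REPLACE TABLE.
class CatalogSketch {
  private var tables = Map.empty[String, Seq[Int]]
  private var cached = Set.empty[String]

  def create(name: String, rows: Seq[Int]): Unit = tables += name -> rows
  def cache(name: String): Unit = cached += name
  def isCached(name: String): Boolean = cached(name)

  def replace(name: String, rows: Seq[Int]): Unit = {
    cached -= name           // 1. invalidate caches referencing the table
    tables -= name           // 2. drop the old table
    tables += name -> rows   // 3. create the replacement
  }
}
```

After `replace`, the table is never cached, so the next query recomputes against the replacement rather than serving the old contents.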
sunchao added a commit to sunchao/spark that referenced this pull request Jan 9, 2021
This is the same change as the SPARK-34039 commit above (uncache the target table in `ReplaceTableExec`/`AtomicReplaceTableExec` before it is dropped), cherry-picked to sunchao/spark.

Closes apache#31081 from sunchao/SPARK-34039.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Jan 10, 2021
### What changes were proposed in this pull request?

This is a backport of #31081 to branch-3.1.

This changes `ReplaceTableExec`/`AtomicReplaceTableExec`, and uncaches the target table before it is dropped. In addition, this includes some refactoring by moving the `uncacheTable` method to `DataSourceV2Strategy` so that we don't need to pass a Spark session to the v2 exec.

### Why are the changes needed?

Similar to SPARK-33492 (#30429). When a table is refreshed, the associated cache should be invalidated to avoid potential incorrect results.

### Does this PR introduce _any_ user-facing change?

Yes. Now when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is replaced.

### How was this patch tested?

Added a new unit test.

Closes #31100 from sunchao/SPARK-34039-branch-3.1.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>