[SPARK-33492][SQL] DSv2: Append/Overwrite/ReplaceTable should invalidate cache #30429
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #131361 has finished for PR 30429 at commit
+1, LGTM.
Could you review this, @rdblue and @cloud-fan ?
Retest this please.
Kubernetes integration test starting
Kubernetes integration test status success
The failing test in GitHub Actions is unrelated - I should have rebased to the latest master.
Test build #131431 has finished for PR 30429 at commit
Sorry I'm late to help review, but this overall looks good to me. The one minor issue I have is passing the Spark session and logical plan through the plans as new parameters. I would rather use a callback to refresh instead, so that:

```scala
case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
  val refresh = () => session.sharedState.cacheManager.recacheByPlan(session, r)
  r.table.asWritable match {
    case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      AppendDataExecV1(v1, writeOptions.asOptions, query, afterWrite = refresh) :: Nil
    case v2 =>
      AppendDataExec(v2, r, writeOptions.asOptions, planLater(query), afterWrite = refresh) :: Nil
  }
```
+1, it makes the query plan more general.
Thanks @rdblue and @cloud-fan ! yes this seems like a nice improvement. I'll address your comment in a follow-up PR.
Thank you, @rdblue , @cloud-fan , @sunchao . |
…n and v2 relation for refreshing cache

### What changes were proposed in this pull request?
This removes the Spark session and `DataSourceV2Relation` from V2 write plans, replacing them with a callback `afterWrite`.

### Why are the changes needed?
Per discussion in #30429, it's better to not pass Spark session and `DataSourceV2Relation` through Spark plans. Instead we can use a callback, which makes the interface cleaner.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

Closes #30491 from sunchao/SPARK-33492-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
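To illustrate the decoupling this follow-up describes, here is a minimal, self-contained sketch of the `afterWrite` callback pattern. It does not use Spark; `CacheManager` and `WriteExec` are hypothetical stand-ins for the real classes, showing only why a callback lets the write exec refresh caches without holding a Spark session or relation.

```scala
// Hypothetical sketch of the afterWrite callback pattern (not Spark's real API).
object CallbackSketch {
  // Stand-in for Spark's cache manager: maps a table name to cached rows.
  final class CacheManager {
    private var cached = Map.empty[String, Seq[Int]]
    def put(table: String, rows: Seq[Int]): Unit = cached += table -> rows
    def invalidate(table: String): Unit = cached -= table
    def get(table: String): Option[Seq[Int]] = cached.get(table)
  }

  // The exec node receives only a callback; it no longer needs the Spark
  // session or the relation in order to refresh caches after a write.
  final class WriteExec(rows: Seq[Int], afterWrite: () => Unit) {
    def run(target: scala.collection.mutable.Buffer[Int]): Unit = {
      target ++= rows
      afterWrite() // invoked once the write has completed
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = new CacheManager
    val table = scala.collection.mutable.Buffer(1, 2)
    cache.put("t", table.toSeq)

    // The planner, not the exec, decides what "refresh" means here.
    val exec = new WriteExec(Seq(3), () => cache.invalidate("t"))
    exec.run(table)

    assert(cache.get("t").isEmpty, "cache entry should be invalidated after write")
    println(table.mkString(","))
  }
}
```

The design choice mirrored here is the one @rdblue suggested: the exec stays generic, and the strategy that builds it decides whether the callback recaches or merely invalidates.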
### What changes were proposed in this pull request?
This changes `ReplaceTableExec`/`AtomicReplaceTableExec` to uncache the target table before it is dropped. In addition, this includes some refactoring by moving the `uncacheTable` method to `DataSourceV2Strategy` so that we don't need to pass a Spark session to the v2 exec.

### Why are the changes needed?
Similar to SPARK-33492 (#30429). When a table is replaced, the associated cache should be invalidated to avoid potentially incorrect results.

### Does this PR introduce _any_ user-facing change?
Yes. Now when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is replaced.

### How was this patch tested?
Added a new unit test.

Closes #31081 from sunchao/SPARK-34039.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This is a backport of #31081 to branch-3.1.

This changes `ReplaceTableExec`/`AtomicReplaceTableExec` to uncache the target table before it is dropped. In addition, this includes some refactoring by moving the `uncacheTable` method to `DataSourceV2Strategy` so that we don't need to pass a Spark session to the v2 exec.

### Why are the changes needed?
Similar to SPARK-33492 (#30429). When a table is replaced, the associated cache should be invalidated to avoid potentially incorrect results.

### Does this PR introduce _any_ user-facing change?
Yes. Now when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is replaced.

### How was this patch tested?
Added a new unit test.

Closes #31100 from sunchao/SPARK-34039-branch-3.1.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
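The ordering fixed by these two commits can be shown with a small non-Spark sketch: the cache entry for the target table must be dropped *before* REPLACE TABLE drops and re-creates the table, so no cache entry survives that points at the old table. `ReplaceTableSketch` and its members are illustrative names, not Spark API.

```scala
// Illustrative, simplified model of uncache-before-replace (not Spark's API).
object ReplaceTableSketch {
  private var cache  = Map.empty[String, Seq[Int]]
  private var tables = Map("t" -> Seq(1, 2))

  def uncache(name: String): Unit = cache -= name

  def replaceTable(name: String, rows: Seq[Int]): Unit = {
    uncache(name)          // invalidate first, while the old table still exists
    tables -= name         // drop the old table
    tables += name -> rows // re-create it with the new contents
  }

  def main(args: Array[String]): Unit = {
    cache += "t" -> tables("t")          // simulate CACHE TABLE t
    replaceTable("t", Seq(10, 20))
    assert(!cache.contains("t"), "no stale cache entry may survive REPLACE TABLE")
    println(tables("t").mkString(","))
  }
}
```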
What changes were proposed in this pull request?
This adds changes in the following places:
- `AppendDataExec`, `OverwriteByExpressionExec`, and `OverwritePartitionsDynamicExec`, as well as their v1 fallbacks `AppendDataExecV1` and `OverwriteByExpressionExecV1`.
- `ReplaceTableAsSelectExec` and its atomic version `AtomicReplaceTableAsSelectExec`. These are only supported in v2 at the moment, though.

In addition to the above, in order to test the v1 write fallback behavior, I extended `InMemoryTableWithV1Fallback` to also support batch reads.

Why are the changes needed?
Currently in DataSource v2 we don't refresh or invalidate caches referencing the target table when the table content is changed by operations such as append, overwrite, or replace table. This is different from DataSource v1, and could potentially cause data correctness issues if the stale caches are queried later.
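The correctness problem described above can be reduced to a toy example. This is not Spark code: a cached result keeps serving old rows after the underlying table is appended to, until the write path invalidates the cache, which is what this PR wires in for the v2 write execs.

```scala
// Toy illustration of the stale-cache problem (plain Scala, not Spark).
object StaleCacheSketch {
  def main(args: Array[String]): Unit = {
    val table = scala.collection.mutable.Buffer(1, 2)
    var cache: Option[Seq[Int]] = Some(table.toSeq)  // simulate CACHE TABLE t

    table += 3                                       // append WITHOUT invalidation
    val stale = cache.getOrElse(table.toSeq)
    assert(stale == Seq(1, 2), "query is served stale rows from the cache")

    cache = None                                     // what this PR does on write
    val fresh = cache.getOrElse(table.toSeq)
    assert(fresh == Seq(1, 2, 3), "after invalidation the query sees new rows")
    println(fresh.mkString(","))
  }
}
```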
Does this PR introduce any user-facing change?
Yes. Now when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is appended to, overwritten, or replaced.
How was this patch tested?
Added unit tests for the new code path.