[SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source #46944
Conversation
…-liu/spark into skipSnapshotAtBatch
Is it necessary to add an end-to-end test for the options? If so, I can create another PR. The way to construct it is probably to sleep long enough for the maintenance task to run. @anishshri-db @HeartSaVioR
throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
  snapshotFile(startVersion), toString(), null)
}
synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
is it possible to refactor this with existing loadMap fcn? or add helper function for shared logic
For HDFS, it is hard because the common part is really small. But for RocksDB, there is room for refactoring. For example, this PR tests whether we can extract a common part of both `load` functions: #46927
 * @param endVersion checkpoint version to end with
 */
def getStore(startVersion: Long, endVersion: Long): StateStore =
  throw new SparkUnsupportedOperationException("getStore with startVersion and endVersion " +
can we just put nothing here? like
def getStore(version: Long): StateStore
It seems that we cannot, because to make this method optional, it has to have a default implementation, otherwise a build error will be thrown.
Hmm - what error do you see here? Can you paste it please?
Assuming users create a custom state store provider and do not implement this method (because it is optional), they will see errors like:
`Missing implementation for member of trait StateStoreProvider`
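A self-contained Scala sketch of the constraint discussed above (simplified names; the real code uses `StateStoreProvider` and `SparkUnsupportedOperationException`): making the method optional requires a default body, and throwing from that default defers the failure to providers that are actually asked for a version range.

```scala
// Minimal sketch, not Spark's actual trait: an optional trait method needs a
// default body, otherwise every existing implementor fails to compile with
// "Missing implementation for member of trait ...".
trait ProviderSketch {
  // Mandatory: no default body, so implementors must define it.
  def getStore(version: Long): String

  // Optional: the default body throws, so only providers that are actually
  // asked to replay a version range need to override it.
  def getStore(startVersion: Long, endVersion: Long): String =
    throw new UnsupportedOperationException(
      s"getStore with startVersion and endVersion is not supported by ${getClass.getName}")
}

class MinimalProvider extends ProviderSketch {
  override def getStore(version: Long): String = s"store@$version"
  // No override of the two-argument getStore: this still compiles.
}
```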
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore}
import org.apache.spark.sql.execution.streaming.state._
is this because these three are everything in that pkg?
No. The reason is that I use three new classes from this package; I think the import line would be too long if it listed them all. What do you think?
Yea this should be good
@WweiL
…-liu/spark into skipSnapshotAtBatch
@@ -796,4 +973,141 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    testForSide("right")
  }
}

protected def testSnapshotNotFound(): Unit = {
  withTempDir(tempDir => {
nit: according to the Databricks Scala style, this should be `withTempDir { tempDir =>`; it could save one indentation (curly brace)
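A quick illustration of the two styles, with a hypothetical stand-in for the suite's `withTempDir` helper so the snippet runs on its own:

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for the test helper referenced above.
def withTempDir(f: File => Unit): Unit = {
  val dir = Files.createTempDirectory("demo").toFile
  try f(dir) finally dir.delete()
}

// Style the nit flags: parentheses wrapping a lambda that opens a block,
// which costs an extra indentation level.
withTempDir(tempDir => {
  println(tempDir.getAbsolutePath)
})

// Preferred style: pass the function as a brace block.
withTempDir { tempDir =>
  println(tempDir.getAbsolutePath)
}
```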
provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
  .replayReadStateFromSnapshot(1, 2)
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
It would be nice if we could give users a better error message, e.g. "snapshot file does not exist", but I'm OK with addressing this later.
Let's address it later, along with the changelog-file-not-found exception.
}

protected def testGetReadStoreWithStartVersion(): Unit = {
  withTempDir(tempDir => {
ditto
}

protected def testSnapshotPartitionId(): Unit = {
  withTempDir(tempDir => {
ditto
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
  StateSourceOptions.SNAPSHOT_PARTITION_ID,
  spark.sessionState.conf.numShufflePartitions)
It just needs to be > 0.
I see, it is because of the limit operator.
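To make the range discussion concrete, a hypothetical read with an out-of-range partition id (option names are from this PR; the path is made up). Valid ids run from 0 to the state's partition count minus one, and per the exchange above, limit state keeps a single partition, so any id > 0 already fails; `numShufflePartitions` is just one safely out-of-range choice.

```scala
// Illustrative sketch: this read should fail because snapshotPartitionId is
// outside the valid range 0 .. numPartitions - 1 for single-partition state.
val badRead = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", 0)
  .option("snapshotPartitionId", 1) // any id > 0 is out of range here
  .load("/tmp/streaming-query/checkpoint") // hypothetical checkpoint root
```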
})
}

// TODO: Should also test against state generated by 3.5
Is this a remaining TODO, or does it not need to be done at all? If we don't need it, let's remove the golden files for 3.5. I guess it's not intentional to test cross-version compatibility.
checkAnswer(stateSnapshotDf, stateDf)
}

protected def testSnapshotOnLimitState(providerName: String): Unit = {
General comment for tests using golden files: please leave the code you used to build the golden file (the query you ran) as a comment or similar, so that others can rebuild the golden file if needed.
}

/**
 * Consturct the state at endVersion from snapshot from snapshotVersion.
nit: Construct the state at
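As an aside, the doc comment above describes the snapshot-replay idea; here is a minimal, self-contained sketch of that reconstruction, with in-memory maps standing in for snapshot and delta files (this is not Spark's implementation):

```scala
// Toy model of replay: start from the snapshot at snapshotVersion, then fold
// in the delta for each version up to endVersion. A delta maps keys to
// Some(newValue) for puts and None for removals.
object ReplaySketch {
  type State = Map[String, Int]

  def replay(
      snapshot: State,
      snapshotVersion: Long,
      endVersion: Long,
      deltaFor: Long => Seq[(String, Option[Int])]): State = {
    require(snapshotVersion <= endVersion, "endVersion cannot precede snapshotVersion")
    (snapshotVersion + 1 to endVersion).foldLeft(snapshot) { (state, version) =>
      deltaFor(version).foldLeft(state) {
        case (s, (key, Some(value))) => s.updated(key, value)
        case (s, (key, None))        => s - key
      }
    }
  }
}
```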
if (!condition) { throw new IllegalStateException(msg) }
}

override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
Can you add a small function comment here?
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
  "fileToRead" -> fileToRead,
  "clazz" -> clazz))
Is this a common convention for the parameter naming? This will be visible in the error message that is thrown, correct?
It seems so. The parameter names themselves will not appear. I learned from here: https://github.com/apache/spark/blob/6bfeb094248269920df8b107c86f0982404935cd/common/utils/src/main/resources/error/error-conditions.json#L236C54-L236C59
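For context, a sketch of what such an entry in `error-conditions.json` might look like; this is illustrative, not the actual entry (see the link above for the real definitions). Parameter names only appear as `<fileToRead>`-style placeholders inside the message template, and users see the substituted values:

```json
"CANNOT_LOAD_STATE_STORE": {
  "message": ["An error occurred during loading state."],
  "subClass": {
    "CANNOT_READ_MISSING_SNAPSHOT_FILE": {
      "message": [
        "Error reading snapshot file <fileToRead> of <clazz>: <fileToRead> does not exist."
      ]
    }
  }
}
```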
protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
  /** The golden files are generated by:
  withSQLConf({
nit: indent seems odd in these places, but maybe not a big deal for such comments
Will move one tab right.
}
*/
val resourceUri = this.getClass.getResource(
  s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
I thought we were going to run against 3.5.1 and then run the query once to generate the operator metadata. Did we decide against that?
Strictly speaking, the test that a checkpoint with no operator metadata can create operator metadata should have been done in state metadata testing. If we don't have one, we'd better add one, but there is no need to couple it with this PR.
lgtm - pending some minor comments
Only nits and minors. Thanks for the patience!
  val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
- logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
+ logInfo(log"Retrieved snapshot at version " +
+   log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
nit: space after `version`, as the next string does not start with a space.
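A tiny illustration of why the trailing space matters, using plain string concatenation in place of the structured log interpolator: `+` inserts nothing between the pieces.

```scala
val endVersion = 10
// Without the trailing space, the word glues onto the interpolated value:
val wrong = "apply delta files to version" + endVersion  // "...to version10"
val right = "apply delta files to version " + endVersion // "...to version 10"
```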
  val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
- logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
+ logInfo(log"Retrieved snapshot at version " +
+   log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
nit: same here
messageParameters = Map("errorMsg" -> errorMsg))

class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
  extends SparkUnsupportedOperationException(
before `e`, it's only one space.
Thanks for all the careful checks by @HeartSaVioR @anishshri-db @WweiL. This PR is ready to merge.
+1
Thanks! Merging to master.
…to State Data Source

### What changes were proposed in this pull request?
In #46944 and #47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation.

### Why are the changes needed?
It is necessary to reflect the latest change in the documentation website.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The API Doc website can be rendered correctly.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47274 from eason-yuchen-liu/snapshot-doc.

Authored-by: Yuchen Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
…rtitionId to state data source

### What changes were proposed in this pull request?
This PR defines two new options, snapshotStartBatchId and snapshotPartitionId, for the existing state reader. Both of them should be provided at the same time.
1. When there is no snapshot file at `snapshotStartBatch` (note there is an off-by-one issue between version and batch Id), throw an exception.
2. Otherwise, the reader should continue to rebuild the state by reading delta files only, and ignore all snapshot files afterwards.
3. Note that if a `batchId` option is already specified, that batchId is the ending batchId; we should then end at that batchId.
4. This feature supports state generated by HDFS state store provider and RocksDB state store provider with changelog checkpointing enabled. **It does not support RocksDB with changelog disabled which is the default for RocksDB.**

### Why are the changes needed?
Sometimes when a snapshot is corrupted, users want to bypass it when reading a later state. This PR gives users the ability to specify the starting snapshot version and partition. This feature can be useful for debugging purposes.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Created test cases for testing edge cases for the input of the new options. Created a test for the new public function `replayReadStateFromSnapshot`. Created integration tests for the new options against four stateful operators: limit, aggregation, deduplication, stream-stream join. Instead of generating states within the tests, which is unstable, I prepared golden files for the integration tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46944 from eason-yuchen-liu/skipSnapshotAtBatch.

Lead-authored-by: Yuchen Liu <[email protected]>
Co-authored-by: Yuchen Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
…ly snapshotStartBatchId option)

### What changes were proposed in this pull request?
This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration. There are changes to the replayStateFromSnapshot method signature: it gains `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed, as `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` for calculating the full lineage from the final version.

Before
```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false): StateStore
```

After
```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false,
    snapshotVersionStateStoreCkptId: Option[String] = None,
    endVersionStateStoreCkptId: Option[String] = None): StateStore
```

This is the final PR in the series following:
- #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
- #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.

### Why are the changes needed?
State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047 and only `readChangeFeed` was implemented in #52148.

### Does this PR introduce _any_ user-facing change?
Yes. State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.

### How was this patch tested?
In the previous PRs test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including python tests). `RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added which uses the golden file approach similar to #46944 where `snapshotStartBatchId` is first added.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
What changes were proposed in this pull request?
This PR defines two new options, snapshotStartBatchId and snapshotPartitionId, for the existing state reader. Both of them should be provided at the same time.
1. When there is no snapshot file at `snapshotStartBatch` (note there is an off-by-one issue between version and batch Id), throw an exception.
2. Otherwise, the reader should continue to rebuild the state by reading delta files only, and ignore all snapshot files afterwards.
3. Note that if a `batchId` option is already specified, that batchId is the ending batchId; we should then end at that batchId.
4. This feature supports state generated by the HDFS state store provider and the RocksDB state store provider with changelog checkpointing enabled. It does not support RocksDB with changelog disabled, which is the default for RocksDB.

Why are the changes needed?
Sometimes when a snapshot is corrupted, users want to bypass it when reading a later state. This PR gives users the ability to specify the starting snapshot version and partition. This feature can be useful for debugging purposes.
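For instance, a hypothetical read that bypasses a corrupted snapshot (option names come from this PR; the path and ids are illustrative): rebuild partition 3 of the state starting from the snapshot for batch 10, applying only delta files from there on, and stop at batch 12.

```scala
// Illustrative sketch of the new options; values and path are made up.
val recovered = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", 10) // skip the corrupted snapshot before batch 10
  .option("snapshotPartitionId", 3)   // rebuild only state partition 3
  .option("batchId", 12)              // existing option: the ending batch
  .load("/tmp/streaming-query/checkpoint")
```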
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Created test cases for testing edge cases for the input of the new options. Created a test for the new public function `replayReadStateFromSnapshot`. Created integration tests for the new options against four stateful operators: limit, aggregation, deduplication, stream-stream join. Instead of generating states within the tests, which is unstable, I prepared golden files for the integration tests.

Was this patch authored or co-authored using generative AI tooling?
No.