[SPARK-53294][SS] Enable StateDataSource with state checkpoint v2 (only batchId option) #52047
Conversation
Oops didn't mean to approve
```scala
if (commitMetadata.stateUniqueIds.isDefined) {
  Some(commitMetadata.stateUniqueIds.get(operatorId))
```
nit: This can be written in a more idiomatic Scala way, without the if-else. Maybe with `stateUniqueIds.map`.
Refactored to `commitMetadata.stateUniqueIds.flatMap(_.get(operatorId))`.
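For readers following along, here is a minimal, self-contained sketch of the two variants. The `CommitMetadata` case class below is a stand-in for the real class, and the element type of `stateUniqueIds` is assumed purely for illustration:

```scala
// Stand-in for the real CommitMetadata; the Option[Map[...]] shape mirrors the diff above.
case class CommitMetadata(stateUniqueIds: Option[Map[Long, Seq[String]]])

object FlatMapRefactorSketch extends App {
  val commitMetadata = CommitMetadata(Some(Map(0L -> Seq("ckpt-id-a", "ckpt-id-b"))))
  val operatorId = 0L

  // Before: explicit if/else; note Map.apply throws if operatorId is missing.
  val before: Option[Seq[String]] =
    if (commitMetadata.stateUniqueIds.isDefined) {
      Some(commitMetadata.stateUniqueIds.get(operatorId))
    } else {
      None
    }

  // After: flatMap + Map.get; one expression, and it returns None for a missing operatorId.
  val after: Option[Seq[String]] =
    commitMetadata.stateUniqueIds.flatMap(_.get(operatorId))

  assert(before == after)
}
```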
```scala
/**
 * Constants for store names used in Stream-Stream joins.
 */
object StatePartitionReaderStoreNames {
```
Why define these names here? These are join-specific and shouldn't live here. I think they should already be defined in the join code.
I removed these and refactored a bit. I added more detail in the other comment in this file.
```scala
  partition.sourceOptions.operatorStateUniqueIds,
  useColumnFamiliesForJoins = false)

partition.sourceOptions.storeName match {
```
You don't need to do this here. This can be done within the join call above, and it will just return the ID you need for the storeName instead of returning the entire stateStoreCheckpointIds.
I made a new method `getStateStoreCheckpointId` in SymmetricHashJoinStateManager which maps (storeName -> correct checkpoint ID) in one function call. Let me know if this makes more sense.
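A rough sketch of what such a helper might look like. The case-class shapes and the store-name strings below are assumptions for illustration, not the actual SymmetricHashJoinStateManager API:

```scala
// Hypothetical shapes standing in for the join checkpoint-ID container.
case class JoinerStateStoreCkptInfo(
    keyToNumValues: Option[String],
    keyWithIndexToValue: Option[String])
case class JoinStateStoreCkptInfo(
    left: JoinerStateStoreCkptInfo,
    right: JoinerStateStoreCkptInfo)

// Map a store name directly to its checkpoint ID in one call,
// instead of handing the entire container back to the caller.
def getStateStoreCheckpointId(
    storeName: String,
    ckptInfo: JoinStateStoreCkptInfo): Option[String] = storeName match {
  case "left-keyToNumValues"       => ckptInfo.left.keyToNumValues
  case "left-keyWithIndexToValue"  => ckptInfo.left.keyWithIndexToValue
  case "right-keyToNumValues"      => ckptInfo.right.keyToNumValues
  case "right-keyWithIndexToValue" => ckptInfo.right.keyWithIndexToValue
  case other =>
    throw new IllegalArgumentException(s"Unexpected join store name: $other")
}
```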
```diff
 rocksDB.load(
   version,
-  stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
+  stateStoreCkptId = uniqueId,
```
Why remove the conf check?
I thought the behavior was a bit confusing: `uniqueId` could be `Some("")` but would not be used to get the underlying store. This would also need to be removed in the future if we wanted to enable reading checkpoint v2 stores when `enableStateStoreCheckpointIds = false`.
Can we do this as a separate change though @dylanwong250?
Sounds good. I removed it. We may have to add it back depending on this comment https://github.com/apache/spark/pull/52047/files#r2283491163.
```scala
private val keyWithIndexToValueStateStoreCkptId = if (joinSide == LeftSide) {
  stateStoreCheckpointIds.left.valueToNumKeys
```
What is `valueToNumKeys` here?
I think it was a typo in the method I am calling. I refactored by changing `valueToNumKeys` -> `keyWithIndexToValue`. This now follows the current store names we use.
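For context, the four store names used by stream-stream joins are composed from the join side and the store type, which is why the rename lines up. A small sketch, with names taken from the discussion above rather than quoted from the Spark source:

```scala
// Enumerate the join store names referenced in this thread.
val sides = Seq("left", "right")
val storeTypes = Seq("keyToNumValues", "keyWithIndexToValue")
val joinStoreNames = for (side <- sides; tpe <- storeTypes) yield s"$side-$tpe"
// joinStoreNames == Seq("left-keyToNumValues", "left-keyWithIndexToValue",
//                       "right-keyToNumValues", "right-keyWithIndexToValue")
```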
```diff
 rocksDB.load(
   version,
-  stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
+  stateStoreCkptId = uniqueId,
```
Can we do this as a separate change though @dylanwong250?
```scala
 * store checkpoint IDs.
 * @param partitionId
 * @param stateInfo
 * @param stateStoreCkptIds
```
Can you update the comments here for the params?
```scala
 * checkpoint IDs when not using virtual column families.
 * This function is used to get the checkpoint ID for a specific state store
 * by the name of the store, partition ID and the checkpoint IDs array.
 * @param storeName
```
Same here
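For example, the param docs could be filled in along these lines (suggested wording only, inferred from the surrounding description, not the committed text):

```scala
/**
 * Gets the state store checkpoint ID for a specific state store by the name of
 * the store, the partition ID and the checkpoint IDs array, when not using
 * virtual column families.
 *
 * @param storeName name of the state store to look up
 * @param partitionId ID of the partition whose checkpoint ID is requested
 * @param stateStoreCkptIds per-partition checkpoint IDs recorded in the commit log
 */
```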
LGTM with one minor testing comment
LGTM - will merge once CI is green
[SPARK-53333][SS] Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

### What changes were proposed in this pull request?

This PR extends StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) support for state checkpoint v2 format to include the `readChangeFeed` functionality.

This PR now enables users to read change feeds from state stores using checkpoint v2 format by:

- Implementing full lineage reconstruction across multiple changelog files using `getFullLineage` in RocksDBFileManager. This is needed because changelog files only contain lineage from [snapshotVersion, version) and we may need the versions for all changelog files across snapshot boundaries.
- Adding support for `getStateStoreChangeDataReader` to accept and use the `endVersionStateStoreCkptId` parameter. Since we can construct the full lineage back to the start version from the last version and `endVersionStateStoreCkptId`, we do not need a `startVersionStateStoreCkptId`. However, when `snapshotStartBatchId` is implemented, both `startVersionStateStoreCkptId` and `endVersionStateStoreCkptId` will be needed to maintain the current behavior.
- Adding an extra parameter to `setStoreMetrics` to determine whether or not to call `store.getStateStoreCheckpointInfo()`. If we call this in the abort case in `TransformWithStateExec` or `TransformWithStateInPySparkExec` it will throw an exception, and we do not want this.

The key enhancement is the ability to read change feeds that span multiple snapshots by walking backwards through the lineage information embedded in changelog files to construct the complete version history.

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047.

### Does this PR introduce _any_ user-facing change?

Yes. State Data Source will work when checkpoint v2 is used and the `readChangeFeed` option is used.

### How was this patch tested?

Adds a new test suite `RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite` that reuses the unit tests in `RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite` but with checkpoint v2 enabled, and adds tests for the case of reading across snapshot boundaries.

```
testOnly *RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite
```

```
[info] Total number of tests run: 10
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds a new test suite `StateDataSourceTransformWithStateSuiteCheckpointV2` that reuses the unit tests in `StateDataSourceTransformWithStateSuite` but with checkpoint v2 enabled. Note that the canceled tests are the tests that use `snapshotStartBatchId`, which are intentionally not run.

```
testOnly *StateDataSourceTransformWithStateSuiteCheckpointV2
```

```
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 2, ignored 0, pending 0
[info] All tests passed.
```

Adds a new test suite `TransformWithStateInitialStateSuiteCheckpointV2` that reuses the unit tests in `TransformWithStateInitialStateSuite` but with checkpoint v2 enabled.

```
testOnly *TransformWithStateInitialStateSuiteCheckpointV2
```

```
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds new tests `TransformWithStateInPandasWithCheckpointV2Tests` and `TransformWithStateInPySparkWithCheckpointV2Tests` that reuse the unit tests in Python that test the State Data Source.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52148 from dylanwong250/SPARK-53333.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
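As a rough mental model of the lineage walk described in the PR above (an illustrative sketch, not the actual `getFullLineage` implementation in RocksDBFileManager): each changelog file carries lineage for `[snapshotVersion, version)`, so reconstructing the full lineage means repeatedly reading the changelog of the oldest version discovered so far and prepending the strictly older entries.

```scala
// Illustrative sketch of full-lineage reconstruction across snapshot boundaries.
case class LineageEntry(version: Long, checkpointId: String)

def fullLineageSketch(
    endVersion: Long,
    endVersionCkptId: String,
    startVersion: Long,
    // Assumed reader: returns the lineage embedded in the changelog for
    // (version, ckptId), i.e. entries covering [snapshotVersion, version).
    readChangelogLineage: (Long, String) => Seq[LineageEntry]): Seq[LineageEntry] = {
  var lineage = Vector(LineageEntry(endVersion, endVersionCkptId))
  while (lineage.head.version > startVersion) {
    val oldest = lineage.head
    val older = readChangelogLineage(oldest.version, oldest.checkpointId)
      .filter(_.version < oldest.version)
    require(older.nonEmpty,
      s"No lineage found below version ${oldest.version}; cannot reach $startVersion")
    // Prepend the strictly older entries and keep walking backwards.
    lineage = older.sortBy(_.version).toVector ++ lineage
  }
  lineage.dropWhile(_.version < startVersion)
}
```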
[SPARK-53332][SS] Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option)

### What changes were proposed in this pull request?

This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.

There are changes to the `replayStateFromSnapshot` method signature: it gains `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed, as `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` is used for calculating the full lineage from the final version.

Before:

```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false): StateStore
```

After:

```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false,
    snapshotVersionStateStoreCkptId: Option[String] = None,
    endVersionStateStoreCkptId: Option[String] = None): StateStore
```

This is the final PR in the series following:

- #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
- #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047 and only `readChangeFeed` was implemented in #52148.

### Does this PR introduce _any_ user-facing change?

Yes. State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.

### How was this patch tested?

In the previous PRs, test suites were added to parameterize the existing tests with checkpoint v2. All of those tests are now added back. All tests that previously intentionally exercised some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including Python tests).

`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added, which uses the golden file approach similar to #46944, where `snapshotStartBatchId` was first added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
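A hedged sketch of how a caller might use the extended signature above. The trait below merely mirrors the call shape so the snippet compiles on its own; it is not the real provider interface, and the checkpoint IDs are placeholders:

```scala
// Stand-in trait so the call shape is visible in isolation.
trait SnapshotReplayer {
  def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): Unit
}

def replaySketch(replayer: SnapshotReplayer): Unit = {
  replayer.replayStateFromSnapshot(
    snapshotVersion = 10L,  // batch whose snapshot we start from
    endVersion = 15L,       // batch we replay up to
    readOnly = true,
    snapshotVersionStateStoreCkptId = Some("snapshot-ckpt-uuid"), // locates the snapshot
    endVersionStateStoreCkptId = Some("end-ckpt-uuid"))           // anchors the lineage walk
}
```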
What changes were proposed in this pull request?

This PR enables StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) to work with state checkpoint v2 format (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) when using the `batchId` option. This is done by retrieving the `stateUniqueIds` from the CommitLog and using the correct partition of these IDs to read from the state store.

A check is added to throw an error when users try to use the `readChangeFeed` or `changeStartBatchId` options when the CommitLog metadata contains `stateUniqueIds`.

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.
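As a usage sketch (the path and batch ID are placeholders; the `statestore` format and `batchId` option are from the State Data Source documentation linked above):

```scala
// Reading state written with checkpoint v2; per the NOTE above, the v2 format
// version must be set when reading.
spark.conf.set("spark.sql.streaming.stateStore.checkpointFormatVersion", 2)

val stateDf = spark.read
  .format("statestore")
  .option("batchId", 5)         // placeholder batch ID
  .load("/path/to/checkpoint")  // placeholder checkpoint location

stateDf.show()
```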
Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format.
Does this PR introduce any user-facing change?

Yes.

STDS_INVALID_OPTION_VALUE will be thrown when the `readChangeFeed` or `changeStartBatchId` options are used and the CommitLog contains `stateUniqueIds`. Previously an error related to the store not existing would be thrown.

State Data Source will work when checkpoint v2 is used and `batchId` is used.
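Conversely, a read like the following sketch is now expected to fail with STDS_INVALID_OPTION_VALUE when the CommitLog contains `stateUniqueIds` (placeholder path; option names as in the description above):

```scala
// Expected to throw STDS_INVALID_OPTION_VALUE against a checkpoint v2 query's state.
val changeDf = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 0)
  .load("/path/to/checkpoint")  // placeholder checkpoint location
```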
How was this patch tested?

Adds a new test suite `RocksDBWithCheckpointV2StateDataSourceReaderSuite` that reuses the unit tests in `StateDataSourceReadSuite` and adds tests for the new error cases.
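Presumably runnable on its own, in the same way as the suites in the follow-up PRs:

```
testOnly *RocksDBWithCheckpointV2StateDataSourceReaderSuite
```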
Was this patch authored or co-authored using generative AI tooling?

No