-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53332][SS] Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option) #52202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| /** Helper class for [[RocksDBCheckpointMetadata]] */ | ||
| object RocksDBCheckpointMetadata { | ||
| val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion | ||
| val VERSION = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to point out this change. I believe using val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion is incorrect in this case and I was seeing a few test failures. The QLConf.get.stateStoreCheckpointFormatVersion refers to the checkpoint format version not the metadata format version. Additionally, since this is inside an object as a val it will be instantiated once and is not the current value of SQLConf.get.stateStoreCheckpointFormatVersion. I think this VERSION may be shared incorrectly also.
I also experimented with having VERSION equal to the current value. I ran into a few issues with the maintenance threads having a different version than the streaming query they were ran in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this!
| .replayStateFromSnapshot( | ||
| opts.snapshotVersion, | ||
| opts.endVersion, | ||
| readOnly = false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is readOnly false for state reader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially had it to readOnly = false to match what the replayStateFromSnapshot default value was without the uniqueIds. Also looking at the previous PR for snapshot reading #46944 it seems the readOnly option did not exist at the time. I changed to readOnly = true and all the tests pass so I think it is better to switch to readOnly = true.
...he/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| /** Snapshot options specialized for a single state store handler. */ | ||
| case class HandlerSnapshotOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made private[join]
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
| startKeyWithIndexToValueStateStoreCkptId: Option[String] = None, | ||
| endKeyToNumValuesStateStoreCkptId: Option[String] = None, | ||
| endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) { | ||
| def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indents seem wrong
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. I used https://github.com/databricks/scala-style-guide?tab=readme-ov-file#spacing-and-indentation for reference.
| private val keyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) { | ||
| private val endStateStoreCheckpointIds = | ||
| SymmetricHashJoinStateManager.getStateStoreCheckpointIds( | ||
| partition.partition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent for the args ?
| protected class KeyWithIndexToValueStore(stateFormatVersion: Int) | ||
| extends StateStoreHandler(KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId) { | ||
| protected class KeyWithIndexToValueStore( | ||
| stateFormatVersion: Int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should be 4 spaces here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm pending nits
What changes were proposed in this pull request?
This PR enables StateDataSource support with state checkpoint v2 format for the
snapshotStartBatchIdand related options, completing the StateDataSource checkpoint v2 integration.There is changes to the replayStateFromSnapshot method signature.
snapshotVersionStateStoreCkptIdandendVersionStateStoreCkptId. Both are needed assnapshotVersionStateStoreCkptIdis used when getting the snapshot andendVersionStateStoreCkptIdfor calculating the full lineage from the final version.Before
After
This is the final PR in the series following:
NOTE: To read checkpoint v2 state data sources it is required to have
"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.Why are the changes needed?
State checkpoint v2 (
"spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. OnlybatchIdwas implemented in #52047 and onlyreadChangeFeedwas implemented in #52148.Does this PR introduce any user-facing change?
Yes.
State Data Source will work when checkpoint v2 is used and the
snapshotStartBatchIdand related options are used.How was this patch tested?
In the previous PRs test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including python tests).
RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuiteis added which uses the golden file approach similar to #46944 wheresnapshotStartBatchIdis first added.Was this patch authored or co-authored using generative AI tooling?
No