-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53333][SS] Enable StateDataSource with state checkpoint v2 (only readChangeFeed) #52148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| * This should be called in that task after the store has been updated. | ||
| */ | ||
| protected def setStoreMetrics(store: StateStore): Unit = { | ||
| protected def setStoreMetrics(store: StateStore, setCheckpointInfo: Boolean = true): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm why do we need this change ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In setStoreMetrics we call store.getStateStoreCheckpointInfo(). If we call this in the store.abort() case in TransformWithStateExec or TransformWithStateInPySparkExec it will throw an exception since the checkpoint info does not exist since we never committed. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala#L343
| .load(tempDir.getAbsolutePath) | ||
|
|
||
| val expectedDf = Seq( | ||
| Row(0L, "update", Row(3), Row(1), 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also track other operations such as insert, update, delete etc ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test in this suite with a delete in the change feed. StateDataSourceTransformWithStateSuite also has a few tests with append and delete.
| val prevSmallestVersion = buf.last.version | ||
| val lineage = getLineageFromChangelogFile(buf.last.version, Some(buf.last.checkpointUniqueId)) | ||
| // lineage array is sorted in increasing order, we need to reverse it | ||
| val lineageSorted = lineage.filter(_.version >= startVersion).sortBy(_.version).reverse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just pass descending as the negative key or Ordering param ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to.sortBy(-_.version)
|
|
||
| // to prevent infinite loop if we make no progress, throw an exception | ||
| if (buf.last.version == prevSmallestVersion) { | ||
| throw new IllegalStateException(s"Lineage is not complete") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create an error class for this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Created INVALID_CHECKPOINT_LINEAGE.
| } | ||
|
|
||
| if (startOperatorStateUniqueIds.isDefined != endOperatorStateUniqueIds.isDefined) { | ||
| throw StateDataSourceErrors.internalError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm - this is backed by an error class correct ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is. But I changed it to be backed by a new error class STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED
| assert(ret.last.version == endVersion, | ||
| s"Expected last lineage version to be $endVersion, but got ${ret.last.version}") | ||
| // Assert that the lineage array is strictly increasing in version | ||
| assert(ret.sliding(2).forall { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move this to an error class as well ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made these also use INVALID_CHECKPOINT_LINEAGE.
| Serialization.read[Array[LineageItem]](lineageStr) | ||
| } | ||
|
|
||
| // The array contains lineage information from [snapShotVersion, version] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both left and right inclusive right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a mistake it is actually [snapShotVersion, version). The version that was used to get this array is not included. I updated the comment to make this more clear.
| }, | ||
| "STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED" : { | ||
| "message" : [ | ||
| "Reading state across different checkpoint format versions is not supported. startBatchId=<startBatchId>, endBatchId=<endBatchId>." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also add the different checkpoint format versions they used here? I know there are only 2 now but we will add more in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the error message to:
"message" : [
"Reading state across different checkpoint format versions is not supported.",
"startBatchId=<startBatchId>, endBatchId=<endBatchId>.",
"startFormatVersion=<startFormatVersion>, endFormatVersion=<endFormatVersion>."
],
| StateStoreChangeDataReader = { | ||
|
|
||
| if (endVersionStateStoreCkptId.isDefined) { | ||
| throw QueryExecutionErrors.cannotLoadStore(new SparkException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make a new error condition for this (and change the other place where we do this)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added error class STATE_STORE_CHECKPOINT_IDS_NOT_SUPPORTED and used it here and the other places.
| * Construct the full lineage from startVersion to endVersion (inclusive) by | ||
| * walking backwards using lineage information embedded in changelog files. | ||
| */ | ||
| def getFullLineage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some unit tests for this new function? The logic seems quite complicated, want to make sure we can test all edge cases. Particularly the error cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added RocksDBLineageSuite.scala that covers the main error cases.
| /** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */ | ||
| class RocksDBStateStoreChangeDataReader( | ||
| fm: CheckpointFileManager, | ||
| rocksDB: RocksDB, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, seems a little strange to me that we are passing in RocksDB here in its entirety just so we can use getFullLineage. Is there a way to abstract out the getFullLineage functionality so we can reuse it a different way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially had done some refactoring to refactor all the lineage related methods to RocksDBFileManager and only pass that in here. I did not do it in this PR just to reduce the amount of changes in this PR.
At a glance, all the lineage related methods (getChangelogReader, getLineageFromChangelogFile) exist in either RocksDB or RocksDBFilemanager. We should be able to abstract these methods out into something like ChangelogFileManager.scala since changelog lineage stuff is not directly dependent on RocksDB related methods.
I am not sure if we want to do this refactoring in the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure that's fine. We don't have to do it in this PR
| throw QueryExecutionErrors.invalidCheckpointLineage(printLineageItems(ret), | ||
| s"Lineage does not end with endVersion: $endVersion.") | ||
| } | ||
| val increasingByOne = ret.sliding(2).forall { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some more comments for what this block is doing ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 - pending green CI
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - just one comment RE testing
| } | ||
| } | ||
|
|
||
| test("getFullLineage: multi-hop across changelog files") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also add a test case where there are multiple files (with different lineages) for a single version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added "getFullLineage: multiple lineages exist for the same version"
| /** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */ | ||
| class RocksDBStateStoreChangeDataReader( | ||
| fm: CheckpointFileManager, | ||
| rocksDB: RocksDB, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure that's fine. We don't have to do it in this PR
…ly snapshotStartBatchId option)
### What changes were proposed in this pull request?
This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.
There is changes to the replayStateFromSnapshot method signature. `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed as `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` for calculating the full lineage from the final version.
Before
```
def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
```
After
```
def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
snapshotVersion: Long,
endVersion: Long,
readOnly: Boolean = false,
snapshotVersionStateStoreCkptId: Option[String] = None,
endVersionStateStoreCkptId: Option[String] = None): StateStore
```
This is the final PR in the series following:
- #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
- #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)
NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.
### Why are the changes needed?
State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047 and only `readChangeFeed` was implemented in #52148.
### Does this PR introduce _any_ user-facing change?
Yes.
State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.
### How was this patch tested?
In the previous PRs test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including python tests).
`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added which uses the golden file approach similar to #46944 where `snapshotStartBatchId` is first added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52202 from dylanwong250/SPARK-53332.
Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
What changes were proposed in this pull request?
This PR extends StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) support for state checkpoint v2 format to include the
readChangeFeedfunctionality. This PR now enables users to read change feeds from state stores using checkpoint v2 format by:Implementing full lineage reconstruction across multiple changelog files using
getFullLineagein RocksDBFileManager. This is needed because changelog files only contain lineage from [snapShotVersion, version) and we may need the versions for all changelog files across snapshot boundaries.Adding support for
getStateStoreChangeDataReaderto have and use theendVersionStateStoreCkptIdparameter. Since we can construct the full lineage to the start version from the last version andendVersionStateStoreCkptIdwe do not need astartVersionStateStoreCkptId. However whensnapshotStartBatchIdis implementedstartVersionStateStoreCkptIdandendVersionStateStoreCkptIdwill be needed to maintain the current behavior.Adding an extra parameter to
setStoreMetricsto determine whether or not to callstore.getStateStoreCheckpointInfo(). If we call this in the abort case inTransformWithStateExecorTransformWithStateInPySparkExecit will throw an exception and we do not want this.The key enhancement is the ability to read change feeds that span across multiple snapshots by walking backwards through the lineage information embedded in changelog files to construct the complete version history.
NOTE: To read checkpoint v2 state data sources it is required to have
"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.Why are the changes needed?
State checkpoint v2 (
"spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. OnlybatchIdwas implemented in #52047.Does this PR introduce any user-facing change?
Yes.
State Data Source will work when checkpoint v2 is used and the
readChangeFeedoption is used.How was this patch tested?
Adds a new test suite
RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuitethat reuses the unit tests inRocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuitebut with checkpoint v2 enabled and adds tests for the case of reading across snapshot boundaries.Adds a new test suite
StateDataSourceTransformWithStateSuiteCheckpointV2that reuses the unit tests inStateDataSourceTransformWithStateSuitebut with checkpoint v2 enabled.Note that the cancelled tests are to not run the tests that use
snapshotStartBatchId.Adds a new test suite
TransformWithStateInitialStateSuiteCheckpointV2that reuses the unit tests inTransformWithStateInitialStateSuitebut with checkpoint v2 enabled.Adds a new tests
TransformWithStateInPandasWithCheckpointV2TestsandTransformWithStateInPySparkWithCheckpointV2Teststhat reuses the unit tests in python that test the State Data Source.Was this patch authored or co-authored using generative AI tooling?
No