
Conversation

@dylanwong250 (Contributor) commented Aug 15, 2025

What changes were proposed in this pull request?

This PR enables StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) to work with the state checkpoint v2 format ("spark.sql.streaming.stateStore.checkpointFormatVersion") when using the batchId option. This is done by retrieving the stateUniqueIds from the CommitLog and using the ID for the correct partition when reading from the state store.

A check is added to throw an error when users try to use the readChangeFeed or changeStartBatchId options when the CommitLog metadata contains stateUniqueIds.

NOTE: To read checkpoint v2 state data sources, "spark.sql.streaming.stateStore.checkpointFormatVersion" must be set to 2. It is possible to allow reading state data sources arbitrarily, based on what is in the CommitLog, by relaxing the assertion checks, but this is left as a future change.

Why are the changes needed?

State checkpoint v2 ("spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format.

Does this PR introduce any user-facing change?

Yes.

STDS_INVALID_OPTION_VALUE will be thrown when the readChangeFeed or changeStartBatchId options are used while the CommitLog contains stateUniqueIds. Previously, an error about the store not existing was thrown.

The State Data Source will now work when checkpoint v2 is used with the batchId option.
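For illustration, a minimal read using the batchId option against a v2 checkpoint could look like the sketch below (the checkpoint path and batch ID are hypothetical):

```scala
// Read the state as of batch 5 from a checkpoint written with format v2.
// Per the NOTE above, the format version conf must be set to 2 when reading.
spark.conf.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")

val stateDf = spark.read
  .format("statestore")
  .option("path", "/checkpoints/my-query") // hypothetical checkpoint location
  .option("batchId", 5)
  .load()
```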

How was this patch tested?

Adds a new test suite RocksDBWithCheckpointV2StateDataSourceReaderSuite that reuses the unit tests in StateDataSourceReadSuite and adds tests for the new error cases.

```
testOnly *RocksDBWithCheckpointV2StateDataSourceReaderSuite
```
```
[info] Total number of tests run: 16
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 16, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed
```

Was this patch authored or co-authored using generative AI tooling?

No

@liviazhu (Contributor) left a comment:

Oops didn't mean to approve

```scala
}

if (commitMetadata.stateUniqueIds.isDefined) {
  Some(commitMetadata.stateUniqueIds.get(operatorId))
```
Reviewer (Contributor):

nit: This can be written in a more idiomatic Scala way, without if-else. Maybe with stateUniqueIds.map.

@dylanwong250 (Author):

Refactored to commitMetadata.stateUniqueIds.flatMap(_.get(operatorId))
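For reference, a sketch of the refactor based on the hunk above:

```scala
// Before: explicit Option check; Option.get followed by Map.apply throws
// if operatorId is missing from the map.
val ids = if (commitMetadata.stateUniqueIds.isDefined) {
  Some(commitMetadata.stateUniqueIds.get(operatorId))
} else {
  None
}

// After: Option composition; Map.get safely yields None for a missing key.
val ids = commitMetadata.stateUniqueIds.flatMap(_.get(operatorId))
```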

```scala
/**
 * Constants for store names used in Stream-Stream joins.
 */
object StatePartitionReaderStoreNames {
```
Reviewer (Contributor):

Why define these names here? They are join-specific and shouldn't live here; I think they are already defined in the join code.

@dylanwong250 (Author):

I removed these and refactored a bit. I added more detail in the other comment in this file.

```scala
partition.sourceOptions.operatorStateUniqueIds,
  useColumnFamiliesForJoins = false)

partition.sourceOptions.storeName match {
```
Reviewer (Contributor):

You don't need to do this here. This can be done within the join call above, and it will just return the ID you need for the storeName instead of returning the entire stateStoreCheckpointIds.

@dylanwong250 (Author):

I made a new method getStateStoreCheckpointId in SymmetricHashJoinStateManager which maps storeName to the correct checkpoint ID in one function call. Let me know if this makes more sense.
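A hedged sketch of what such a method could look like; the store names match Spark's stream-stream join store naming, but the signature and array layout here are assumptions for illustration:

```scala
// Resolve the checkpoint ID for one join state store by name. Assumed layout
// of stateStoreCkptIds: one row per partition, holding the four join stores
// in a fixed order.
def getStateStoreCheckpointId(
    storeName: String,
    partitionId: Int,
    stateStoreCkptIds: Option[Array[Array[String]]]): Option[String] = {
  val storeIndex = storeName match {
    case "left-keyToNumValues"       => 0
    case "left-keyWithIndexToValue"  => 1
    case "right-keyToNumValues"      => 2
    case "right-keyWithIndexToValue" => 3
  }
  stateStoreCkptIds.map(ids => ids(partitionId)(storeIndex))
}
```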

```diff
 rocksDB.load(
   version,
-  stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
+  stateStoreCkptId = uniqueId,
```
Reviewer (Contributor):

Why remove the conf check?

@dylanwong250 (Author):

I thought the behavior was a bit confusing, where uniqueId could be Some("") but would not be used to get the underlying store.

This would also need to be removed in the future if we wanted to enable reading checkpoint v2 stores when enableStateStoreCheckpointIds = false.

Reviewer (Contributor):

Can we do this as a separate change though, @dylanwong250?

@dylanwong250 (Author):

Sounds good, I removed it. We may have to add it back depending on this comment: https://github.com/apache/spark/pull/52047/files#r2283491163

```scala
}

private val keyWithIndexToValueStateStoreCkptId = if (joinSide == LeftSide) {
  stateStoreCheckpointIds.left.valueToNumKeys
```
Reviewer (Contributor):

What is valueToNumKeys here?

@dylanwong250 (Author):

I think it was a typo in the method I am calling. I refactored by renaming valueToNumKeys -> keyWithIndexToValue. This now follows the current store names we use.
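A sketch of the assumed shape after the rename (field types simplified for illustration; the real classes live alongside the join state manager):

```scala
// One checkpoint ID per join store, per side; field names now match the
// store names keyToNumValues / keyWithIndexToValue.
case class JoinerStateStoreCkptInfo(
    keyToNumValues: Option[String],
    keyWithIndexToValue: Option[String]) // previously the typo: valueToNumKeys

case class JoinStateStoreCkptInfo(
    left: JoinerStateStoreCkptInfo,
    right: JoinerStateStoreCkptInfo)
```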

```diff
 rocksDB.load(
   version,
-  stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
+  stateStoreCkptId = uniqueId,
```
Reviewer (Contributor):

Can we do this as a separate change though, @dylanwong250?

```scala
 * store checkpoint IDs.
 * @param partitionId
 * @param stateInfo
 * @param stateStoreCkptIds
```
Reviewer (Contributor):

Can you update the comments here for the params?

```scala
 * checkpoint IDs when not using virtual column families.
 * This function is used to get the checkpoint ID for a specific state store
 * by the name of the store, partition ID and the checkpoint IDs array.
 * @param storeName
```
Reviewer (Contributor):

Same here.

@liviazhu (Contributor) left a comment:

LGTM with one minor testing comment.

@anishshri-db (Contributor) left a comment:

LGTM - will merge once CI is green.

anishshri-db pushed a commit that referenced this pull request Sep 2, 2025
Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

### What changes were proposed in this pull request?

This PR extends StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) support for state checkpoint v2 format to include the `readChangeFeed` functionality. This PR now enables users to read change feeds from state stores using checkpoint v2 format by:

- Implementing full lineage reconstruction across multiple changelog files using `getFullLineage` in RocksDBFileManager. This is needed because changelog files only contain lineage from [snapshotVersion, version), and we may need the versions from all changelog files across snapshot boundaries.

- Adding support for `getStateStoreChangeDataReader` to accept and use the `endVersionStateStoreCkptId` parameter. Since we can construct the full lineage back to the start version from the last version and `endVersionStateStoreCkptId`, we do not need a `startVersionStateStoreCkptId`. However, when `snapshotStartBatchId` is implemented, both `startVersionStateStoreCkptId` and `endVersionStateStoreCkptId` will be needed to maintain the current behavior.

- Adding an extra parameter to `setStoreMetrics` to determine whether or not to call `store.getStateStoreCheckpointInfo()`. Calling it in the abort case in `TransformWithStateExec` or `TransformWithStateInPySparkExec` would throw an exception, which we want to avoid.

The key enhancement is the ability to read change feeds that span across multiple snapshots by walking backwards through the lineage information embedded in changelog files to construct the complete version history.
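A simplified sketch of that backward walk, assuming a hypothetical helper `readChangelogLineage(version, ckptId)` that returns the `(version, uniqueId)` pairs recorded in one changelog file; the real logic is `getFullLineage` in RocksDBFileManager and handles trimming and validation exactly:

```scala
import scala.annotation.tailrec

// Reconstruct (version, uniqueId) pairs from endVersion back to startVersion.
// Each changelog file only records lineage back to its base snapshot, so we
// hop across snapshot boundaries by continuing from the oldest version a
// file mentions until the start version is reached.
def fullLineage(
    startVersion: Long,
    endVersion: Long,
    endVersionCkptId: String,
    readChangelogLineage: (Long, String) => Seq[(Long, String)]): List[(Long, String)] = {
  @tailrec
  def walk(version: Long, ckptId: String, acc: List[(Long, String)]): List[(Long, String)] = {
    if (version <= startVersion) (version, ckptId) :: acc
    else {
      // Pairs for [baseSnapshotVersion, version) recorded in this changelog.
      val recorded = readChangelogLineage(version, ckptId).sortBy(_._1)
      val (oldestV, oldestId) = recorded.head
      // Keep everything newer than the oldest entry, then continue from it.
      val kept = recorded.tail.filter(_._1 >= startVersion).toList :+ ((version, ckptId))
      walk(oldestV, oldestId, kept ::: acc)
    }
  }
  walk(endVersion, endVersionCkptId, Nil)
}
```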

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047.

### Does this PR introduce _any_ user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the `readChangeFeed` option is used.
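For illustration, reading the change feed from a v2 checkpoint could look like the sketch below (the path and batch IDs are hypothetical):

```scala
// Read all state changes between batches 3 and 8 from a v2 checkpoint.
val changesDf = spark.read
  .format("statestore")
  .option("path", "/checkpoints/my-query")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 3)
  .option("changeEndBatchId", 8)
  .load()
```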

### How was this patch tested?

Adds a new test suite `RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite` that reuses the unit tests in `RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite` but with checkpoint v2 enabled and adds tests for the case of reading across snapshot boundaries.
```
testOnly *RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite
```
```
[info] Total number of tests run: 10
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds a new test suite `StateDataSourceTransformWithStateSuiteCheckpointV2` that reuses the unit tests in `StateDataSourceTransformWithStateSuite` but with checkpoint v2 enabled.
```
testOnly *StateDataSourceTransformWithStateSuiteCheckpointV2
```
Note that the canceled tests are the ones that use `snapshotStartBatchId`; they are intentionally skipped.
```
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 2, ignored 0, pending 0
[info] All tests passed
```

Adds a new test suite `TransformWithStateInitialStateSuiteCheckpointV2` that reuses the unit tests in `TransformWithStateInitialStateSuite` but with checkpoint v2 enabled.
```
testOnly *TransformWithStateInitialStateSuiteCheckpointV2
```
```
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds new test suites `TransformWithStateInPandasWithCheckpointV2Tests` and `TransformWithStateInPySparkWithCheckpointV2Tests` that reuse the Python unit tests covering the State Data Source.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52148 from dylanwong250/SPARK-53333.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
anishshri-db pushed a commit that referenced this pull request Sep 10, 2025
Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option)

### What changes were proposed in this pull request?

This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.

This PR changes the replayStateFromSnapshot method signature, adding two parameters: `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed because `snapshotVersionStateStoreCkptId` is used when loading the snapshot and `endVersionStateStoreCkptId` when calculating the full lineage from the final version.

Before
```
def replayStateFromSnapshot(
      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
```

After
```
def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): StateStore
```
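For illustration, a call with the new parameters could look like this (the provider value and IDs are hypothetical):

```scala
// Replay from the snapshot at version 10 up to version 25, pinning both ends
// of the lineage with their checkpoint v2 unique IDs.
val store = provider.replayStateFromSnapshot(
  snapshotVersion = 10L,
  endVersion = 25L,
  readOnly = true,
  snapshotVersionStateStoreCkptId = Some("<uuid-of-snapshot-10>"),
  endVersionStateStoreCkptId = Some("<uuid-of-version-25>"))
```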

This is the final PR in the series following:
  - #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
  - #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047 and only `readChangeFeed` was implemented in #52148.

### Does this PR introduce _any_ user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.
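For illustration, using the snapshot-related options against a v2 checkpoint (values hypothetical):

```scala
// Reconstruct state for partition 0 at batch 8, replaying forward from the
// snapshot taken at batch 5.
val df = spark.read
  .format("statestore")
  .option("path", "/checkpoints/my-query")
  .option("snapshotStartBatchId", 5)
  .option("snapshotPartitionId", 0)
  .option("batchId", 8)
  .load()
```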

### How was this patch tested?

In the previous PRs, test suites were added to parameterize the existing tests with checkpoint v2. All of the tests skipped there are now re-enabled. All tests that previously exercised features of the State Data Source reader with checkpoint v1 should now be parameterized with checkpoint v2 (including the Python tests).

`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added, which uses the golden file approach similar to #46944, where `snapshotStartBatchId` was first added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>