Conversation


@dylanwong250 dylanwong250 commented Aug 27, 2025

What changes were proposed in this pull request?

This PR extends StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) support for the state checkpoint v2 format to include the readChangeFeed functionality. It enables users to read change feeds from state stores that use the checkpoint v2 format by:

  • Implementing full lineage reconstruction across multiple changelog files using getFullLineage in RocksDBFileManager. This is needed because each changelog file only contains lineage for [snapShotVersion, version), and we may need the versions of all changelog files across snapshot boundaries.

  • Adding support for getStateStoreChangeDataReader to accept and use an endVersionStateStoreCkptId parameter. Since the full lineage back to the start version can be constructed from the last version and endVersionStateStoreCkptId, a startVersionStateStoreCkptId is not needed. However, when snapshotStartBatchId is implemented, both startVersionStateStoreCkptId and endVersionStateStoreCkptId will be needed to maintain the current behavior.

  • Adding an extra parameter to setStoreMetrics to control whether store.getStateStoreCheckpointInfo() is called. Calling it in the abort case in TransformWithStateExec or TransformWithStateInPySparkExec would throw an exception, which we want to avoid.

The key enhancement is the ability to read change feeds that span multiple snapshots by walking backwards through the lineage information embedded in changelog files to construct the complete version history.

NOTE: Reading checkpoint v2 state data sources requires setting "spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.
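For reference, a minimal usage sketch (the path and batch ids below are placeholders; the option names follow the State Data Source documentation linked above):

```scala
// The streaming query that produced the checkpoint must run with checkpoint v2 enabled:
spark.conf.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")

// Afterwards, the change feed can be read back from the same checkpoint location:
val changes = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 0)   // first batch to include
  .option("changeEndBatchId", 5)     // last batch to include (optional)
  .load("/path/to/checkpoint")

changes.show()
```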

Why are the changes needed?

State checkpoint v2 ("spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with the checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only the batchId option was implemented in #52047.

Does this PR introduce any user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the readChangeFeed option is used.

How was this patch tested?

Adds a new test suite RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite that reuses the unit tests in RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite but with checkpoint v2 enabled, and adds tests for reading across snapshot boundaries.

```
testOnly *RocksDBWithCheckpointV2StateDataSourceChangeDataReaderSuite
[info] Total number of tests run: 10
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds a new test suite StateDataSourceTransformWithStateSuiteCheckpointV2 that reuses the unit tests in StateDataSourceTransformWithStateSuite but with checkpoint v2 enabled.

```
testOnly *StateDataSourceTransformWithStateSuiteCheckpointV2
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 2, ignored 0, pending 0
[info] All tests passed.
```

Note that the canceled tests are those that use snapshotStartBatchId; they are intentionally skipped here.

Adds a new test suite TransformWithStateInitialStateSuiteCheckpointV2 that reuses the unit tests in TransformWithStateInitialStateSuite but with checkpoint v2 enabled.

```
testOnly *TransformWithStateInitialStateSuiteCheckpointV2
[info] Total number of tests run: 44
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Adds new test suites TransformWithStateInPandasWithCheckpointV2Tests and TransformWithStateInPySparkWithCheckpointV2Tests that reuse the Python unit tests that exercise the State Data Source.

Was this patch authored or co-authored using generative AI tooling?

No

```diff
  * This should be called in that task after the store has been updated.
  */
-  protected def setStoreMetrics(store: StateStore): Unit = {
+  protected def setStoreMetrics(store: StateStore, setCheckpointInfo: Boolean = true): Unit = {
```
Contributor

Hmm, why do we need this change?

Contributor Author

In setStoreMetrics we call store.getStateStoreCheckpointInfo(). If we call this in the store.abort() case in TransformWithStateExec or TransformWithStateInPySparkExec it will throw an exception, because the checkpoint info does not exist when we never committed. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala#L343
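A minimal, self-contained sketch of the guard described here, using stand-in types rather than the PR's exact code:

```scala
// Stand-ins for the real org.apache.spark.sql.execution.streaming.state types.
trait StateStoreCheckpointInfo
trait StateStore {
  // Throws if the store was aborted before committing, as in the linked abort path.
  def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo
}

def setStoreMetrics(store: StateStore, setCheckpointInfo: Boolean = true): Unit = {
  // ... size / memory metric bookkeeping elided ...
  if (setCheckpointInfo) {
    // Only safe after a successful commit; the abort path passes
    // setCheckpointInfo = false so this call is skipped.
    val info = store.getStateStoreCheckpointInfo()
    // record `info` into the operator's checkpoint metadata (elided)
  }
}
```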

```scala
  .load(tempDir.getAbsolutePath)

val expectedDf = Seq(
  Row(0L, "update", Row(3), Row(1), 1),
```
Contributor

Can we also track other operations such as insert, update, delete, etc.?

Contributor Author

Added a test in this suite with a delete in the change feed. StateDataSourceTransformWithStateSuite also has a few tests with append and delete.
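To illustrate, a hedged sketch of what expected rows covering both operations could look like; the column layout (batch_id, change_type, key, value, partition_id) follows the snippet above, while the concrete values and the null value for the delete row are assumptions:

```scala
import org.apache.spark.sql.Row

val expectedDf = Seq(
  Row(0L, "update", Row(3), Row(1), 1),   // key 3 set to 1 in batch 0
  Row(1L, "update", Row(3), Row(2), 1),   // key 3 updated to 2 in batch 1
  Row(2L, "delete", Row(3), null, 1)      // key 3 removed in batch 2
)
```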

```scala
val prevSmallestVersion = buf.last.version
val lineage = getLineageFromChangelogFile(buf.last.version, Some(buf.last.checkpointUniqueId))
// lineage array is sorted in increasing order, we need to reverse it
val lineageSorted = lineage.filter(_.version >= startVersion).sortBy(_.version).reverse
```
Contributor

Can we just pass descending as the negative key or an Ordering param?

Contributor Author

Changed to .sortBy(-_.version).


```scala
// to prevent infinite loop if we make no progress, throw an exception
if (buf.last.version == prevSmallestVersion) {
  throw new IllegalStateException(s"Lineage is not complete")
```
Contributor

Can we create an error class for this?

Contributor Author

Done. Created INVALID_CHECKPOINT_LINEAGE.
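For reference, a plausible shape for the new entry in error-conditions.json, modeled on the STDS entry quoted later in this conversation; the exact message text is an assumption:

```json
"INVALID_CHECKPOINT_LINEAGE" : {
  "message" : [
    "Invalid checkpoint lineage: <lineage>. <message>"
  ]
}
```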

```scala
}

if (startOperatorStateUniqueIds.isDefined != endOperatorStateUniqueIds.isDefined) {
  throw StateDataSourceErrors.internalError(
```
Contributor

Just to confirm: this is backed by an error class, correct?

Contributor Author

Yes, it is. But I changed it to be backed by a new error class, STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED.

```scala
assert(ret.last.version == endVersion,
  s"Expected last lineage version to be $endVersion, but got ${ret.last.version}")
// Assert that the lineage array is strictly increasing in version
assert(ret.sliding(2).forall {
```
Contributor

Maybe move this to an error class as well?

Contributor Author

Made these also use INVALID_CHECKPOINT_LINEAGE.

```scala
  Serialization.read[Array[LineageItem]](lineageStr)
}

// The array contains lineage information from [snapShotVersion, version]
```
Contributor

Both left and right inclusive, right?

Contributor Author

I made a mistake; it is actually [snapShotVersion, version). The version that was used to get this array is not included. I updated the comment to make this clearer.

```json
},
"STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED" : {
  "message" : [
    "Reading state across different checkpoint format versions is not supported. startBatchId=<startBatchId>, endBatchId=<endBatchId>."
```
Contributor

Can we also add the different checkpoint format versions they used here? I know there are only 2 now but we will add more in the future.

Contributor Author

Changed the error message to:

"message" : [
      "Reading state across different checkpoint format versions is not supported.",
      "startBatchId=<startBatchId>, endBatchId=<endBatchId>.",
      "startFormatVersion=<startFormatVersion>, endFormatVersion=<endFormatVersion>."
    ],

```scala
    StateStoreChangeDataReader = {

  if (endVersionStateStoreCkptId.isDefined) {
    throw QueryExecutionErrors.cannotLoadStore(new SparkException(
```
Contributor

Can we make a new error condition for this (and change the other place where we do this)?

Contributor Author

Added error class STATE_STORE_CHECKPOINT_IDS_NOT_SUPPORTED and used it here and in the other places.

```scala
 * Construct the full lineage from startVersion to endVersion (inclusive) by
 * walking backwards using lineage information embedded in changelog files.
 */
def getFullLineage(
```
Contributor

Can we add some unit tests for this new function? The logic seems quite complicated; I want to make sure we can test all edge cases, particularly the error cases.

Contributor Author

Added RocksDBLineageSuite.scala that covers the main error cases.
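For readers following along, a self-contained sketch of the backward walk as pieced together from the snippets quoted in this review; LineageItem and the readLineage callback stand in for the real RocksDB types and getLineageFromChangelogFile, and the details are assumptions rather than the PR's exact code:

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

// readLineage(version, ckptId) plays the role of getLineageFromChangelogFile: it returns
// the lineage stored in that version's changelog file, covering [snapshotVersion, version).
def fullLineage(
    startVersion: Long,
    endVersion: Long,
    endVersionCkptId: String,
    readLineage: (Long, String) => Array[LineageItem]): Array[LineageItem] = {
  require(startVersion <= endVersion)
  // Kept in decreasing version order, so buf.last is the smallest version found so far.
  val buf = scala.collection.mutable.ArrayBuffer(LineageItem(endVersion, endVersionCkptId))
  while (buf.last.version > startVersion) {
    val prevSmallestVersion = buf.last.version
    // Hop one changelog file back; its lineage only reaches down to its own snapshot
    // version, so several hops may be needed to cross snapshot boundaries.
    val older = readLineage(buf.last.version, buf.last.checkpointUniqueId)
      .filter(_.version >= startVersion)
      .sortBy(-_.version)
    buf ++= older
    if (buf.last.version == prevSmallestVersion) {
      // No progress means lineage is missing; fail instead of looping forever
      // (the PR raises INVALID_CHECKPOINT_LINEAGE in this situation).
      throw new IllegalStateException("Lineage is not complete")
    }
  }
  val ret = buf.sortBy(_.version).toArray
  assert(ret.head.version == startVersion && ret.last.version == endVersion)
  ret
}
```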

```scala
/** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
class RocksDBStateStoreChangeDataReader(
    fm: CheckpointFileManager,
    rocksDB: RocksDB,
```
Contributor

Hm, seems a little strange to me that we are passing in RocksDB here in its entirety just so we can use getFullLineage. Is there a way to abstract out the getFullLineage functionality so we can reuse it a different way?

Contributor Author

I initially did some refactoring to move all the lineage-related methods to RocksDBFileManager and only pass that in here. I left it out of this PR just to reduce the amount of changes.

At a glance, all the lineage-related methods (getChangelogReader, getLineageFromChangelogFile) live in either RocksDB or RocksDBFileManager. We should be able to abstract these methods out into something like ChangelogFileManager.scala, since the changelog lineage logic is not directly dependent on RocksDB-specific methods.

I am not sure if we want to do that refactoring in this PR.
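One possible shape for the extraction discussed here (illustrative only and not part of this PR; the signatures loosely mirror the existing methods named above and may differ from the real ones):

```scala
// Hypothetical abstraction so change data readers would not need a full RocksDB instance.
trait ChangelogFileManager {
  def getChangelogReader(
      version: Long,
      checkpointUniqueId: Option[String] = None): StateStoreChangelogReader
  def getLineageFromChangelogFile(
      version: Long,
      checkpointUniqueId: Option[String] = None): Array[LineageItem]
}
```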

Contributor

Sure that's fine. We don't have to do it in this PR

```scala
  throw QueryExecutionErrors.invalidCheckpointLineage(printLineageItems(ret),
    s"Lineage does not end with endVersion: $endVersion.")
}
val increasingByOne = ret.sliding(2).forall {
```
Contributor

Can we add some more comments explaining what this block is doing?

Contributor

@anishshri-db anishshri-db left a comment

+1 - pending green CI

Contributor

@liviazhu liviazhu left a comment

LGTM - just one comment RE testing

```scala
  }
}

test("getFullLineage: multi-hop across changelog files") {
```
Contributor

Can we also add a test case where there are multiple files (with different lineages) for a single version?

Contributor Author

Added "getFullLineage: multiple lineages exist for the same version"


anishshri-db pushed a commit that referenced this pull request Sep 10, 2025
…ly snapshotStartBatchId option)

### What changes were proposed in this pull request?

This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.

There are changes to the replayStateFromSnapshot method signature: it gains `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId` parameters. Both are needed, as `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` is used for calculating the full lineage from the final version.

Before
```
def replayStateFromSnapshot(
      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
```

After
```
def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): StateStore
```

This is the final PR in the series following:
  - #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
  - #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: Reading checkpoint v2 state data sources requires setting `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in #52047 and only `readChangeFeed` was implemented in #52148.

### Does this PR introduce _any_ user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.
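For reference, a minimal usage sketch of these options (placeholder path and batch ids; the option names follow the State Data Source documentation):

```scala
val stateDf = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", 5)   // reconstruct state starting from this snapshot
  .option("snapshotPartitionId", 0)    // used together with snapshotStartBatchId
  .option("batchId", 9)                // replay changes up to this batch (defaults to latest)
  .load("/path/to/checkpoint")
```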

### How was this patch tested?

In the previous PRs, test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including Python tests).

`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added which uses the golden file approach similar to #46944 where `snapshotStartBatchId` is first added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>