Skip to content

Conversation

dylanwong250
Copy link
Contributor

@dylanwong250 dylanwong250 commented Sep 2, 2025

What changes were proposed in this pull request?

This PR enables StateDataSource support with state checkpoint v2 format for the snapshotStartBatchId and related options, completing the StateDataSource checkpoint v2 integration.

There is changes to the replayStateFromSnapshot method signature. snapshotVersionStateStoreCkptId and endVersionStateStoreCkptId. Both are needed as snapshotVersionStateStoreCkptId is used when getting the snapshot and endVersionStateStoreCkptId for calculating the full lineage from the final version.

Before

def replayStateFromSnapshot(
      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore

After

def replayStateFromSnapshot(
      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): StateStore

This is the final PR in the series following:

NOTE: To read checkpoint v2 state data sources it is required to have "spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.

Why are the changes needed?

State checkpoint v2 ("spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only batchId was implemented in #52047 and only readChangeFeed was implemented in #52148.

Does this PR introduce any user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the snapshotStartBatchId and related options are used.

How was this patch tested?

In the previous PRs test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including python tests).

RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite is added which uses the golden file approach similar to #46944 where snapshotStartBatchId is first added.

Was this patch authored or co-authored using generative AI tooling?

No

/** Helper class for [[RocksDBCheckpointMetadata]] */
object RocksDBCheckpointMetadata {
val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion
val VERSION = 1
Copy link
Contributor Author

@dylanwong250 dylanwong250 Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to point out this change. I believe using val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion is incorrect in this case and I was seeing a few test failures. The QLConf.get.stateStoreCheckpointFormatVersion refers to the checkpoint format version not the metadata format version. Additionally, since this is inside an object as a val it will be instantiated once and is not the current value of SQLConf.get.stateStoreCheckpointFormatVersion. I think this VERSION may be shared incorrectly also.

I also experimented with having VERSION equal to the current value. I ran into a few issues with the maintenance threads having a different version than the streaming query they were ran in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this!

.replayStateFromSnapshot(
opts.snapshotVersion,
opts.endVersion,
readOnly = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is readOnly false for state reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had it to readOnly = false to match what the replayStateFromSnapshot default value was without the uniqueIds. Also looking at the previous PR for snapshot reading #46944 it seems the readOnly option did not exist at the time. I changed to readOnly = true and all the tests pass so I think it is better to switch to readOnly = true.

}

/** Snapshot options specialized for a single state store handler. */
case class HandlerSnapshotOptions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made private[join]

Copy link
Contributor

@liviazhu liviazhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

startKeyWithIndexToValueStateStoreCkptId: Option[String] = None,
endKeyToNumValuesStateStoreCkptId: Option[String] = None,
endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) {
def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indents seem wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private val keyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) {
private val endStateStoreCheckpointIds =
SymmetricHashJoinStateManager.getStateStoreCheckpointIds(
partition.partition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent for the args ?

protected class KeyWithIndexToValueStore(stateFormatVersion: Int)
extends StateStoreHandler(KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId) {
protected class KeyWithIndexToValueStore(
stateFormatVersion: Int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this should be 4 spaces here ?

Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm pending nits

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants