From 61823bdcd2d2ada25084b48f40edcabb1acadd1d Mon Sep 17 00:00:00 2001 From: Yuchen Liu <170372783+eason-yuchen-liu@users.noreply.github.com> Date: Mon, 10 Jun 2024 10:10:44 -0700 Subject: [PATCH 1/2] check whether RocksDB already loads with the latest snapshot --- .../execution/streaming/state/RocksDB.scala | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 088242b4246e5..1e97621cb4ad3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -180,29 +180,31 @@ class RocksDB( logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}") try { if (loadedVersion != version) { - closeDB() val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version) - val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) - loadedVersion = latestSnapshotVersion - - // reset last snapshot version - if (lastSnapshotVersion > latestSnapshotVersion) { - // discard any newer snapshots - lastSnapshotVersion = 0L - latestSnapshot = None - } - openDB() - - numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { - // we don't track the total number of rows - discard the number being track - -1L - } else if (metadata.numKeys < 0) { - // we track the total number of rows, but the snapshot doesn't have tracking number - // need to count keys now - countKeys() - } else { - metadata.numKeys + if (loadedVersion != latestSnapshotVersion) { + closeDB() + val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) + loadedVersion = latestSnapshotVersion + + // reset last snapshot version + if (lastSnapshotVersion > latestSnapshotVersion) { + // discard any newer snapshots + lastSnapshotVersion = 0L + latestSnapshot = None } + openDB() + + numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { + // we don't track the total number of rows - discard the number being track + -1L + } else if (metadata.numKeys < 0) { + // we track the total number of rows, but the snapshot doesn't have tracking number + // need to count keys now + countKeys() + } else { + metadata.numKeys + } + } if (loadedVersion != version) replayChangelog(version) // After changelog replay the numKeysOnWritingVersion will be updated to // the correct number of keys in the loaded version. From 420e5e2186c6949e2156a2f7b452d23c779ef3b6 Mon Sep 17 00:00:00 2001 From: Yuchen Liu <170372783+eason-yuchen-liu@users.noreply.github.com> Date: Mon, 10 Jun 2024 10:30:00 -0700 Subject: [PATCH 2/2] fix style --- .../apache/spark/sql/execution/streaming/state/RocksDB.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 1e97621cb4ad3..1ae0fef8eb291 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -185,7 +185,7 @@ class RocksDB( closeDB() val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) loadedVersion = latestSnapshotVersion - + // reset last snapshot version if (lastSnapshotVersion > latestSnapshotVersion) { // discard any newer snapshots @@ -193,7 +193,7 @@ class RocksDB( latestSnapshot = None } openDB() - + numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { // we don't track the total number of rows - discard the number being track -1L