diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 088242b4246e5..1ae0fef8eb291 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -180,29 +180,31 @@ class RocksDB( logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}") try { if (loadedVersion != version) { - closeDB() val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version) - val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) - loadedVersion = latestSnapshotVersion - - // reset last snapshot version - if (lastSnapshotVersion > latestSnapshotVersion) { - // discard any newer snapshots - lastSnapshotVersion = 0L - latestSnapshot = None - } - openDB() - - numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { - // we don't track the total number of rows - discard the number being track - -1L - } else if (metadata.numKeys < 0) { - // we track the total number of rows, but the snapshot doesn't have tracking number - // need to count keys now - countKeys() - } else { - metadata.numKeys + if (loadedVersion != latestSnapshotVersion) { + closeDB() + val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) + loadedVersion = latestSnapshotVersion + + // reset last snapshot version + if (lastSnapshotVersion > latestSnapshotVersion) { + // discard any newer snapshots + lastSnapshotVersion = 0L + latestSnapshot = None } + openDB() + + numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { + // we don't track the total number of rows - discard the number being track + -1L + } else if (metadata.numKeys < 0) { + // we track the total number of rows, but the snapshot doesn't have tracking number + // need to count keys now + countKeys() + } else { + metadata.numKeys + } + } if (loadedVersion != version) replayChangelog(version) // After changelog replay the numKeysOnWritingVersion will be updated to // the correct number of keys in the loaded version.