
Commit c02c8be

chaoqin-li1123 authored and HeartSaVioR committed
[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider
### What changes were proposed in this pull request?

To reduce checkpoint duration and end-to-end latency, we propose Changelog Based Checkpointing for the RocksDB State Store Provider. The mechanism is as follows (a simplified sketch of step 2 appears below):

1. Changelog checkpoint: upon each put() or delete() call to the local RocksDB instance, log the operation to a changelog file. During the state change commit, sync the compressed changelog of the current batch to DFS as checkpointDir/{version}.delta.
2. Version reconstruction: for version j, find the latest snapshot i.zip such that i <= j, load snapshot i, and replay i+1.delta through j.delta. This is used both for loading the initial state and for creating the latest version snapshot. Note: if a query shuts down without exception, there won't be changelog replay during query restart, because a maintenance task is executed before the state store instance is unloaded.
3. Background snapshot: a maintenance thread in the executors launches maintenance tasks periodically. Inside the maintenance task, sync the latest local RocksDB snapshot to DFS as checkpointDir/{version}.zip. Snapshots enable faster failure recovery and allow old versions to be purged.
4. Garbage collection: inside the maintenance task, delete snapshot and delta files from DFS for versions that are outside the retention range (by default, 100 versions are retained).

### Why are the changes needed?

We have identified state checkpointing latency as one of the major performance bottlenecks for stateful streaming queries. Currently, the RocksDB state store pauses the RocksDB instance to upload a snapshot to the cloud when committing a batch, which is heavyweight and has unpredictable performance. With changelog based checkpointing, the RocksDB instance runs uninterrupted, which improves RocksDB operation performance. It also dramatically reduces commit time and batch duration, because a much smaller amount of data is uploaded during the state commit. With this change, stateful queries with the RocksDB state store have lower and more predictable latency.

### How was this patch tested?

Added a unit test for the changelog checkpointing utility. Added a unit test and an integration test that check backward compatibility with existing checkpoints. Enabled the RocksDB state store unit tests and the stateful streaming query integration tests to run with changelog checkpointing enabled.

Closes #41099 from chaoqin-li1123/changelog.

Authored-by: Chaoqin Li <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
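The version-reconstruction rule in step 2 is the core of the recovery path. Below is a minimal, self-contained Scala sketch of that rule; the helper names and the fixed snapshot interval are illustrative stand-ins, not Spark's API (in Spark this logic goes through RocksDBFileManager).

```scala
object VersionReconstructionSketch {
  // Illustrative stand-ins for the DFS operations performed by RocksDBFileManager.
  def loadSnapshot(i: Long): Unit = println(s"load checkpointDir/$i.zip")
  def replayDelta(v: Long): Unit = println(s"replay checkpointDir/$v.delta")
  // Pretend a background snapshot exists every 10 versions.
  def latestSnapshotVersion(j: Long): Long = (j / 10) * 10

  // For target version j: find the latest snapshot i <= j, load it,
  // then replay i+1.delta through j.delta.
  def reconstruct(j: Long): Unit = {
    val i = latestSnapshotVersion(j)
    loadSnapshot(i)
    (i + 1 to j).foreach(replayDelta)
  }

  def main(args: Array[String]): Unit = reconstruct(23) // loads 20.zip, replays 21.delta..23.delta
}
```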
1 parent 10ee643 · commit c02c8be

14 files changed (+1012, -201 lines)

docs/structured-streaming-programming-guide.md

Lines changed: 18 additions & 0 deletions
@@ -2320,6 +2320,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
       <td>Whether we perform a range compaction of RocksDB instance for commit operation</td>
       <td>False</td>
     </tr>
+    <tr>
+      <td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
+      <td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
+      <td>False</td>
+    </tr>
     <tr>
       <td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
       <td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is a RocksDB's default SST file format.</td>
@@ -2389,6 +2394,19 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
 
+##### RocksDB State Store Changelog Checkpointing
+In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB state store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to durable storage.
+Instead of uploading data files of RocksDB instances, changelog checkpointing uploads the changes made to the state since the last checkpoint for durability.
+Snapshots are persisted periodically in the background for predictable failure recovery and changelog trimming.
+Changelog checkpointing avoids the cost of capturing and uploading snapshots of RocksDB instances and significantly reduces streaming query latency.
+
+Changelog checkpointing is disabled by default. You can enable RocksDB state store changelog checkpointing by setting the `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
+Changelog checkpointing is designed to be backward compatible with the traditional checkpointing mechanism.
+The RocksDB state store provider supports transitioning between the two checkpointing mechanisms in both directions. This allows you to leverage the performance benefits of changelog checkpointing without discarding the old state checkpoint.
+In a version of Spark that supports changelog checkpointing, you can migrate streaming queries from older versions of Spark to changelog checkpointing by enabling it in the Spark session.
+Vice versa, you can safely disable changelog checkpointing in a newer version of Spark; any query that has been running with changelog checkpointing then switches back to traditional checkpointing.
+You need to restart your streaming queries for the change in checkpointing mechanism to take effect, but you won't observe any performance degradation in the process.
+
 ##### Performance-aspect considerations
 
 1. You may want to disable the track of total number of rows to aim the better performance on RocksDB state store.

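To make the docs change above concrete, here is a minimal sketch of enabling the feature on a session. The app name is illustrative, and the provider-class setting is shown only because the RocksDB configs apply when the RocksDB state store provider is in use.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable changelog checkpointing for the RocksDB state store.
// As the docs note, restart the streaming query for the change to take effect.
val spark = SparkSession.builder()
  .appName("changelog-checkpointing-demo") // illustrative
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
  .getOrCreate()
```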
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 149 additions & 38 deletions
@@ -56,6 +56,15 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
+  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+    def close(): Unit = {
+      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
+    }
+  }
+
+  @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
+  @volatile private var lastSnapshotVersion = 0L
+
   RocksDBLoader.loadLibrary()
 
   // Java wrapper objects linking to native RocksDB objects
@@ -109,13 +118,15 @@ class RocksDB(
   private val nativeStats = dbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(
-    dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId)
+  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+    hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
   private val acquireLock = new Object
 
   @volatile private var db: NativeRocksDB = _
+  @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
+  private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
@@ -129,17 +140,20 @@ class RocksDB(
    * Note that this will copy all the necessary file from DFS to local disk as needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(version: Long): RocksDB = {
+  def load(version: Long, readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire()
     logInfo(s"Loading $version")
     try {
       if (loadedVersion != version) {
         closeDB()
-        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
+        val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+        loadedVersion = latestSnapshotVersion
+
         openDB()
 
-        val numKeys = if (!conf.trackTotalNumberOfRows) {
+        numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
           // we don't track the total number of rows - discard the number being track
           -1L
         } else if (metadata.numKeys < 0) {
@@ -149,10 +163,10 @@ class RocksDB(
         } else {
           metadata.numKeys
         }
-        numKeysOnWritingVersion = numKeys
-        numKeysOnLoadedVersion = numKeys
-
-        loadedVersion = version
+        if (loadedVersion != version) replayChangelog(version)
+        // After changelog replay the numKeysOnWritingVersion will be updated to
+        // the correct number of keys in the loaded version.
+        numKeysOnLoadedVersion = numKeysOnWritingVersion
         fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       if (conf.resetStatsOnLoad) {
@@ -164,9 +178,36 @@ class RocksDB(
       loadedVersion = -1  // invalidate loaded data
       throw t
     }
+    if (enableChangelogCheckpointing && !readOnly) {
+      // Make sure we don't leak resource.
+      changelogWriter.foreach(_.abort())
+      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
+    }
     this
   }
 
+  /**
+   * Replay change log from the loaded version to the target version.
+   */
+  private def replayChangelog(endVersion: Long): Unit = {
+    for (v <- loadedVersion + 1 to endVersion) {
+      var changelogReader: StateStoreChangelogReader = null
+      try {
+        changelogReader = fileManager.getChangelogReader(v)
+        changelogReader.foreach { case (key, value) =>
+          if (value != null) {
+            put(key, value)
+          } else {
+            remove(key)
+          }
+        }
+      } finally {
+        if (changelogReader != null) changelogReader.close()
+      }
+    }
+    loadedVersion = endVersion
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
@@ -187,6 +228,7 @@ class RocksDB(
       }
     }
     db.put(writeOptions, key, value)
+    changelogWriter.foreach(_.put(key, value))
   }
 
   /**
@@ -201,6 +243,7 @@ class RocksDB(
       }
     }
     db.delete(writeOptions, key)
+    changelogWriter.foreach(_.delete(key))
   }
 
   /**
@@ -286,44 +329,66 @@ class RocksDB(
    */
   def commit(): Long = {
     val newVersion = loadedVersion + 1
-    val checkpointDir = createTempDir("checkpoint")
-    var rocksDBBackgroundThreadPaused = false
     try {
-      // Make sure the directory does not exist. Native RocksDB fails if the directory to
-      // checkpoint exists.
-      Utils.deleteRecursively(checkpointDir)
 
       logInfo(s"Flushing updates for $newVersion")
-      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
 
-      val compactTimeMs = if (conf.compactOnCommit) {
-        logInfo("Compacting")
-        timeTakenMs { db.compactRange() }
-      } else 0
-
-      logInfo("Pausing background work")
-      val pauseTimeMs = timeTakenMs {
-        db.pauseBackgroundWork() // To avoid files being changed while committing
-        rocksDBBackgroundThreadPaused = true
-      }
-
-      logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
-      val checkpointTimeMs = timeTakenMs {
-        val cp = Checkpoint.create(db)
-        cp.createCheckpoint(checkpointDir.toString)
+      var compactTimeMs = 0L
+      var flushTimeMs = 0L
+      var checkpointTimeMs = 0L
+      if (shouldCreateSnapshot()) {
+        // Need to flush the change to disk before creating a checkpoint
+        // because rocksdb wal is disabled.
+        logInfo(s"Flushing updates for $newVersion")
+        flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+        if (conf.compactOnCommit) {
+          logInfo("Compacting")
+          compactTimeMs = timeTakenMs { db.compactRange() }
+        }
+        checkpointTimeMs = timeTakenMs {
+          val checkpointDir = createTempDir("checkpoint")
+          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          // Make sure the directory does not exist. Native RocksDB fails if the directory to
+          // checkpoint exists.
+          Utils.deleteRecursively(checkpointDir)
+          // We no longer pause background operation before creating a RocksDB checkpoint because
+          // it is unnecessary. The captured snapshot will still be consistent with ongoing
+          // background operations.
+          val cp = Checkpoint.create(db)
+          cp.createCheckpoint(checkpointDir.toString)
+          synchronized {
+            // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
+            // inside the uploadSnapshot() called below.
+            // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
+            // during state store maintenance.
+            latestSnapshot.foreach(_.close())
+            latestSnapshot = Some(
+              RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+            lastSnapshotVersion = newVersion
+          }
+        }
       }
 
       logInfo(s"Syncing checkpoint for $newVersion to DFS")
       val fileSyncTimeMs = timeTakenMs {
-        fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion)
+        if (enableChangelogCheckpointing) {
+          try {
+            assert(changelogWriter.isDefined)
+            changelogWriter.foreach(_.commit())
+          } finally {
+            changelogWriter = None
+          }
+        } else {
+          assert(changelogWriter.isEmpty)
+          uploadSnapshot()
+        }
       }
+
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
-      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "flush" -> flushTimeMs,
         "compact" -> compactTimeMs,
-        "pause" -> pauseTimeMs,
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
@@ -334,25 +399,60 @@ class RocksDB(
       loadedVersion = -1  // invalidate loaded version
       throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 10000
+    } else true
+  }
+
+  private def uploadSnapshot(): Unit = {
+    val localCheckpoint = synchronized {
+      val checkpoint = latestSnapshot
+      latestSnapshot = None
+      checkpoint
+    }
+    localCheckpoint match {
+      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+        try {
+          val uploadTime = timeTakenMs {
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+          }
+          logInfo(s"$loggingId: Upload snapshot of version $version," +
+            s" time taken: $uploadTime ms")
+        } finally {
+          localCheckpoint.foreach(_.close())
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
+    changelogWriter.foreach(_.abort())
+    // Make sure changelogWriter gets recreated next time.
+    changelogWriter = None
     release()
     logInfo(s"Rolled back to $loadedVersion")
   }
 
-  def cleanup(): Unit = {
+  def doMaintenance(): Unit = {
+    if (enableChangelogCheckpointing) {
+      uploadSnapshot()
+    }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
     }
@@ -369,6 +469,9 @@ class RocksDB(
       flushOptions.close()
       dbOptions.close()
       dbLogger.close()
+      synchronized {
+        latestSnapshot.foreach(_.close())
+      }
       silentDeleteRecursively(localRootDir, "closing RocksDB")
     } catch {
       case e: Exception =>
@@ -550,7 +653,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
+    enableChangelogCheckpointing: Boolean,
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
@@ -563,7 +668,8 @@ case class RocksDBConf(
     boundedMemoryUsage: Boolean,
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
-    highPriorityPoolRatio: Double)
+    highPriorityPoolRatio: Double,
+    compressionCodec: String)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -585,6 +691,8 @@ object RocksDBConf {
 
   // Configuration that specifies whether to compact the RocksDB data every time data is committed
   private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
+  private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
+    "changelogCheckpointing.enabled", "false")
  private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
   // See SPARK-42794 for details.
@@ -705,7 +813,9 @@ object RocksDBConf {
 
     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minDeltasForSnapshot,
      getBooleanConf(COMPACT_ON_COMMIT_CONF),
+      getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
       getPositiveLongConf(BLOCK_SIZE_KB_CONF),
       getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
@@ -718,7 +828,8 @@ object RocksDBConf {
       getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
-      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
+      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
+      storeConf.compressionCodec)
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())

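The diff above uses StateStoreChangelogWriter and StateStoreChangelogReader (defined elsewhere in this commit) only through put/delete/commit/abort/size and an iterator of (key, value) pairs in which a null value encodes a deletion; that is why replayChangelog maps null values to remove(). Below is a simplified sketch of such a changelog format, assuming a plain length-prefixed binary encoding. It is not Spark's actual on-disk format, which goes through the checkpoint file manager and the configured compression codec.

```scala
import java.io.{DataInputStream, DataOutputStream}

// Sketch of a put/delete changelog: records are length-prefixed, and a
// negative value length marks a deletion (read back as a null value).
class SketchChangelogWriter(out: DataOutputStream) {
  private var entries = 0L
  def size: Long = entries // mirrors the writer size checked by shouldCreateSnapshot()

  def put(key: Array[Byte], value: Array[Byte]): Unit = {
    out.writeInt(key.length); out.write(key)
    out.writeInt(value.length); out.write(value)
    entries += 1
  }

  def delete(key: Array[Byte]): Unit = {
    out.writeInt(key.length); out.write(key)
    out.writeInt(-1) // deletion marker instead of a value
    entries += 1
  }

  def commit(): Unit = { out.writeInt(-1); out.close() } // -1 key length terminates the file
  def abort(): Unit = out.close() // a real implementation would also delete the partial file
}

class SketchChangelogReader(in: DataInputStream) extends Iterator[(Array[Byte], Array[Byte])] {
  private var nextKeyLen = in.readInt()
  override def hasNext: Boolean = nextKeyLen >= 0
  override def next(): (Array[Byte], Array[Byte]) = {
    val key = new Array[Byte](nextKeyLen)
    in.readFully(key)
    val valueLen = in.readInt()
    val value = if (valueLen < 0) null
    else { val v = new Array[Byte](valueLen); in.readFully(v); v }
    nextKeyLen = in.readInt()
    (key, value) // a null value means "delete this key" on replay
  }
  def close(): Unit = in.close()
}
```

Replaying such a file then mirrors replayChangelog in the diff: iterate the reader and call put for non-null values and remove for nulls.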