
Commit f6d56d2

[SPARK-21596][SS] Ensure places calling HDFSMetadataLog.get check the return value
Same PR as #18799 but for branch 2.2; the main discussion is in the other PR.

--------

While investigating a flaky test, I realized that many places don't check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch is supposed to be there, the caller simply ignores `None` rather than throwing an error. If some bug causes a query not to generate a batch metadata file, this behavior hides the problem: the query keeps running and eventually deletes the metadata logs, making the issue hard to debug. This PR ensures that every place calling `HDFSMetadataLog.get` checks the return value.

Tested by Jenkins.

Author: Shixiong Zhu <[email protected]>

Closes #18890 from tdas/SPARK-21596-2.2.
1 parent: 7446be3
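As a rough illustration of the failure mode and the fix (a minimal sketch, not the actual Spark code; the toy `metadataLog` map and batch ids below are made up for illustration):

    // Toy metadata log: get() returns None when a batch's file is missing.
    val metadataLog: Map[Long, Seq[String]] = Map(0L -> Seq("a"), 2L -> Seq("c")) // batch 1 missing
    val validBatches: Seq[Long] = Seq(0L, 1L, 2L)

    // Before the fix: the missing batch 1 is silently skipped by flatMap,
    // so downstream code runs on incomplete state.
    val before: Seq[String] = validBatches.flatMap(id => metadataLog.get(id)).flatten

    // After the fix: the missing batch surfaces immediately as an error.
    val after: Seq[String] = validBatches.map { id =>
      metadataLog.get(id).getOrElse {
        throw new IllegalStateException(s"batch $id doesn't exist")
      }
    }.flatten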

6 files changed (+104, -17 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 16 additions & 8 deletions
@@ -169,13 +169,15 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
-    if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      true
-    } else {
-      // Return false as there is another writer.
-      false
-    }
+    val allLogs = validBatches.map { id =>
+      super.get(id).getOrElse {
+        throw new IllegalStateException(
+          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
+            s"(compactInterval: $compactInterval)")
+      }
+    }.flatten ++ logs
+    // Return false as there is another writer.
+    super.add(batchId, compactLogs(allLogs).toArray)
   }
 
   /**
@@ -190,7 +192,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     if (latestId >= 0) {
       try {
         val logs =
-          getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten
+          getAllValidBatches(latestId, compactInterval).map { id =>
+            super.get(id).getOrElse {
+              throw new IllegalStateException(
+                s"${batchIdToPath(id)} doesn't exist " +
+                  s"(latestId: $latestId, compactInterval: $compactInterval)")
+            }
+          }.flatten
         return compactLogs(logs).toArray
       } catch {
         case e: IOException =>

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala

Lines changed: 4 additions & 1 deletion
@@ -115,7 +115,10 @@ class FileStreamSourceLog(
       Map.empty[Long, Option[Array[FileEntry]]]
     }
 
-    (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    val batches =
+      (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    batches
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 54 additions & 3 deletions
@@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         serialize(metadata, output)
         return Some(tempPath)
       } finally {
-        IOUtils.closeQuietly(output)
+        output.close()
       }
     } catch {
       case e: FileAlreadyExistsException =>
@@ -211,13 +211,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   }
 
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+    assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get)
     val files = fileManager.list(metadataPath, batchFilesFilter)
     val batchIds = files
       .map(f => pathToBatchId(f.getPath))
       .filter { batchId =>
         (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
-      }
-    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
+      }.sorted
+
+    verifyBatchIds(batchIds, startId, endId)
+
+    batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>
         (batchId, metadataOption.get)
     }
@@ -437,4 +441,51 @@ object HDFSMetadataLog {
       }
     }
   }
+
+  /**
+   * Verify if batchIds are continuous and between `startId` and `endId`.
+   *
+   * @param batchIds the sorted ids to verify.
+   * @param startId the start id. If it's set, batchIds should start with this id.
+   * @param endId the end id. If it's set, batchIds should end with this id.
+   */
+  def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
+    // Verify that we can get all batches between `startId` and `endId`.
+    if (startId.isDefined || endId.isDefined) {
+      if (batchIds.isEmpty) {
+        throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist")
+      }
+      if (startId.isDefined) {
+        val minBatchId = batchIds.head
+        assert(minBatchId >= startId.get)
+        if (minBatchId != startId.get) {
+          val missingBatchIds = startId.get to minBatchId
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
+        }
+      }
+
+      if (endId.isDefined) {
+        val maxBatchId = batchIds.last
+        assert(maxBatchId <= endId.get)
+        if (maxBatchId != endId.get) {
+          val missingBatchIds = maxBatchId to endId.get
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
+        }
+      }
+    }
+
+    if (batchIds.nonEmpty) {
+      val minBatchId = batchIds.head
+      val maxBatchId = batchIds.last
+      val missingBatchIds = (minBatchId to maxBatchId).toSet -- batchIds
+      if (missingBatchIds.nonEmpty) {
+        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) " +
+          s"don't exist (startId: $startId, endId: $endId)")
+      }
+    }
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 12 additions & 5 deletions
@@ -429,7 +429,10 @@ class StreamExecution(
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
-        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+        if (latestBatchId != 0) {
+          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
+            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
+          }
           committedOffsets = secondLatestBatchId.toStreamProgress(sources)
         }
 
@@ -568,10 +571,14 @@
 
         // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
         // sources to discard data from the previous batch.
-        val prevBatchOff = offsetLog.get(currentBatchId - 1)
-        if (prevBatchOff.isDefined) {
-          prevBatchOff.get.toStreamProgress(sources).foreach {
-            case (src, off) => src.commit(off)
+        if (currentBatchId != 0) {
+          val prevBatchOff = offsetLog.get(currentBatchId - 1)
+          if (prevBatchOff.isDefined) {
+            prevBatchOff.get.toStreamProgress(sources).foreach {
+              case (src, off) => src.commit(off)
+            }
+          } else {
+            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
           }
         }

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -259,6 +259,23 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       fm.rename(path2, path3)
     }
   }
+
+  test("verifyBatchIds") {
+    import HDFSMetadataLog.verifyBatchIds
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))
+    verifyBatchIds(Seq(1L), Some(1L), Some(1L))
+    verifyBatchIds(Seq(1L, 2L, 3L), None, Some(3L))
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), None)
+    verifyBatchIds(Seq(1L, 2L, 3L), None, None)
+
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), None, Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
+  }
 }
 
 /** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       val metadataLog =
         new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
+      assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))
 
       val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)
