Commit 637bc9c

Changed segment to handle

1 parent 466212c

3 files changed: 14 additions, 13 deletions

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
6 additions, 6 deletions

@@ -71,15 +71,15 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
         val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
-        // Is WAL segment info present with all the blocks
-        val isWALSegmentInfoPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+        // Are WAL record handles present with all the blocks
+        val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 
-        if (isWALSegmentInfoPresent) {
-          // If all the blocks have WAL segment info, then create a WALBackedBlockRDD
+        if (areWALRecordHandlesPresent) {
+          // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
           val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-          val blockWALSegments = blockInfos.map { _.walRecordHandleOption.get }.toArray
+          val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
           new WriteAheadLogBackedBlockRDD[T](
-            ssc.sparkContext, blockIds, blockWALSegments, isBlockIdValid)
+            ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
         } else {
           // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
           // then that is unexpected and log a warning accordingly.
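For context, the branch this hunk renames chooses between a WAL-backed RDD and a plain BlockRDD depending on whether every block carries a record handle. A minimal standalone sketch of that decision, with Spark's ReceivedBlockInfo stubbed out as a plain case class (RecordHandle and BlockInfo below are illustrative stand-ins, not Spark's API):

// Minimal sketch of the branch this hunk renames. Spark's ReceivedBlockInfo
// is stubbed out as a plain case class; RecordHandle and BlockInfo are
// illustrative stand-ins, not Spark's types.
object WALBranchSketch {
  case class RecordHandle(path: String, offset: Long, length: Int)
  case class BlockInfo(blockId: String, walRecordHandleOption: Option[RecordHandle])

  def chooseRDD(blockInfos: Seq[BlockInfo]): String = {
    val blockIds = blockInfos.map(_.blockId).toArray
    // Are WAL record handles present with all the blocks?
    val areWALRecordHandlesPresent = blockInfos.forall(_.walRecordHandleOption.nonEmpty)
    if (areWALRecordHandlesPresent) {
      // Every partition can be recovered from the log, so the WAL-backed RDD
      // is safe to construct: block ids and handles pair up one to one.
      val walRecordHandles = blockInfos.map(_.walRecordHandleOption.get).toArray
      s"WriteAheadLogBackedBlockRDD(${blockIds.length} blocks, ${walRecordHandles.length} handles)"
    } else {
      // Some block has no handle: fall back to a plain BlockRDD.
      s"BlockRDD(${blockIds.length} blocks)"
    }
  }

  def main(args: Array[String]): Unit = {
    val allLogged = Seq(BlockInfo("input-0-0", Some(RecordHandle("wal-0", 0L, 100))))
    val unlogged  = Seq(BlockInfo("input-0-1", None))
    println(chooseRDD(allLogged)) // takes the WAL-backed branch
    println(chooseRDD(unlogged))  // takes the fallback branch
  }
}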

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
7 additions, 6 deletions

@@ -31,7 +31,7 @@ import org.apache.spark.streaming.util._
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
- * the segment of the write ahead log that backs the partition.
+ * the corresponding record handle in the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
  * @param walRecordHandle Handle of the record in a write ahead log having the partition data
@@ -49,9 +49,9 @@ class WriteAheadLogBackedBlockRDDPartition(
  * This class represents a special case of the BlockRDD where the data blocks in
  * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment. The finding
- * of the blocks by their ids can be skipped by setting the corresponding element in
- * isBlockIdValid to false. This is a performance optimization which does not affect
+ * If it does not find them, it looks up the WAL using the corresponding record handle.
+ * The lookup of the blocks from the block manager can be skipped by setting the corresponding
+ * element in isBlockIdValid to false. This is a performance optimization which does not affect
  * correctness, and it can be used in situations where it is known that the block
  * does not exist in the Spark executors (e.g. after a failed driver is restarted).
  *
@@ -62,7 +62,8 @@ class WriteAheadLogBackedBlockRDDPartition(
  * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
  *                       executors). If not, then block lookups by the block ids will be skipped.
  *                       By default, this is an empty array signifying true for all the blocks.
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storeInBlockManager Whether to store a block in the block manager
+ *                            after reading it from the WAL
  * @param storageLevel storage level to store when storing in block manager
  *                     (applicable when storeInBlockManager = true)
  */
@@ -79,7 +80,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   require(
     blockIds.length == walRecordHandles.length,
     s"Number of block Ids (${blockIds.length}) must be " +
-      s" same as number of segments (${walRecordHandles.length}})")
+      s" same as number of WAL record handles (${walRecordHandles.length}})")
 
   require(
     isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
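The updated doc comment describes a two-step read path: try the block manager first, then fall back to the WAL via the record handle, optionally skipping the first lookup when isBlockIdValid is false. A minimal sketch of that fallback logic, with the block manager and the log stubbed as in-memory maps (all names here are illustrative, not Spark's internals):

// Sketch of the read path from the doc comment: block manager first, WAL
// second, with the block manager lookup skippable. The maps and names below
// are stand-ins, not Spark's internals.
object WALReadPathSketch {
  case class RecordHandle(path: String, offset: Long)

  val blockManager = Map("input-0-0" -> Seq(1, 2, 3))
  val writeAheadLog = Map(RecordHandle("wal-0", 0L) -> Seq(1, 2, 3))

  def read(blockId: String, handle: RecordHandle, isBlockIdValid: Boolean): Seq[Int] = {
    // When the block is known to be absent from the executors (e.g. after a
    // failed driver is restarted), isBlockIdValid = false skips the lookup.
    val fromBlockManager = if (isBlockIdValid) blockManager.get(blockId) else None
    // Otherwise, fall back to the WAL using the record handle.
    fromBlockManager.getOrElse(
      writeAheadLog.getOrElse(handle, sys.error(s"block $blockId not found anywhere")))
  }

  def main(args: Array[String]): Unit = {
    val handle = RecordHandle("wal-0", 0L)
    println(read("input-0-0", handle, isBlockIdValid = true))  // served from the block manager
    println(read("input-0-9", handle, isBlockIdValid = false)) // recovered from the WAL
  }
}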

streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
1 addition, 1 deletion

@@ -100,7 +100,7 @@ class WriteAheadLogBackedBlockRDDSuite
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
-    // Generate write ahead log file segments
+    // Generate write ahead log record handles
     val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
       generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
         blockIds.takeRight(numPartitionsInWAL))
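The suite pairs every block id with a handle because WriteAheadLogBackedBlockRDD requires a one-to-one correspondence, so partitions that live only in the block manager get placeholder handles that are never dereferenced. An illustrative sketch of that fixture pattern (generateFakeRecordHandles mirrors the suite's helper name, but this body and the FileSegmentHandle type are assumptions, not the actual test code):

// Illustrative fixture: fake handles satisfy the RDD's 1:1 pairing between
// block ids and handles for partitions served from the block manager.
// FileSegmentHandle is a stand-in, not Spark's FileBasedWriteAheadLogSegment.
object RecordHandleFixtures {
  case class FileSegmentHandle(path: String, offset: Long, length: Int)

  // Placeholder handles: deliberately invalid, because the corresponding
  // blocks are expected to be found in the block manager first.
  def generateFakeRecordHandles(count: Int): Seq[FileSegmentHandle] =
    Seq.fill(count)(FileSegmentHandle(path = "/nonexistent", offset = -1L, length = -1))

  def main(args: Array[String]): Unit = {
    val handles = generateFakeRecordHandles(3)
    assert(handles.length == 3)
    handles.foreach(println)
  }
}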
