@@ -23,6 +23,8 @@ import java.util.UUID
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -31,30 +33,42 @@ import org.apache.spark.streaming.util._
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
- * the segment of the write ahead log that backs the partition.
+ * the corresponding record handle in the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
+ * @param isBlockIdValid Whether the block id is valid (i.e., the block is present in the Spark
+ *                       executors). If not, the lookup by the block id will be skipped and the
+ *                       data will be read directly from the write ahead log.
  * @param walRecordHandle Handle of the record in a write ahead log having the partition data
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val walRecordHandle: WriteAheadLogRecordHandle)
-  extends Partition
+    val isBlockIdValid: Boolean,
+    val walRecordHandle: WriteAheadLogRecordHandle
+  ) extends Partition
 
 
 /**
  * This class represents a special case of the BlockRDD where the data blocks in
  * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding data in the write ahead log.
+ * If it does not find them, it looks up the WAL using the corresponding record handle.
+ * The lookup of the blocks from the block manager can be skipped by setting the corresponding
+ * element in isBlockIdValid to false. This is a performance optimization which does not affect
+ * correctness, and it can be used in situations where it is known that the block
+ * does not exist in the Spark executors (e.g. after a failed driver is restarted).
+ *
  *
  * @param sc SparkContext
  * @param blockIds Ids of the blocks that contain this RDD's data
  * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading
- *                            from the WAL record
+ * @param isBlockIdValid Whether the block ids are valid (i.e., the blocks are present in the Spark
+ *                       executors). If not, block lookups by the block ids will be skipped.
+ *                       By default, this is an empty array signifying true for all the blocks.
+ * @param storeInBlockManager Whether to store a block in the block manager
+ *                            after reading it from the WAL
  * @param storageLevel storage level to store when storing in block manager
  *                     (applicable when storeInBlockManager = true)
  */
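Reviewer note: a minimal sketch of how the new `isBlockIdValid` parameter might be used at a call site. The `recoveredRDD` helper and its arguments are hypothetical and assume streaming-internal code (the class is `private[streaming]`); only the constructor shape comes from this patch.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

// Hypothetical call site: after a driver restart, receiver-written blocks are
// known to be gone from the executors, so every block id is marked invalid and
// reads go straight to the WAL, re-caching blocks as they are read.
def recoveredRDD[T: ClassTag](
    sc: SparkContext,
    blockIds: Array[BlockId],
    handles: Array[WriteAheadLogRecordHandle]): WriteAheadLogBackedBlockRDD[T] = {
  new WriteAheadLogBackedBlockRDD[T](
    sc,
    blockIds,
    handles,
    isBlockIdValid = Array.fill(blockIds.length)(false), // skip block manager lookups
    storeInBlockManager = true) // storageLevel keeps its MEMORY_ONLY_SER default
}
```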
@@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
     @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
-    storeInBlockManager: Boolean,
-    storageLevel: StorageLevel)
+    @transient isBlockIdValid: Array[Boolean] = Array.empty,
+    storeInBlockManager: Boolean = false,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
   extends BlockRDD[T](sc, blockIds) {
 
   require(
     blockIds.length == walRecordHandles.length,
-    s"Number of block ids (${blockIds.length}) must be " +
-      s"the same as number of WAL record handles (${walRecordHandles.length}})!")
+    s"Number of block Ids (${blockIds.length}) must be " +
+      s"same as number of WAL record handles (${walRecordHandles.length})")
+
+  require(
+    isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+    s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
+      s"same as number of block Ids (${blockIds.length})")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
   private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
+  override def isValid(): Boolean = true
+
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
+    Array.tabulate(blockIds.length) { i =>
+      val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
     }
   }
 
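Reviewer note: the empty-array default threads through `getPartitions` as "all blocks valid". A standalone sketch of just that defaulting rule; the `resolveValidity` helper is hypothetical, not part of the patch:

```scala
// Mirrors the per-partition validity logic above: an empty isBlockIdValid
// array means "assume every block is still present in the executors".
def resolveValidity(isBlockIdValid: Array[Boolean], numBlocks: Int): Seq[Boolean] = {
  require(isBlockIdValid.isEmpty || isBlockIdValid.length == numBlocks,
    "isBlockIdValid must be empty or have one entry per block")
  (0 until numBlocks).map { i =>
    if (isBlockIdValid.isEmpty) true else isBlockIdValid(i)
  }
}

resolveValidity(Array.empty, 3)              // Vector(true, true, true)
resolveValidity(Array(true, false, true), 3) // Vector(true, false, true)
```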
@@ -94,50 +117,56 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockManager = SparkEnv.get.blockManager
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockId = partition.blockId
-    blockManager.get(blockId) match {
-      case Some(block) => // Data is in Block Manager
-        val iterator = block.data.asInstanceOf[Iterator[T]]
-        logDebug(s"Read partition data of $this from block manager, block $blockId")
-        iterator
-      case None => // Data not found in Block Manager, grab it from write ahead log file
-        var dataRead: ByteBuffer = null
-        var writeAheadLog: WriteAheadLog = null
-        try {
-          // The WriteAheadLogUtils.createLog*** method needs a directory to create a
-          // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
-          // writing log data. However, the directory is not needed if data needs to be read, hence
-          // a dummy path is provided to satisfy the method parameter requirements.
-          // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
-          // this dummy directory should not already exist, otherwise the WAL will try to recover
-          // past events from the directory and throw errors.
-          val nonExistentDirectory = new File(
-            System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
-          writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-            SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
-          dataRead = writeAheadLog.read(partition.walRecordHandle)
-        } catch {
-          case NonFatal(e) =>
-            throw new SparkException(
-              s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
-        } finally {
-          if (writeAheadLog != null) {
-            writeAheadLog.close()
-            writeAheadLog = null
-          }
-        }
-        if (dataRead == null) {
+
+    def getBlockFromBlockManager(): Option[Iterator[T]] = {
+      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+    }
+
+    def getBlockFromWriteAheadLog(): Iterator[T] = {
+      var dataRead: ByteBuffer = null
+      var writeAheadLog: WriteAheadLog = null
+      try {
+        // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+        // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+        // writing log data. However, the directory is not needed if data needs to be read, hence
+        // a dummy path is provided to satisfy the method parameter requirements.
+        // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+        // this dummy directory should not already exist, otherwise the WAL will try to recover
+        // past events from the directory and throw errors.
+        val nonExistentDirectory = new File(
+          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+        writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
+        dataRead = writeAheadLog.read(partition.walRecordHandle)
+      } catch {
+        case NonFatal(e) =>
           throw new SparkException(
-            s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
-              s"read returned null")
+            s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+      } finally {
+        if (writeAheadLog != null) {
+          writeAheadLog.close()
+          writeAheadLog = null
         }
-        logInfo(s"Read partition data of $this from write ahead log, record handle " +
-          partition.walRecordHandle)
-        if (storeInBlockManager) {
-          blockManager.putBytes(blockId, dataRead, storageLevel)
-          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
-          dataRead.rewind()
-        }
-        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+      }
+      if (dataRead == null) {
+        throw new SparkException(
+          s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+            s"read returned null")
+      }
+      logInfo(s"Read partition data of $this from write ahead log, record handle " +
+        partition.walRecordHandle)
+      if (storeInBlockManager) {
+        blockManager.putBytes(blockId, dataRead, storageLevel)
+        logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
+        dataRead.rewind()
+      }
+      blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+
+    if (partition.isBlockIdValid) {
+      getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
+    } else {
+      getBlockFromWriteAheadLog()
     }
   }
 
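Reviewer note: the refactor replaces the old `match` on the block manager result with two local functions and an Option-based fallback. The shape of that control flow, as a self-contained sketch (the `readPartition` name and its parameters are illustrative, not from the patch):

```scala
// "Fast path, else authoritative path": consult the cheap local lookup only
// when it can possibly succeed, otherwise read the write ahead log directly.
def readPartition[T](
    isBlockIdValid: Boolean,
    fromBlockManager: () => Option[Iterator[T]],
    fromWAL: () => Iterator[T]): Iterator[T] = {
  if (isBlockIdValid) {
    fromBlockManager().getOrElse(fromWAL()) // cache miss falls through to the WAL
  } else {
    fromWAL() // known-invalid block id: skip the block manager entirely
  }
}
```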
@@ -148,12 +177,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    val blockLocations = getBlockIdLocations().get(partition.blockId)
+    val blockLocations = if (partition.isBlockIdValid) {
+      getBlockIdLocations().get(partition.blockId)
+    } else {
+      None
+    }
+
     blockLocations.getOrElse {
       partition.walRecordHandle match {
         case fileSegment: FileBasedWriteAheadLogSegment =>
-          HdfsUtils.getFileSegmentLocations(
-            fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+          try {
+            HdfsUtils.getFileSegmentLocations(
+              fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+          } catch {
+            case NonFatal(e) =>
+              logError("Error getting WAL file segment locations", e)
+              Seq.empty
+          }
         case _ =>
           Seq.empty
       }
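Reviewer note: locality now degrades gracefully in two steps. A compact sketch of the decision; the `blockLocs`/`walLocs` thunks are hypothetical stand-ins for `getBlockIdLocations()` and `HdfsUtils.getFileSegmentLocations`:

```scala
import scala.util.control.NonFatal

// Prefer executors already holding the block; otherwise fall back to the HDFS
// hosts of the WAL segment, and treat a failed filesystem call as "no
// preference" rather than failing scheduling of the task.
def preferredHosts(
    isBlockIdValid: Boolean,
    blockLocs: => Option[Seq[String]],
    walLocs: => Seq[String]): Seq[String] = {
  val fromBlocks = if (isBlockIdValid) blockLocs else None
  fromBlocks.getOrElse {
    try walLocs
    catch { case NonFatal(e) => Seq.empty }
  }
}
```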