
Commit ad4522a

zsxwing authored and jeanlyn committed
[SPARK-7777][Streaming] Handle the case when there is no block in a batch
In the old implementation, if a batch has no block, `areWALRecordHandlesPresent` will be `true` and the stream will return a `WriteAheadLogBackedBlockRDD`. This PR handles this case by returning either a `WriteAheadLogBackedBlockRDD` or a `BlockRDD`, according to the configuration.

Author: zsxwing <[email protected]>

Closes apache#6372 from zsxwing/SPARK-7777 and squashes the following commits:

788f895 [zsxwing] Handle the case when there is no block in a batch
1 parent e8547a3 commit ad4522a
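
The root cause is easy to reproduce in isolation: Scala's `forall` is vacuously true on an empty collection, so a batch with no blocks passed the "all blocks have WAL record handles" check. A minimal sketch, using a simplified stand-in for Spark's real block-info type rather than the class itself:

```scala
// Simplified stand-in for Spark's received-block metadata, for illustration only.
case class BlockInfoStub(walRecordHandleOption: Option[String])

object ForallOnEmptyDemo extends App {
  val blockInfos = Seq.empty[BlockInfoStub]

  // Vacuously true: no element exists to falsify the predicate.
  val areWALRecordHandlesPresent = blockInfos.forall(_.walRecordHandleOption.nonEmpty)

  // Prints "true", which is why the old code always took the WAL branch
  // for an empty batch, regardless of configuration.
  println(areWALRecordHandlesPresent)
}
```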

2 files changed: +60 additions, -18 deletions

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 29 additions & 18 deletions
```diff
@@ -73,27 +73,38 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
         ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
-        // Are WAL record handles present with all the blocks
-        val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+        if (blockInfos.nonEmpty) {
+          // Are WAL record handles present with all the blocks
+          val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 
-        if (areWALRecordHandlesPresent) {
-          // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
-          val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-          val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
-          new WriteAheadLogBackedBlockRDD[T](
-            ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
-        } else {
-          // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
-          // then that is unexpected and log a warning accordingly.
-          if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
-            if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
-              logError("Some blocks do not have Write Ahead Log information; " +
-                "this is unexpected and data may not be recoverable after driver failures")
-            } else {
-              logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+          if (areWALRecordHandlesPresent) {
+            // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+            val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+            val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+            new WriteAheadLogBackedBlockRDD[T](
+              ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+          } else {
+            // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
+            // others then that is unexpected and log a warning accordingly.
+            if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+              if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+                logError("Some blocks do not have Write Ahead Log information; " +
+                  "this is unexpected and data may not be recoverable after driver failures")
+              } else {
+                logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+              }
             }
+            new BlockRDD[T](ssc.sc, blockIds)
+          }
+        } else {
+          // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
+          // according to the configuration
+          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+            new WriteAheadLogBackedBlockRDD[T](
+              ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+          } else {
+            new BlockRDD[T](ssc.sc, Array.empty)
           }
-          new BlockRDD[T](ssc.sc, blockIds)
         }
       }
     }
```
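
`WriteAheadLogUtils.enableReceiverLog(ssc.conf)` is what picks between the two empty RDDs in the new `else` branch. As a rough sketch of the driver-side setup that flips this branch, the config key is Spark's documented receiver WAL flag for this era, and the checkpoint path is a placeholder:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With the receiver write-ahead log enabled, every batch (including an empty
// one) is now represented by a WriteAheadLogBackedBlockRDD; with it disabled,
// an empty batch yields a plain BlockRDD instead.
val conf = new SparkConf()
  .setAppName("ReceiverWALExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// The WAL is only useful for recovery when checkpointing is on;
// "/tmp/checkpoint" is a placeholder path.
ssc.checkpoint("/tmp/checkpoint")
```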

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 31 additions & 0 deletions
```diff
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -105,6 +106,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
+  test("socket input stream - no block in a batch") {
+    withTestServer(new TestServer()) { testServer =>
+      testServer.start()
+
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.addStreamingListener(ssc.progressListener)
+
+        val batchCounter = new BatchCounter(ssc)
+        val networkStream = ssc.socketTextStream(
+          "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(networkStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        clock.advance(batchDuration.milliseconds)
+
+        // Make sure the first batch is finished
+        if (!batchCounter.waitUntilBatchesCompleted(1, 30000)) {
+          fail("Timeout: cannot finish all batches in 30 seconds")
+        }
+
+        networkStream.generatedRDDs.foreach { case (_, rdd) =>
+          assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+        }
+      }
+    }
+  }
+
 test("binary records stream") {
   val testDir: File = null
   try {
```
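
The test above pins down only the WAL-disabled path: the suite's `conf` leaves the receiver WAL off, so every generated RDD must be a plain `BlockRDD`. A hypothetical companion test for the WAL-enabled path, not part of this commit and sketched under the assumption that the same suite helpers (`TestServer`, `BatchCounter`, `withStreamingContext`, `TestOutputStream`) and imports are in scope:

```scala
// Hypothetical companion test (not in this commit): with the receiver WAL
// enabled, the empty first batch should be represented by a
// WriteAheadLogBackedBlockRDD rather than a BlockRDD.
test("socket input stream - no block in a batch, WAL enabled") {
  val walConf = conf.clone.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  withTestServer(new TestServer()) { testServer =>
    testServer.start()

    withStreamingContext(new StreamingContext(walConf, batchDuration)) { ssc =>
      // The receiver WAL requires a checkpoint directory.
      ssc.checkpoint(Utils.createTempDir().getAbsolutePath)

      val batchCounter = new BatchCounter(ssc)
      val networkStream = ssc.socketTextStream(
        "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
      new TestOutputStream(networkStream, outputBuffer).register()
      ssc.start()

      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      clock.advance(batchDuration.milliseconds)
      assert(batchCounter.waitUntilBatchesCompleted(1, 30000))

      // Mirror image of the commit's assertion: every generated RDD,
      // including the one for the empty batch, should be WAL-backed.
      networkStream.generatedRDDs.foreach { case (_, rdd) =>
        assert(rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
      }
    }
  }
}
```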
