Skip to content

Commit 847cfa8

Browse files
committed
Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
1 parent 0349b5b commit 847cfa8

File tree

3 files changed

+30
-13
lines changed

3 files changed

+30
-13
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ object CheckpointReader extends Logging {
319319

320320
// Try to read the checkpoint files in the order
321321
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
322-
val compressionCodec = CompressionCodec.createCodec(conf)
322+
var readError: Exception = null
323323
checkpointFiles.foreach(file => {
324324
logInfo("Attempting to load checkpoint from file " + file)
325325
try {
@@ -330,13 +330,15 @@ object CheckpointReader extends Logging {
330330
return Some(cp)
331331
} catch {
332332
case e: Exception =>
333+
readError = e
333334
logWarning("Error reading checkpoint from file " + file, e)
334335
}
335336
})
336337

337338
// If none of checkpoint files could be read, then throw exception
338339
if (!ignoreReadError) {
339-
throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
340+
throw new SparkException(
341+
s"Failed to read checkpoint from directory $checkpointPath", readError)
340342
}
341343
None
342344
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.streaming.dstream
1919

20-
import java.io.{NotSerializableException, ObjectOutputStream}
20+
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
2121

2222
import scala.collection.mutable.{ArrayBuffer, Queue}
2323
import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
3737

3838
override def stop() { }
3939

40+
private def readObject(in: ObjectInputStream): Unit = {
41+
throw new NotSerializableException("queueStream doesn't support checkpointing. " +
42+
"Please don't use queueStream when checkpointing is enabled.")
43+
}
44+
4045
private def writeObject(oos: ObjectOutputStream): Unit = {
41-
throw new NotSerializableException("queueStream doesn't support checkpointing")
46+
logWarning("queueStream doesn't support checkpointing")
4247
}
4348

4449
override def compute(validTime: Time): Option[RDD[T]] = {

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
3030
import org.scalatest.exceptions.TestFailedDueToTimeoutException
3131
import org.scalatest.time.SpanSugar._
3232

33-
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
33+
import org.apache.spark._
3434
import org.apache.spark.metrics.MetricsSystem
3535
import org.apache.spark.metrics.source.Source
3636
import org.apache.spark.storage.StorageLevel
@@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
726726
}
727727

728728
test("queueStream doesn't support checkpointing") {
729-
val checkpointDir = Utils.createTempDir()
730-
ssc = new StreamingContext(master, appName, batchDuration)
731-
val rdd = ssc.sparkContext.parallelize(1 to 10)
732-
ssc.queueStream[Int](Queue(rdd)).print()
733-
ssc.checkpoint(checkpointDir.getAbsolutePath)
734-
val e = intercept[NotSerializableException] {
735-
ssc.start()
729+
val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
730+
def creatingFunction(): StreamingContext = {
731+
val _ssc = new StreamingContext(conf, batchDuration)
732+
val rdd = _ssc.sparkContext.parallelize(1 to 10)
733+
_ssc.checkpoint(checkpointDirectory)
734+
_ssc.queueStream[Int](Queue(rdd)).register()
735+
_ssc
736+
}
737+
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
738+
ssc.start()
739+
eventually(timeout(10000 millis)) {
740+
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
741+
}
742+
ssc.stop()
743+
val e = intercept[SparkException] {
744+
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
736745
}
737746
// StreamingContext.validate changes the message, so use "contains" here
738-
assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
747+
assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
748+
"Please don't use queueStream when checkpointing is enabled."))
739749
}
740750

741751
def addInputStream(s: StreamingContext): DStream[Int] = {

0 commit comments

Comments
 (0)