@@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam

object RecoverableNetworkWordCount {

-def createContext(ip: String, port: Int, outputPath: String) = {
+def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {

// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
@@ -79,6 +79,7 @@ object RecoverableNetworkWordCount {
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
+ssc.checkpoint(checkpointDirectory)

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -114,7 +115,7 @@ object RecoverableNetworkWordCount {
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
-createContext(ip, port, outputPath)
+createContext(ip, port, outputPath, checkpointDirectory)
Review comment (Member):
@tdas Can I double-check that it's correct to call StreamingContext.checkpoint only within the "create context" function, as opposed to always calling it on the result of StreamingContext.getOrCreate? That is, if it reads checkpoint data, does it already configure itself to continue using that checkpoint directory?

Review comment (Contributor):
Guess things like this should be covered in some unit test. Will add later.

})
ssc.start()
ssc.awaitTermination()
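
For reference, the pattern under discussion can be sketched in isolation as below. This is a minimal sketch, not the example from this PR: the object name `CheckpointPatternSketch`, the `host`/`port` arguments, and the `local[2]` fallback master are assumptions added for illustration. It shows `ssc.checkpoint(...)` being called only inside the factory function, which `StreamingContext.getOrCreate` invokes when no checkpoint data is found. My understanding is that a context restored from checkpoint data keeps using the directory it was restored from, but that is worth confirming against the Spark version in use.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointPatternSketch {

  // Factory used by getOrCreate; it runs only when no checkpoint data exists.
  def createContext(host: String, port: Int, checkpointDirectory: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("CheckpointPatternSketch")
      // Assumption for local runs only; spark-submit normally supplies the master.
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Checkpointing is enabled here, on the freshly created context.
    ssc.checkpoint(checkpointDirectory)

    // At least one output operation is needed before the context can start.
    val lines = ssc.socketTextStream(host, port)
    lines.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val Array(host, port, checkpointDirectory) = args
    // Restore from the checkpoint if present, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createContext(host, port.toInt, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that `main` does not call `checkpoint` again on the value returned by `getOrCreate`, which is the behavior the question above asks about.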