Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis

import java.util.UUID

import scala.util.control.NonFatal

import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
Expand Down Expand Up @@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver(
*/
private var worker: Worker = null

/** Thread running the worker */
private var workerThread: Thread = null

/**
* This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
Expand Down Expand Up @@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
}

worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
worker.run()

workerThread = new Thread() {
override def run(): Unit = {
try {
worker.run()
} catch {
case NonFatal(e) =>
restart("Error running the KCL worker in Receiver", e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Knowing nothing about Kinesis. Just a small question: if we call worker.run twice, what will happen? I'm not sure if worker is reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not. And the ReceiverSupevisor guarnatees (assumption) that onStart() is not going to be called twice, so this wont be reused.

}
}
}
workerThread.setName(s"Kinesis Receiver ${streamId}")
workerThread.setDaemon(true)
workerThread.start()
logInfo(s"Started receiver with workerId $workerId")
}

Expand All @@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver(
* The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
*/
override def onStop() {
if (worker != null) {
worker.shutdown()
if (workerThread != null) {
if (worker != null) {
worker.shutdown()
worker = null
}
workerThread.join()
workerThread = null
logInfo(s"Stopped receiver for workerId $workerId")
worker = null
}
workerId = null
}
Expand Down