Skip to content

Commit 130ec21

Browse files
committed
[SPARK-7788] Made KinesisReceiver.onStart() non-blocking
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes #6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking (cherry picked from commit 1c388a9) Signed-off-by: Tathagata Das <[email protected]>
1 parent 0be6e3b commit 130ec21

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis
1818

1919
import java.util.UUID
2020

21+
import scala.util.control.NonFatal
22+
2123
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
2224
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
2325
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
@@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver(
98100
*/
99101
private var worker: Worker = null
100102

103+
/** Thread running the worker */
104+
private var workerThread: Thread = null
105+
101106
/**
102107
* This is called when the KinesisReceiver starts and must be non-blocking.
103108
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
@@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
126131
}
127132

128133
worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
129-
worker.run()
130-
134+
workerThread = new Thread() {
135+
override def run(): Unit = {
136+
try {
137+
worker.run()
138+
} catch {
139+
case NonFatal(e) =>
140+
restart("Error running the KCL worker in Receiver", e)
141+
}
142+
}
143+
}
144+
workerThread.setName(s"Kinesis Receiver ${streamId}")
145+
workerThread.setDaemon(true)
146+
workerThread.start()
131147
logInfo(s"Started receiver with workerId $workerId")
132148
}
133149

@@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver(
137153
* The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
138154
*/
139155
override def onStop() {
140-
if (worker != null) {
141-
worker.shutdown()
156+
if (workerThread != null) {
157+
if (worker != null) {
158+
worker.shutdown()
159+
worker = null
160+
}
161+
workerThread.join()
162+
workerThread = null
142163
logInfo(s"Stopped receiver for workerId $workerId")
143-
worker = null
144164
}
145165
workerId = null
146166
}

0 commit comments

Comments
 (0)