1818package org .apache .spark .streaming .kafka
1919
2020import java .util .Properties
21- import java .util .concurrent .{ ConcurrentHashMap , Executors }
21+ import java .util .concurrent .ConcurrentHashMap
2222
2323import scala .collection .Map
2424import scala .collection .mutable
2525import scala .reflect .{classTag , ClassTag }
2626
2727import kafka .common .TopicAndPartition
28- import kafka .consumer .{Consumer , ConsumerConfig , ConsumerConnector }
28+ import kafka .consumer .{Consumer , ConsumerConfig , ConsumerConnector , KafkaStream }
2929import kafka .serializer .Decoder
3030import kafka .utils .{ZkUtils , ZKGroupTopicDirs , ZKStringSerializer , VerifiableProperties }
3131import org .I0Itec .zkclient .ZkClient
3232
3333import org .apache .spark .{SparkEnv , Logging }
3434import org .apache .spark .storage .{StreamBlockId , StorageLevel }
3535import org .apache .spark .streaming .receiver .{BlockGeneratorListener , BlockGenerator , Receiver }
36+ import org .apache .spark .util .Utils
3637
3738private [streaming]
3839class ReliableKafkaReceiver [
@@ -45,27 +46,33 @@ class ReliableKafkaReceiver[
4546 storageLevel : StorageLevel )
4647 extends Receiver [Any ](storageLevel) with Logging {
4748
48- /** High level consumer to connect to Kafka */
49+ /** High level consumer to connect to Kafka. */
4950 private var consumerConnector : ConsumerConnector = null
5051
51- /** zkClient to connect to Zookeeper to commit the offsets */
52+ /** zkClient to connect to Zookeeper to commit the offsets. */
5253 private var zkClient : ZkClient = null
5354
5455 private val groupId = kafkaParams(" group.id" )
5556
56- private lazy val env = SparkEnv .get
57+ private def conf () = SparkEnv .get.conf
5758
5859 private val AUTO_OFFSET_COMMIT = " auto.commit.enable"
5960
6061 /** A HashMap to manage the offset for each topic/partition; this HashMap is accessed in a
61- * synchronized block, so mutable HashMap will not meet concurrency issue */
62- private lazy val topicPartitionOffsetMap = new mutable.HashMap [TopicAndPartition , Long ]
62+ * synchronized block, so the mutable HashMap does not run into concurrency issues.
63+ */
64+ private var topicPartitionOffsetMap : mutable.HashMap [TopicAndPartition , Long ] = null
6365
64- /** A concurrent HashMap to store the stream block id and related offset snapshot */
65- private lazy val blockOffsetMap =
66- new ConcurrentHashMap [StreamBlockId , Map [TopicAndPartition , Long ]]
66+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
67+ private var blockOffsetMap : ConcurrentHashMap [StreamBlockId , Map [TopicAndPartition , Long ]] = null
6768
68- private lazy val blockGeneratorListener = new BlockGeneratorListener {
69+ /** The BlockGenerator is managed by the receiver itself, to better coordinate block storage and
70+ * offset commits.
71+ */
72+ private var blockGenerator : BlockGenerator = null
73+
74+ /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
75+ private final class OffsetCheckpointListener extends BlockGeneratorListener {
6976 override def onStoreData (data : Any , metadata : Any ): Unit = {
7077 if (metadata != null ) {
7178 val kafkaMetadata = metadata.asInstanceOf [(TopicAndPartition , Long )]
@@ -96,10 +103,6 @@ class ReliableKafkaReceiver[
96103 }
97104 }
98105
99- /** Manage the BlockGenerator in receiver itself for better managing block store and offset
100- * commit */
101- private var blockGenerator : BlockGenerator = null
102-
103106 override def onStop (): Unit = {
104107 if (consumerConnector != null ) {
105108 consumerConnector.shutdown()
@@ -111,13 +114,33 @@ class ReliableKafkaReceiver[
111114 zkClient = null
112115 }
113116
114- blockGenerator.stop()
117+ if (blockGenerator != null ) {
118+ blockGenerator.stop()
119+ blockGenerator = null
120+ }
121+
122+ if (topicPartitionOffsetMap != null ) {
123+ topicPartitionOffsetMap.clear()
124+ topicPartitionOffsetMap = null
125+ }
126+
127+ if (blockOffsetMap != null ) {
128+ blockOffsetMap.clear()
129+ blockOffsetMap = null
130+ }
115131 }
116132
117133 override def onStart (): Unit = {
118134 logInfo(s " Starting Kafka Consumer Stream with group: $groupId" )
119135
120- blockGenerator = new BlockGenerator (blockGeneratorListener, streamId, env.conf)
136+ // Initialize the topic-partition / offset hash map.
137+ topicPartitionOffsetMap = new mutable.HashMap [TopicAndPartition , Long ]
138+
139+ // Initialize the stream block id / offset snapshot hash map.
140+ blockOffsetMap = new ConcurrentHashMap [StreamBlockId , Map [TopicAndPartition , Long ]]()
141+
142+ // Initialize the block generator for storing Kafka message.
143+ blockGenerator = new BlockGenerator (new OffsetCheckpointListener , streamId, conf())
121144
122145 if (kafkaParams.contains(AUTO_OFFSET_COMMIT ) && kafkaParams(AUTO_OFFSET_COMMIT ) == " true" ) {
123146 logWarning(s " $AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -133,7 +156,7 @@ class ReliableKafkaReceiver[
133156
134157 val consumerConfig = new ConsumerConfig (props)
135158
136- assert(consumerConfig.autoCommitEnable == false )
159+ assert(! consumerConfig.autoCommitEnable)
137160
138161 logInfo(s " Connecting to Zookeeper: ${consumerConfig.zkConnect}" )
139162 consumerConnector = Consumer .create(consumerConfig)
@@ -156,41 +179,45 @@ class ReliableKafkaReceiver[
156179 val topicMessageStreams = consumerConnector.createMessageStreams(
157180 topics, keyDecoder, valueDecoder)
158181
159- val executorPool = Executors .newFixedThreadPool (topics.values.sum)
182+ val executorPool = Utils .newDaemonFixedThreadPool (topics.values.sum, " KafkaMessageHandler " )
160183
161184 try {
162185 topicMessageStreams.values.foreach { streams =>
163186 streams.foreach { stream =>
164- executorPool.submit(new Runnable {
165- override def run (): Unit = {
166- logInfo(s " Starting message process thread ${Thread .currentThread().getId}. " )
167- try {
168- for (msgAndMetadata <- stream) {
169- val topicAndPartition = TopicAndPartition (
170- msgAndMetadata.topic, msgAndMetadata.partition)
171- val metadata = (topicAndPartition, msgAndMetadata.offset)
172-
173- blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
174- }
175- } catch {
176- case e : Throwable => logError(" Error handling message; existing" , e)
177- }
178- }
179- })
187+ executorPool.submit(new MessageHandler (stream))
180188 }
181189 }
182190 } finally {
183191 executorPool.shutdown()
184192 }
185193 }
186194
/**
 * An inner class that handles the messages received from a single Kafka stream.
 *
 * Every message is pushed into the receiver's block generator as a (key, message) pair,
 * together with its (TopicAndPartition, offset) metadata.
 *
 * @param stream the Kafka stream this handler iterates over
 */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
  override def run(): Unit = {
    logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
    try {
      for (msgAndMetadata <- stream) {
        val topicAndPartition = TopicAndPartition(
          msgAndMetadata.topic, msgAndMetadata.partition)
        val metadata = (topicAndPartition, msgAndMetadata.offset)

        blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
      }
    } catch {
      // Deliberately broad: any failure is logged and ends this handler's loop instead of
      // propagating out of the pool thread.
      case e: Throwable => logError("Error handling message; exiting", e)
    }
  }
}
212+
187213 /**
188214 * Commit the offset of Kafka's topic/partition; the commit mechanism follows Kafka 0.8.x's
189215 * metadata schema in Zookeeper.
190216 */
191217 private def commitOffset (offsetMap : Map [TopicAndPartition , Long ]): Unit = {
192218 if (zkClient == null ) {
193- logError(s " zkClient $zkClient should be initialized at started " )
219+ val thrown = new IllegalStateException (" Zookeeper client is unexpectedly null" )
220+ stop(" Zookeeper client is not initialized before commit offsets to ZK" , thrown)
194221 return
195222 }
196223
@@ -205,7 +232,7 @@ class ReliableKafkaReceiver[
205232 s " ${topicAndPart.topic}, partition ${topicAndPart.partition}" , t)
206233 }
207234
208- logInfo(s " Committed offset ${ offset} for topic ${topicAndPart.topic}, " +
235+ logInfo(s " Committed offset $offset for topic ${topicAndPart.topic}, " +
209236 s " partition ${topicAndPart.partition}" )
210237 }
211238 }
0 commit comments