@@ -76,14 +76,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   @transient val serviceConsumer: Consumer[K, V] = consumerStrategy.serviceConsumer
 
-  def consumerForAssign(): KafkaConsumer[Long, String] = this.synchronized {
-    val properties = consumerStrategy.executorKafkaParams
-    properties.put("max.poll.records", "1")
-    properties.put(ConsumerConfig.GROUP_ID_CONFIG,
-      s"${properties.get(ConsumerConfig.GROUP_ID_CONFIG)}_assignGroup")
-    new KafkaConsumer[Long, String](properties)
-  }
-
   override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
     logError("Kafka ConsumerRecord is not serializable. " +
       "Use .map to extract fields before calling .persist or .window")
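The removed `consumerForAssign` constructed a fresh `KafkaConsumer[Long, String]` on every call (with `max.poll.records` forced to `1` and a `_assignGroup`-suffixed group id) just to peek at partition offsets; the patch reuses the strategy's single long-lived `serviceConsumer` instead. A minimal sketch of that reuse pattern, assuming only the Kafka consumer API already visible in this diff (the standalone helper and its name `probeFirstOffset` are illustrative, not part of the patch):

import java.{util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Reassign the one shared consumer to a single partition and peek at the
// first record still available there; if the partition yields nothing
// within the timeout, fall back to its end offset. (The patch itself asks
// the driver consumer for end offsets; using the same handle here just
// keeps the sketch self-contained.)
def probeFirstOffset[K, V](
    serviceConsumer: Consumer[K, V],
    tp: TopicPartition,
    pollTimeoutMs: Long): Long = {
  serviceConsumer.assign(ju.Arrays.asList(tp))
  val records = serviceConsumer.poll(pollTimeoutMs).iterator()
  if (records.hasNext) records.next().offset()
  else serviceConsumer.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
}

Reassigning one consumer per probe avoids opening and tearing down a broker connection for every partition the driver inspects.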
@@ -288,26 +280,27 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
-    val consumerAssign = consumerForAssign
     val pollTimeout = ssc.sparkContext.getConf
-      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 120000)
+      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 5000)
     paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
         tp -> {
           val position = c.position(tp)
 
-          consumerAssign.assign(ju.Arrays.asList(tp))
-          val records = consumerAssign.poll(pollTimeout).iterator()
+          serviceConsumer.assign(ju.Arrays.asList(tp))
+          val records = serviceConsumer.poll(pollTimeout).iterator()
           val firstRecordOffset = if (records.hasNext) {
             records.next().offset()
           } else {
             c.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
           }
 
           if (position < firstRecordOffset) {
+            serviceConsumer.seek(tp, firstRecordOffset)
             firstRecordOffset
           } else {
+            serviceConsumer.seek(tp, position)
             position
           }
         }
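Net effect of this hunk, sketched below: the starting offset for each partition becomes the later of the driver's saved position and the first offset Kafka still retains, so a position pointing into purged data no longer starts the stream on a gap, and the shared `serviceConsumer` is seeked to the same offset so its next poll lines up. The standalone helper and its name `resolveStartOffset` are illustrative, not the patch itself:

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Choose where consumption (re)starts for one partition and keep the
// shared service consumer's cursor in step with that choice.
def resolveStartOffset(
    serviceConsumer: Consumer[_, _],
    tp: TopicPartition,
    position: Long,
    firstRecordOffset: Long): Long = {
  // A saved position behind the first retained record means retention has
  // already deleted that data; fast-forward rather than read a hole.
  val start = math.max(position, firstRecordOffset)
  serviceConsumer.seek(tp, start)
  start
}

// Example: the driver's saved position is 100, but retention has purged
// everything below 250, so the first poll returned offset 250:
// resolveStartOffset(sc, tp, 100L, 250L) == 250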