Skip to content

Commit 270bbe2

Browse files
committed
Fix Kafka Produce throw Exception issue
1 parent ba2b566 commit 270bbe2

File tree

1 file changed

+13
-11
lines changed

1 file changed

+13
-11
lines changed

examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.spark.examples.streaming
1919

20-
import java.util.Properties
20+
import java.util.HashMap
2121

22-
import kafka.producer._
22+
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
2323

2424
import org.apache.spark.streaming._
2525
import org.apache.spark.streaming.kafka._
@@ -77,23 +77,25 @@ object KafkaWordCountProducer {
7777
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
7878

7979
// Zookeeper connection properties
80-
val props = new Properties()
81-
props.put("metadata.broker.list", brokers)
82-
props.put("serializer.class", "kafka.serializer.StringEncoder")
80+
val props = new HashMap[String, Object]()
81+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
82+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
83+
"org.apache.kafka.common.serialization.StringSerializer")
84+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
85+
"org.apache.kafka.common.serialization.StringSerializer")
8386

84-
val config = new ProducerConfig(props)
85-
val producer = new Producer[String, String](config)
87+
val producer = new KafkaProducer[String, String](props)
8688

8789
// Send some messages
8890
while(true) {
89-
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
91+
(1 to messagesPerSec.toInt).foreach { messageNum =>
9092
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
9193
.mkString(" ")
9294

93-
new KeyedMessage[String, String](topic, str)
94-
}.toArray
95+
val message = new ProducerRecord[String, String](topic, null, str)
96+
producer.send(message)
97+
}
9598

96-
producer.send(messages: _*)
9799
Thread.sleep(100)
98100
}
99101
}

0 commit comments

Comments
 (0)