-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-7396][Streaming][Example] Update KafkaWordCountProducer to use new Producer API #5936
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Merged build triggered. |
Merged build started. |
Test build #31965 has started for PR 5936 at commit |
Test build #31965 timed out for PR 5936 at commit |
Merged build finished. Test FAILed. |
Test FAILed. |
CC @koeninger |
Jenkins, retest this please |
Merged build triggered. |
Merged build started. |
Test build #31972 has started for PR 5936 at commit |
Test build #31972 has finished for PR 5936 at commit
|
Merged build finished. Test PASSed. |
Test PASSed. |
Not that there's necessarily anything wrong with updating the producer api being used... KafkaWordCountProducer uses no spark code. It only uses kafka.producer._ (which still exists in the 0.8.2.1 artifact), and java.util.properties. I just ran the old version of KafkaWordCountProducer against a local kafka install with no problems. @jerryshao how were you getting that exception? |
FWIW I ran the new version, and it works as well, I'm just concerned about how that exception was caused. |
Hi @koeninger , I just run this example with no additional configuration for the first time, I met such exception when the example is started. But again I run it, there's no such exception, instead I met some of warning logs like:
I guess this was introduced by the upgrading of Kafka version, since I didn't meet this before. So I rewrite this example with new Producer API, and this warning log is gone. |
Anyway I think it's meaningful to update this example accordingly. |
LGTM. Merging this. Thanks! |
…se new Producer API Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao <[email protected]> Closes #5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue (cherry picked from commit 316a5c0) Signed-off-by: Tathagata Das <[email protected]>
…se new Producer API Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao <[email protected]> Closes apache#5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue
…se new Producer API Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao <[email protected]> Closes apache#5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue
…se new Producer API Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao <[email protected]> Closes apache#5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue
Otherwise it will throw exception: