11#!/usr/bin/env python
2- import threading , logging , time
3- import multiprocessing
2+ import threading , time
43
5- from kafka import KafkaConsumer , KafkaProducer
4+ from kafka import KafkaAdminClient , KafkaConsumer , KafkaProducer
5+ from kafka .admin import NewTopic
66
77
88class Producer (threading .Thread ):
99 def __init__ (self ):
1010 threading .Thread .__init__ (self )
1111 self .stop_event = threading .Event ()
12-
12+
1313 def stop (self ):
1414 self .stop_event .set ()
1515
@@ -23,14 +23,15 @@ def run(self):
2323
2424 producer .close ()
2525
26- class Consumer (multiprocessing .Process ):
26+
27+ class Consumer (threading .Thread ):
2728 def __init__ (self ):
28- multiprocessing . Process .__init__ (self )
29- self .stop_event = multiprocessing .Event ()
30-
29+ threading . Thread .__init__ (self )
30+ self .stop_event = threading .Event ()
31+
3132 def stop (self ):
3233 self .stop_event .set ()
33-
34+
3435 def run (self ):
3536 consumer = KafkaConsumer (bootstrap_servers = 'localhost:9092' ,
3637 auto_offset_reset = 'earliest' ,
@@ -44,29 +45,38 @@ def run(self):
4445 break
4546
4647 consumer .close ()
47-
48-
48+
49+
4950def main ():
51+ # Create 'my-topic' Kafka topic
52+ try :
53+ admin = KafkaAdminClient (bootstrap_servers = 'localhost:9092' )
54+
55+ topic = NewTopic (name = 'my-topic' ,
56+ num_partitions = 1 ,
57+ replication_factor = 1 )
58+ admin .create_topics ([topic ])
59+ except Exception :
60+ pass
61+
5062 tasks = [
5163 Producer (),
5264 Consumer ()
5365 ]
5466
67+ # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
5568 for t in tasks :
5669 t .start ()
5770
5871 time .sleep (10 )
59-
72+
73+ # Stop threads
6074 for task in tasks :
6175 task .stop ()
6276
6377 for task in tasks :
6478 task .join ()
65-
66-
79+
80+
6781if __name__ == "__main__" :
68- logging .basicConfig (
69- format = '%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s' ,
70- level = logging .INFO
71- )
7282 main ()
0 commit comments