
Kafka Consumer (Python threading)

Date: 2017-07-31 01:06:29


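import threading
from kafka import KafkaConsumer

threads = []


class MyThread(threading.Thread):
    def __init__(self, threadName, keyName):
        threading.Thread.__init__(self)
        self.threadName = threadName
        self.keyName = keyName

    def run(self):
        receiveinfo(self.threadName, self.keyName)


def receiveinfo(threadName, keyName):
    # Each thread creates its own consumer; KafkaConsumer is not thread-safe.
    consumer = KafkaConsumer('test', bootstrap_servers='192.168.1.10:9092')
    for msg in consumer:
        # Without deserializers, msg.key and msg.value are bytes (either may be None).
        key = msg.key.decode('utf-8') if msg.key is not None else None
        if key == keyName:
            value = msg.value.decode('utf-8') if msg.value is not None else ''
            # A message with this thread's key and the value "exit" stops the thread.
            if value == "exit":
                break
            print("(" + threadName + ")" + " " + value)
    consumer.close()


try:
    t1 = MyThread("T1", "Thread-1")
    threads.append(t1)
    t2 = MyThread("T2", "Thread-2")
    threads.append(t2)
    t3 = MyThread("T3", "Thread-3")
    threads.append(t3)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print("exit program with 0")
except Exception as e:
    print("Error: failed to run consumer program:", e)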
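
The key-filtering loop in receiveinfo can be tried without a running broker. The following is a minimal sketch, not part of the original post: FakeRecord and filter_messages are hypothetical names introduced here, and FakeRecord only imitates the key and value fields of the records a KafkaConsumer yields. It assumes keys and values arrive as bytes, which is how kafka-python delivers them when no deserializers are configured.

```python
from collections import namedtuple

# Hypothetical stand-in for the records a KafkaConsumer yields; only the
# two fields used by the loop are modelled.
FakeRecord = namedtuple("FakeRecord", ["key", "value"])


def filter_messages(records, key_name):
    """Collect decoded values for key_name until an "exit" value arrives."""
    received = []
    for msg in records:
        # Keys may be None when the producer did not set one.
        key = msg.key.decode("utf-8") if msg.key is not None else None
        if key != key_name:
            continue
        value = msg.value.decode("utf-8")
        if value == "exit":
            break
        received.append(value)
    return received


records = [
    FakeRecord(b"Thread-1", b"hello"),
    FakeRecord(b"Thread-2", b"ignored by T1"),
    FakeRecord(None, b"no key"),
    FakeRecord(b"Thread-1", b"world"),
    FakeRecord(b"Thread-1", b"exit"),
    FakeRecord(b"Thread-1", b"never reached"),
]

result = filter_messages(records, "Thread-1")
print(result)
```

Because every thread in the program above opens its own consumer on the same topic, each one sees every message and keeps only those whose key matches its keyName; the sketch mirrors that behaviour for a single key.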


Original article: http://www.cnblogs.com/michaelying/p/7260890.html
