码迷,mamicode.com
首页 > 编程语言 > 详细

python操作kafka

时间:2020-03-11 01:16:42      阅读:192      评论:0      收藏:0      [点我收藏+]

标签:record   topic   port   val   启动   get   local   py3   oca   

mac启动zookeeper和kafka:zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
#py2.7版本使用kafka,py3.7版本使用kafka-python
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=‘localhost:9092‘)
# send函数传递topic类型为str,value的类型是bytes
future = producer.send(‘hello‘, json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘)),
"info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘))


from kafka import KafkaConsumer
consumer = KafkaConsumer(‘hello‘, bootstrap_servers=[‘localhost:9092‘])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print( recv)

python操作kafka

标签:record   topic   port   val   启动   get   local   py3   oca   

原文地址:https://www.cnblogs.com/testerWangzhao/p/12459960.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!