码迷,mamicode.com
首页 > 其他好文 > 详细

kafka offset 设置

时间:2017-05-18 09:46:37      阅读:328      评论:0      收藏:0      [点我收藏+]

标签:pre   fse   span   cts   test   tty   lang   one   ada   

http://stackoverflow.com/questions/36579815/kafka-python-how-do-i-commit-a-partition
from kafka import KafkaConsumer
from kafka import TopicPartition from kafka.structs import OffsetAndMetadata ... topic = ‘your_topic‘ partition = 0 tp = TopicPartition(topic,partition) kafkaConsumer = KafkaConsumer(config here...) kafkaConsumer.assign([tp]) offset = 15394125 kafkaConsumer.commit({ tp: OffsetAndMetadata(offset, None) })



meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset, meta)
consumer.commit(options)

如果consumer.commit()不可以,可以使用seek(),使用seek(),如果有多个partition,需要为每个partition都手动进行consumer assign.
topic_partition = TopicPartition("TOPIC_TEST", 1)
# 格式为topic, partition, 1表示partition 1.
consumer.assign([topic_partition])

consumer.seek(topic_partition, 1660000)
同样的代码,有些用commit可以,有些不可以,用seek()就可以。
 

kafka offset 设置

标签:pre   fse   span   cts   test   tty   lang   one   ada   

原文地址:http://www.cnblogs.com/buxizhizhoum/p/6871663.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!