码迷,mamicode.com
首页 > 其他好文 > 详细

Kafka备忘

时间:2015-11-11 16:13:58      阅读:332      评论:0      收藏:0      [点我收藏+]

标签:

 

 

 

官网 http://kafka.apache.org/

技术分享

多生产者多消费者

技术分享

多topic和多分区

技术分享

多消费者组。每组中消息不能重复消费,组间不影响

 

 

启动

RunKafka(){
    cd $kafka_home
    nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> zk$dayStr.log  & 
    echo Starting zookeeper...
    sleep 5s #wait a monment until zookeeper is ready

    nohup ./bin/kafka-server-start.sh ./config/server.properties  >> kafka$dayStr.log & 
    echo Starting kafka-server...
}

流程:启动zookeeper -> 启动kafka-server -> 创建topic -> 创建生产者 -> 创建消费者

 

基本命令

# kafka basic common
# ./app/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic huashi
# ./app/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
# ./app/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic huashi
# ./app/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic huashi

# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huashi
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic huashi --from-beginning
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic huashi --partitions 4

# ==================delete consumer group======
# ~/kafka/bin/zookeeper-shell.sh localhost:2181
# ls /consumers
# rmr /consumers/bd/offsets
# rmr /consumers/test/offsets/huashi20151108


# ls /tmp/kafka-logs/

 

 

 

Python包

https://github.com/mumrah/kafka-python

用法:http://kafka-python.readthedocs.org/en/latest/usage.html 

 

 

 

FakeProducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

__author__ = manhua

from kafka import SimpleProducer, KafkaClient
import time
# To send messages synchronously
kafka = KafkaClient(localhost:9092)
producer = SimpleProducer(kafka)
n=0
while True:
    producer.send_messages(bhuashi, str(n))
    n+=1
    time.sleep(1)
    print n

 

 

ConsumerTest.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

__author__ = manhua

from kafka import KafkaConsumer
import sys


class UnzipConsumer:
    def __init__(self, topic, partition_id, gid=bd, bs_server=localhost:9092):
        self.partition_id = partition_id
        self.consumer = KafkaConsumer((topic, int(partition_id)),  # must specify an id, or it will quite slow
                                      group_id=gid,
                                      bootstrap_servers=[bs_server],
                                      auto_offset_reset=smallest  # ,
                                      # consumer_timeout_ms=1000*60*30
                                      )

    def run(self):
        for message in self.consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
            self.consumer.task_done(message)
            self.consumer.commit()


if __name__ == __main__:

    if len(sys.argv) == 3:
        obj = UnzipConsumer(sys.argv[1], sys.argv[2])
        obj.run()
    else:
        print Parameters: [topic] [id]


# python unzipConsumer.py  huashi #0

 

 

监控工具

https://github.com/quantifind/KafkaOffsetMonitor

实时监控kafka的consumer以及他们在partition中的offset

因为KafkaOffsetMonitor中有些资源文件(css,js)是访问google资源,所以有人做了修改版 http://pan.baidu.com/s/1qWH05q8 

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost --refresh 15.minutes --retain 5.day --port 5354

  

 

 

Ref:

http://blog.csdn.net/lizhitao/article/details/27199863

Kafka备忘

标签:

原文地址:http://www.cnblogs.com/manhua/p/4956071.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!