标签:http tor sign https group info version date incr
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
http://kafka.apache.org/documentation.html#quickstart
http://kafka.apache.org/documentation.html#operations
为了便于使用。kafka提供了比較强大的Tools。把常常须要使用的整理一下
开关kafka Server
bin/kafka-server-start.sh config/server.properties bin/kafka-server-stop.sh JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties &
topic相关
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test bin/kafka-topics.sh --list --zookeeper localhost:2181
describe topic的具体情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
改动topic的partition,仅仅能添加
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test
到0.8.2才正式支持删除topic,当前是beta版
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
查看有问题的partition
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions --topic test
per-topic 改动參数
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --deleteConfig max.message.bytes
集群扩展
集群扩展。对于broker还是比較简单的,可是现有的topic上的partition是不会做自己主动迁移的
须要手工做迁移,但kafka提供了比較方便的工具。
--generate,生成參考的迁移计划
given a list of topics and a list of brokers,工具会给出迁徙方案
把topic全然迁移到新的brokers
> cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
给出当前的assignment情况和,迁移方案
我们能够同一时候保存当前的assignment情况和迁移方案。当前的assignment情况能够用于rollback
--execute,開始运行迁移
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
--verify,check当前的迁移状态
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully
选择topic的某个partition的某些replica进行迁徙
moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3
> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] }
brokers下线
当前版本号不支持下线的规划,须要到0.8.2才支持,这须要把一个broker上的replica清空
添加replication factor
partition 0的replica数从1增长到3,当前replica存在broker5,在broker6,7上添加replica
> cat increase-replication-factor.json {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
Producer console
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
后面能够随意的输入message。都会发到broker的topic中
Comsumer console
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
从头读这个topic,能够反复读到全部数据
我在想为啥,每次都能replay。原来每次都是随机产生一个groupid
consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
Consumer Offset Checker
这个会显示出consumer group的offset情况, 必须參数为--group。 不指定--topic,默觉得全部topic
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group
bin/kafka-run-
class
.sh kafka.tools.ConsumerOffsetChecker
required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer
topics (all topics if absent).
--zkconnect ZooKeeper connect string. (default: localhost:2181)
Example,
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
Export Zookeeper Offsets
将Zk中的offset信息以以下的形式打到file里面去
A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:
/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985
bin/kafka-run-
class
.sh kafka.tools.ExportZkOffsets
required argument: [zkconnect]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--output-file Output file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
Update Offsets In Zookeeper
这个挺实用,用于replay, kafka的文档有点坑爹,看了不知道咋用。还是看源代码才看明确
A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
bin/kafka-run-
class
.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
Example,
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
能够看到offset已经被清0。Lag=logSize
更加直接的方式是。直接去Zookeeper里面看
通过zkCli.sh连上后,通过ls查看
Broker Node Registry
/brokers/ids/[0...N] --> host:port (ephemeral node)
Broker Topic Registry
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
Consumer Id Registry
/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
Consumer Offset Tracking
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
Partition Owner registry
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
标签:http tor sign https group info version date incr
原文地址:http://www.cnblogs.com/gccbuaa/p/6917899.html