标签:kafka
场景:老集群将不再使用,上边的kafka集群中的数据要导入到新的集群的kafka中
倒入步骤(例如按照天的topic迁移):
因为kafka默认只保留7天的数据,所以只迁移近7天的数据
1. 首先使用kafka-topic.sh客户端列出kafka集群上所有topic
2. 取出近7天的topic名称,写入到一个txt文本中,每个topic名称一行
3. 导出:for i in $(cat txt);do kafka-console-consumer.sh --zookeeper=... --topic=$i --from-beginning >>$i.txt;done
4. 导入: 先创建topic:kafka-topic.sh create topic
cat $i.txt | kafka-conso-producer.sh --broker-list=... --topic=$i
第二种方案不常用:
借鉴:http://www.cnblogs.com/dycg/p/3922352.html
具体步骤如下:
1.在新节点上搭建kafka服务
原先我有2台机器, broker.id分别为1和2
现在我新机器上broker.id分别设置为3和4
2.启动所有kafka 服务
3.确认要移动的topics
kafka-topics.sh --list --zookeeper 192.168.103.47:2181 查看所有主题
复制这些topic,并写成如下格式的文件, 命名为 topics-to-move.json
{"topics": [
{"topic": "fortest1"},
{"topic": "fortest2"},
{"topic": "fortest3"}
],
"version":1
}
4.生成移动脚本
运行bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4" --generate
其中3,4是你的新节点的broker.id
这样就会生成一串新的json数据
{"version":1,"partitions":[{"topic":"fortest1","partition":0,"replicas":[3,4]},其他部分省略}
将这一串json写入新文件reassignment-node.json中
5.这时候,万事俱备, 开始迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --reassignment-json-file reassignment-node.json --execute
6.适当时候, 运行如下命令,查看运行结果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --reassignment-json-file reassignment-node.json --verify
假设出现
ERROR: Assigned replicas (3,4,0,1) don‘t match the list of replicas for reassignment (3,4) for partition [mpt-session,1]
这样的错误, 他并不是真的出错,而是指目前仍在复制数据中.
再过一段时间再运行verify命令,他就会消失(加入完成拷贝)
7.数据完成迁移后, 老的服务先别停.
8.修改所有客户端producer或者consumer连接指向新节点.
9.测试正常后,
本文出自 “明天过后” 博客,谢绝转载!
标签:kafka
原文地址:http://leeyan.blog.51cto.com/8379003/1829062