标签:文件 消费者 cti stop sage mes byte 日志 replicat
# 切换到目录
cd /app
# 获取kafka最新安装包,这边使用的是镜像地址,可以去官方网站获得最新地址版本号使用kafka_2.11-1.1.0.tgz,可以自己下载上传上去
wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
# 解压软件包
tar -zxvf kafka_2.11-1.1.0.tgz
# 创建软连接
ln -s kafka_2.11-1.1.0 kafka
# 进入配置文件
cd /app/kafka/config
mkdir -p /app/kafka/logdirs
# 启动kafka之前先保证zookeeper是启动的,保证java环境是安装的,修改丢应的配置
vim server.properties
# 启动kafka服务
/app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server.properties
# 添加主题
bin/kafka-topics.sh --create --zookeeper 172.16.48.129:2181 --replication-factor 1 --partitions 1 --topic test
# 得到Created topic "test"成功的提示
# 添加生产者
bin/kafka-console-producer.sh --broker-list 172.16.48.129:9092 --topic test
# 输入文字:This is a message
# 添加生产者
bin/kafka-console-consumer.sh --bootstrap-server 172.16.48.129:9092 --topic test --from-beginning
# 收到文字:This is another message
# 生成者发送消息,消费者收到消息即可
配置kafka的server.properties的配置文件
# 修改zookeeper连接地址
zookeeper.connect=172.16.48.129:2181
# 修改kafka日志地址
log.dirs=/app/kafka/logdirs
# 在broker.id=0下面添加端口以及ip绑定的功能,内网ip地址
host.name=172.16.48.129
# 设定服务标记id,每一台的id需要不相同当前分别对应1、2、3
broker.id=1
# 设定绑定的host名称,绑定对应自己的内网ip地址
host.name=172.19.131.247
# 设定主体同步数量和集群数量相同
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# 配置参数设定在log.retention.hours下面
message.max.byte=5242880
default.replication.factor=3
replica.fetch.max.bytes=5242880
# zookeeper配置地址
zookeeper.connect=172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181
# 允许删除主题
delete.topic.enable=true
虚拟机1 | 虚拟机2 | 虚拟机3 |
---|---|---|
172.16.48.129 | 172.16.48.130 | 172.16.48.131 |
kafka1 | kafka2 | kafka3 |
# 创建测试主题
/app/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# 结果:Created topic "my-replicated-topic".
# 检查主题状态
/app/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 localhost:2181 --topic my-replicated-topic
# 结果:Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
# Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
# 一个生产者
/app/kafka/bin/kafka-console-producer.sh --broker-list 172.16.48.129:9092,172.16.48.130:9092,172.16.48.131:9092 --topic my-replicated-topic
# 三个消费者:
/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.48.129:9092,172.16.48.130:9092,172.16.48.131:9092 --from-beginning --topic my-replicated-topic
通过监控集群以及生产和消费,来判断高可用情况的测试
# 通过主题状态监控集群的leader和Replicas和Isr
/app/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 localhost:2181 --topic my-replicated-topic
# 分别停止leader的服务的kafka服务,来检查是否可以生产和消费的情况
/app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server.properties
/app/kafka/bin/kafka-server-stop.sh
结论:kafka环境在zookeeper没有挂的情况下,允许只剩下一台还能够工作。
常用配置:/app/kafka/config
日志路径:/app/kafka/logs
标签:文件 消费者 cti stop sage mes byte 日志 replicat
原文地址:https://www.cnblogs.com/fly-piglet/p/9837186.html