标签:read commit 保存 时间 sla des 守护 方案 进程
Kafka 是一个快速、可扩展和高可用的基于发布-订阅模式(pub-sub model)的消息系统,用作消息中间件,在系统之间传递消息。其核心概念有:
在 Kafka 中,所有的消息都由 Topic 来组织,Producer 把消息发送到特定的 Topic,Consumer 从特定的 Topic 中读取消息。作为一个分布式系统,Kafka 运行在集群中,集群中的每个节点称之为 Broker。
下载 kafka_2.11-2.1.0.tgz
,也可以使用命令 wget "http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz"
下载,注意 kafka 中已经包含了 zookeeper。执行以下命令解压并切换工作目录:
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 启动 zookeeper,默认监听端口 2181
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 kafka,默认监听端口 9092
bin/kafka-server-start.sh config/server.properties
# 启动 zookeeper & kafka,以守护进程方式
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
# 停止 zookeeper
bin/zookeeper-server-stop.sh config/zookeeper.properties
# 停止 kafka
ps aux | grep kafka | grep -v grep | awk '{print $2}'
kill [pid]
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic a_topic
bin/kafka-topics.sh --alter --topic STOP_JOB --zookeeper localhost:2181 --partitions 3
# 新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 172.16.0.4:9092 --list
# 旧版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
# 新版
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --describe --group console-consumer-85731
# 旧版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-11967 --describe
# 指定偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --group ABOT_MASTER --topic JOB_LEGACY --reset-offsets --to-offset 29261788 –execute
./kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_SLAVE --all-topics --by-duration PT06H0M0S --execute
./kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_MASTER --all-topics --by-duration PT06H0M0S --execute
#最新偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --group ABOT_MASTER --topic JOB_LEGACY --reset-offsets --to-latest –execute
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_MASTER --all-topics --to-latest --execute
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_SLAVE --all-topics --to-latest --execute
模式 | 特点 | 设置 |
---|---|---|
At-most-once(最多一次) | 消息最多被消费一次 | 让 Kafka 在特定时间间隔内自动提交,如网络中断恢复或程序重启可能会使消息未被处理完就被提交了。设置 enable.auto.commit 为 true,设置 auto.commit.interval.ms 为一个较小的时间间隔,不用手动调用 commitSync() 。 |
At-least-once(最少一次) | 消息至少被消费一次 | 手动提交,如提交失败,则下次重复推送。设置 enable.auto.commit 为 false,然后调用 commitSync() 。 |
Exactly-once(正好一次) | 消息有且只有被消费一次 | 保证消息处理和提交反馈在同一个事务中(ACID,原子性、一致性、隔离性和持久性)。设置 enable.auto.commit 为 false,保存 ConsumeRecord 中的 offset 到数据库,实现 ConsumerRebalanceListener ,监听 Consumer Rebalance 事件,然后使用seek 方法将数据库的 offset 更新到 Kafka。 |
作者:ens
链接:https://juejin.im/post/5baca032e51d450e735e74af
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
$ /bin/kafka-topics.sh --alter --topic A_TASK --zookeeper localhost:2181 --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka 版本要求 <=1.1.0 ( wget "https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz" )
可以通过 Morningstar/kafka-offset-monitor 监控 Kafka 偏移量,具体是
下载 Jar
运行
java -Djava.security.auth.login.config=conf/server-client-jaas.conf -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 --kafkaSecurityProtocol SASL_PLAINTEXT --zk zkserver01,zkserver02 --port 8081 --refresh 10.seconds --retain 2.days --dbName offsetapp_kafka
通过 http://{{HOST}}:9090 打开 web 页面监控。
设置 log.retention.hours=6
,只保存日志 6 小时,然后重启 kafka,这样之前保留的日志会被自动清除。
默认配置如下:
listeners=PLAINTEXT://:9092
如果被修改为错误地址那么将无法连接。
一个线程只能有一个 Consumer,如果多个线程共用一个 Consumer,那么就会出现这个错误。
解决方案就是去解决线程问题,确保只有一个线程调用一个 Consumer。
背景:查看消息日志发现原本按顺序发送的消息 1,2,3
被消费的时候变成 1,3,2
了。
原因:Kafka 不保证全局的消息有序性,但保证同一个 Partition 消息的有序性。
解决方案:发消息时设置同一个 Key 或者直接指定同一个 Partition 即可。
调整日志级别为 info,在 logback.xml 文件中写入
<logger name="org.apache.kafka" level="INFO" />
标签:read commit 保存 时间 sla des 守护 方案 进程
原文地址:https://www.cnblogs.com/lshare/p/11334361.html