标签:领域 hang 序列号 文件 ges 12px 它的 spl 流量
Apache Kafka在全球各个领域各大公司获得广泛使用,得益于它强大的功能和不断完善的生态。其中Kafka动态配置是一个比较高频好用的功能,下面我们就来一探究竟。
Kafka初始开源的几个版本,当broker初始化启动时,所有配置信息只能从server.properties静态文件读取,以后不再发生任何更改,随着Kafka逐步迭代,在线业务对稳定性和个性化要求越来越突出,需要能支持在线修改功能动态生效的需求应运而生。例如:按照topic维度清理数据,依据clientid限流,根据用户名称进行访问权限控制等等。目前Kafka最新版本支持以下几类动态配置。
动态配置文件发展历程:
kafka在0.8.0对topic的管理功能分布在三个shell中,它们分别是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,后来社区考虑到topic管理功能过于分散,到了0.8.1版本有关topic所有功能收敛到kafka-topics.sh中。0.8.0中只有topic的创建、删除和列表及添加分区功能,到了0.8.1开始支持topics动态配置了。
0.9.0.0开始支持client(producer和consumer)客户端配额限流支持,确保不因为某个或少数几个topic的客户端占满了broker带宽资源和磁盘IO资源,影响其他客户端的正常读写,导致集群内主从同步也受到影响。这个功能对确保系统SLA大有好处,通过服务降级,保证写/生产不受影响,降低或暂停读/消费流量更容易解决系统资源瓶颈。
0.9.0版本动态配置与topic管理分离,为了保持向下兼容kafka-topics.sh依然包含操作topic动态配置功能,新增kafka-configs.sh支持clients和topics动态配置功能,所以kafka-topics.sh和kafka-configs.sh任意一个都可以修改topic动态配置
0.10.1.0版本新增支持users和brokers动态配置功能,user动态配置用于访问资源的权限控制,提升集群的访问和数据安全性,例如:用户对读/写/创建/删除等操作和API、topic、group资源访问控制。broker动态配置,在不用重启及影响服务运行情况下,broker级别功能实现动态生效,例如:副本注册复制速率、磁盘内挂载点间数据迁移速率、网络请求的线程数、处理请求的I/O线程数等等全局参数等等。
0.10.1.0~2.3.1版本都支持topics、clients、users、brokers四类型动态配置的11种粒度配置对象,只是配置模块和属性字段有增减与调整。
用户使用kafka-configs.sh脚本,根据格式和参数规范要求,ConfigCommand类进行相关逻辑处理、json格式和内容校验,生成notification json,写入到序列化持久节点上,zk路径为xxx/config/changes/config_change_seqNo,节点名称为config_change_seqNo,其中seqNo从1开始的自增序号。kafka集群中所有broker通过监听zk上xxx/config/changes的children变化,每次获得比当前内存中last_seqNo大的seqNo的json内容,从中读取entity_type/entity_name相对路径,由此判断如何从xxx/config/topics|clients|users|brokers四种类型中读取哪个配置路径。同一个Broker在操作过程中任何时刻只能串行读写一种类型的配置,多种配置需要串行操作。
各个角色的作用:
kafka-config.sh: 负责写dynamic config和notification,写顺序上图有先后标识。
broker:负责监听xxx/config/changes子节点变化和读取entity_type/entity_name路径节点上内容
zk:负责存储notification和dynamic config及下发配置给相应的broker
notification json内容:
V1 0.10.0.1及以前版本有效
{ "version": 1, "entity_type": "topics", "entity_name": "finalTest" }
V2 0.10.1.0~2.3.1 当前最新版本都有效
以上不管是version 1还是version2,本质上没有变化。都是通过entity_type/entity_name获得entity_path的zk相对路径,全路径为xxx/config/entityType/entityName,具体请看如下详图
entity_type=topics | clients | users | brokers
entity_name=topicName | clientId | userId | (brokerId | <default>)
当entity_type为brokers时,brokerId为broker编号与自己的server.properties对应,只对某个broker生效。“<default>”指对所有broker生效。而entity_type为topics | clients | users对所有broker都生效。通过以上entity_type/entity_name六种组合成六个zk相对路径。
topics和clients组合原理一样,但users和brokers却略有不同,他们各自有2个组合,除了普通组合还有复合组合,两种类型组合在一起,例如users有users与clients组合,zk路径为users/<user>/clients/<clientId>;brokers动态配置非常实用,不需要重启就能动态更改任意数量brokers配置,更改所有brokers为xxx/brokers/<default>
四类动态配置11种zk相对路径,根据11种zk相对路径可以读取11种粒度配置对象dynamic config。
<default>说明:某种类型下所有作用域生效,例如xxx/clients/<default>和xxx/brokers/<default>就是集群内所有All clients和集群内所有All brokers配置都会生效,其他同理。
dynamic config内容示例:
entity_type/entity_name=topics/<topic_name>=topics/finalTest
{ "version": 1, "config": { "retention.bytes": "102400000", "flush.ms": "5000", "cleanup.policy": "compact", "flush.messages", ... } }
entity_type/entity_name=clients/<clientId>=clients/camusall
{ "version": 1, "config": { "producer_byte_rate": "20971520", "consumer_byte_rate": "20971520" } }
entity_type/entity_name=brokers/all brokers=brokers/<default>
{ "version": 1, "config": { "leader.replication.throttled.rate": "5000", "follower.replication.throttled.rate": "60000", "replica.alter.log.dirs.io.max.bytes.per.second": "5000", "log.retention.hours": "24", "log.flush.interval.messages": "5000", "min.insync.replicas": "2", ... } }
entity_type/entity_name=brokers/brokerId 与all brokers的配置形成完全一样,只是作用域范围不同而已,此处省略。
写配置格式校验
如果写入配置不进行规范校验,broker就会读取处理过程中,就会卡住或阻塞,影响服务运行稳定性。所以配置校验至关重要,校验规则如下:
config_change_seqNo生成规则
kafka-configs.sh脚本每成功执行一次,在zk上就创建一个新的seqNode节点(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久顺序节点(PERSISTENT_SEQUENTIAL),它的组成是seqNode = seqNodePrefix + seqNodeSuffix,config_change_固定为seqNode的前缀,seqNodeSuffix = seqNo为seqNode的后缀,seqNo是10位数字的序列号,这个序列号后缀是自增的,由zk服务端自动生成和维护,每次事务请求成功就加1,它与MySQL的自增id原理一样。
config_change_seqno清除规则
集群经过长期运行积累,xxx/config/changes下会留存大量历史节点,如果不及时清理,会有以下影响:
综上所述,因此必须要及时清除无用的seqNode集合,清除公式步骤如下:
config_change_seqno判断处理
前面提到每当触发回调处理,seqNode节点创建时间过期15分钟会被删除,删除条件是触发才会被执行,如果长时间不创建就可能有少数几个seqNode一直保留。如果短时间内(15分钟内)创建大量seqNode,又不会立即被删除,只有等到下次触发达到条件才行,那怎么判断哪些会被处理呢?broker缓存中维护一个变量lastExecutedSeqNo,它负责保存执行历史中seqNode最大顺序号,所以每当触发回调获取seqNodeSet列表时,都能轻易判断出哪些需要处理计算,也会同步更新lastExecutedSeqNo。
notification作用
静态与动态区别。静态配置是Broker内置默认配置和静态配置文件server.properties,broker启动前可以任意修改,启动后不可修改。动态配置是broker启动运行后,可以在线更新生效,偷偷说一句离线也可以改,就是不生效而已。
配置优先级。以上4个图包含4类型配置既有动态也有静态,那优先级如何呢?动态配置优先级高于静态配置。如上图1、2、4,环越小优先级越高,对于动态配置来说,修改配置作用域范围越小优先级越高,反之亦然。优先级最高的,会逐级覆盖相同配置项。
当broker启动时。读取顺序依次为broker内置默认配置,broker静态配置文件,动态配置。当配置项相同时,高优先级覆盖次优先的,其他依次类推
图示说明:上图1、2、3、4中,其中图1、2、4中环数字表示配置优先级关系,数字从1~5表示优先级从高到低。图3为两级关联。Users和Clients组合实现配置管理,这两者组合用于客户端配额限流,Users与Clients就像两级目录一样,一个User可以包含一个、多个clientId或所有clientId。图4中既有优先级关系也有配置参数包含关系,topics类型配置是brokers类型配置的子集,brokers除了包含topic配置外还有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。
如想更深入了解kafka-configs.sh用法,请查看
从设计原理中了解config_change_seqNo生成规则
写上文中理解了写Notification的作用,从而理解什么场景会适合使用zk中持久顺序节点(PERSISTENT_SEQUENTIAL)
Release Notes - Kafka - Version 0.8.1:https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.1.0:https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.0:https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.1:https://archive.apache.org/dist/kafka/0.10.2.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.0:https://archive.apache.org/dist/kafka/1.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.1:https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 2.0.0:https://archive.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
KIP-21 - Dynamic Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
KIP-226 - Dynamic Broker Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211:https://issues.apache.org/jira/browse/KAFKA-2547
KIP-257 - Configurable Quota Management:https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management
KIP-73 Replication Quotas:https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas
标签:领域 hang 序列号 文件 ges 12px 它的 spl 流量
原文地址:https://www.cnblogs.com/lizherui/p/12271285.html