标签:upd family nstat network sof 地理 并且 let image
本文主要参考社区0.11版本Controller的重设计方案,试图给大家梳理一下Kafka controller这个组件在设计上的一些重要思考。众所周知,Kafka中有个关键组件叫controller,负责管理和协调Kafka集群。网上关于controller的源码分析也有很多,本文就不再大段地列出代码重复做这件事情了。实际上,对于controller的代码我一直觉得写的非常混乱,各种调用关系十分复杂,想要完整地理解它的工作原理确实不易。好在我们就是普通的使用者,大致了解controller的工作原理即可。下面我就带各位简要了解一下当前Kafka controller的原理架构以及社区为什么要在大改controller的设计。
“负责管理和协调Kafka集群”的说法实在没有什么营养,上点干货吧——具体来说Controller目前主要提供多达10种的Kafka服务功能的实现,它们分别是:
当前controller启动时会为集群中所有broker创建一个各自的连接。这么说吧,假设你的集群中有100台broker,那么controller启动时会创建100个Socket连接(也包括与它自己的连接!)。当前新版本的Kafka统一使用了NetworkClient类来建模底层的网络连接(有兴趣研究源码的可以去看下这个类,它主要依赖于Java NIO的Selector)。Controller会为每个连接都创建一个对应的请求发送线程,专门负责给对应的broker发送请求。也就是说,如果还是那100台broker,那么controller启动时还会创建100个RequestSendThread线程。当前的设计中Controller只能给broker发送三类请求,它们是:
Controller通常都是发送请求给broker的,只有上面谈到的controller 10大功能中的ControlledShutdownRequest请求是例外:这个请求是待关闭的broker通过RPC发送给controller的,即它的方向是反的。另外这个请求还有一个特别之处就是其他所有功能或是请求都是通过Zookeeper间接与controller交互的,只有它是直接与controller进行交互的。
构成controller的组件太多了,多到我已经不想用文字表达了,直接上图吧:
其中比较重要的组件包括:
缓存内容十分丰富,这也是controller可以协调管理整个cluster的基础。
不谦虚地说,我混迹社区也有些日子了。在里面碰到过很多关于controller的bug。社区对于这些bug有个很共性的特点,那就是没有什么人愿意(敢去)改这部分代码,因为它实在是太复杂了。具体的问题包括:
编写正确的多线程程序一直是Java开发者的痛点。在Controller的实现类KafkaController中创建了很多线程,比如之前提到的RequestSendThread线程,另外ZkClient也会创建单独的线程来处理zookeeper回调,这还不算TopicDeletionManager创建的线程和其他IO线程等。几乎所有这些线程都需要访问ControllerContext(RequestSendThread只操作它们专属的请求队列,不会访问ControllerContext),因此必要的多线程同步机制是一定需要的。当前是使用controllerLock锁来实现的,因此可以说没有并行度可言。
看过源代码的人相信对这一点深有体会。KafkaController、PartitionStateMachine和ReplicaStateMachine每个都是500+行的大类且彼此混调的现象明显,比如KafkaController的stopOldReplicasOfReassignedPartition方法调用ReplicaStateMachine的handleStateChanges方法,而后者又会调用KafkaController的remoteReplicaFromIsr方法。类似的情况还发生在KafkaController和ControllerChannelManager之间。
当前broker对入站请求类型不做任何优先级处理,不论是PRODUCE请求、FETCH请求还是Controller类的请求。这就可能造成一个问题:即clients发送的数据类请求积压导致controller推迟了管理类请求的处理。设想这样的场景,假设controller向broker广播了leader发生变更。于是新leader开始接收clients端请求,而同时老leader所在的broker由于出现了数据类请求的积压使得它一直忙于处理这些请求而无法处理controller发来的LeaderAndIsrRequest请求,因此这是就会出现“双主”的情况——也就是所谓的脑裂。此时倘若client发送的一个PRODUCE请求未指定acks=-1,那么因为日志水位截断的缘故这个请求包含的消息就可能“丢失”了。现在社区中关于controller丢失数据的bug大多是因为这个原因造成的。
当前controller操作Zookeeper是通过ZkClient来完成的。ZkClient目前是同步写入Zookeeper,而同步通常意味着性能不高。更为严重的是,controller是一个分区一个分区进行写入的,对于分区数很多的集群来说,这无疑是个巨大的性能瓶颈。如果用户仔细查看源代码,可以发现PartitionStateMachine的electLeaderForPartition就是一个分区一个分区地选举的。
Controller当前发送请求都是按照分区级别发送的,即一个分区一个分区地发送。没有任何batch或并行可言,效率很低。
这里的版本号类似于new consumer的generation,总之是要有一种机制告诉controller broker的版本信息。因为有些情况下broker会处理本已过期或失效的请求导致broker状态不一致。举个例子,如果一个broker正常关闭过程中“宕机”了,那么重启之后这个broker就有可能处理之前controller发送过来的StopReplicaRequest,导致某些副本被置成offline从而无法使用。而这肯定不是我们希望看到的结果,对吧?
Contoller目前是使用了ZkClient这个开源工具,它可以自动重建会话并使用特有的线程顺序处理所有的Zookeeper监听消息。因为是顺序处理,它就有可能无法及时响应最新的状态变更导致Kafka集群状态的不一致。
和new consumer类似,controller摒弃多线程的模型,采用单线程的事件队列模型。这样简化了设计同时也避免了复杂的同步机制。各位在最新的trunk分支上已然可以看到这种变化:增加了ControllerEventManager类以及对应的ControllerEventThread线程类专门负责处理ControllerEvent。目前总共有9种controller event,它们分别是:
我们基本上可以从名字就能判断出它们分别代表了什么事件。
将所有同步操作Zookeeper的地方都改成异步调用+回调的方式。实际上Apache Zookeeper客户端执行请求的方式有三种:同步、异步和batch。通常以batch性能最好,但Kafka社区目前还是倾向于用async替换sync。毕竟实现起来相对简单同时性能上也能得到不少提升。
可能摒弃之前状态机的方式,采用和GroupCoordinator类似的方式,让controller保存所有的状态并且负责状态的流转以及状态流转过程中的逻辑。当然,具体的实现还要再结合0.11最终代码才能确定。
对管理类请求和数据类请求区分优先级。比如使用优先级队列替换现有的BlockingQueue——社区应该已经实现了这个功能,开发了一个叫PrioritizationAwareBlockingQueue的类来做这件事情,后续大家可以看下这个类的源代码
为broker设定版本号(generation id)。如果controller发送过来的请求中包含的generation与broker自己的generation不匹配, 那么broker会拒绝该请求。
ZkClient是同步顺序处理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API还能够在接收到状态变更通知时便马上开始处理,而ZkClient的特定线程则必须要在队列中顺序处理到这条变更消息时才能处理。
以上就是关于Kafka controller的一些讨论,包括了它当前的组件构成、设计问题以及对应的改进方案。有很多地方可能理解的还不是透彻,期待着在Kafka 0.11正式版本中可以看到全新的controller组件。
标签:upd family nstat network sof 地理 并且 let image
原文地址:http://www.cnblogs.com/huxi2b/p/6980045.html