标签:
参考 http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/
http://zqhxuyuan.github.io/2016/01/13/2016-01-13-Kafka-Picture/
http://jianbeike.blogspot.com.au/2016/04/kafka.html
http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/
http://www.oschina.net/translate/kafka-replication?print
http://www.jianshu.com/p/3f24d4b53f7f
https://www.iteblog.com/archives/1805
http://www.cnblogs.com/fxjwind/p/4972244.html
http://www.jasongj.com/2015/04/24/KafkaColumn2/
//kafka安装
http://yanliu.org/2015/08/31/kafka%E9%9B%86%E7%BE%A4%E9%85%8D%E7%BD%AE/
http://bbs.kekeyun.com/thread-101-1-1.html
http://blog.csdn.net/lizhitao/article/details/45066437
分布式系统 的优点 就是 将原本一台服务器受到的压力,分散到不同服务器上去
HW表示的是所有ISR中的节点都已经复制完的消息的offset.也是消费者所能获取到的消息的最大offset,所以叫做high watermark.
注意Leader Partition保存了ISR信息.所以可以看到maybeIncrementLeaderHW()是在appendToLocalLog()内一起执行的
任何Replication的LEO
发生变化 (ISR中的followers有任何一个节点LEO改变,看看所有ISR是否都复制了,然后更新HW)
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
// 所有inSync副本中最小的LEO(因为每个follower的LEO都可能不一样), 表示的是最新的hw
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
// Leader本身的hw, 是旧的
val oldHighWatermark = leaderReplica.highWatermark // 是一个LogOffsetMetadata
if(oldHighWatermark.precedes(newHighWatermark)) { // 比较Leader的messageOffset是否小于ISR的
leaderReplica.highWatermark = newHighWatermark // Leader小于ISR, 更新Leader为ISR中最小的
true
}else false // Returns true if the HW was incremented, and false otherwise.
}
delay operation complete http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/
触发条件(延迟请求以及增加HW)中关于ISR的部分都是环环相扣的:
生产者生产数据后,相应broker会感知到offset的变化,然后通知它的follower,同时返回leader的HW, follower会主动向leader拉取数据,但每个follower所在的机器性能不同,可能拉取数据的个数也不一样,导致各个的LEO也不一样,为了分区对应的broker中的
数据一致,leader挑选follower返回的各自LEO中,选择最小的offset,做为HW,并更新,并通知生产者。
消费者只能消费处于HW以下的数据,因为以下的数据,在follower各个机器中都存在,可理解为数据是一致的
关于leader的选举原理
每一个分区对应的broker中,都只能有一个leader和若干个follower,
kafka没有采用 少数服从多数 分布式的选举方法,而是自己实现了一个 ISR (in sync replication) ISR中的follower都是在速度上能跟得上leader的broker
kafka集群第一次使用的时候,里面是没有数据的,
随机选一个broker作为leader,余下的broker放到ISR中, 同时启动一个线程,专门检查对应的follower,看他们在规定时间内是否fetch数据(默认1s),如果不符合这个条件,就将此follower踢出ISR
当leader宕机后,从ISR中挑选一个做为新leader,但如何挑选新的leader?
关于leader的选举方法
在kafka 0.8之前,在创建一个topic时,相应broker里面是没有数据的,那么随机找一个broker做为leader,余下的放到ISR中
每个分区中的follower所在的机器 ,都要/broker/ids/[0,1,2]做一个watch ,这样当/broker/ids/中的某个leader宕机后,zookeeper能通知相应follower,但这样zookeeper的负载很重
比如说100台broker,有2000个分区,每个分区有3个备份,那么在zookeeper中要安放2000*3=6000个watch,zookeeper本身也是集群,负载过重
kafka从0.8开始,不再采用上面的方法,在所有broker中选举出一个controller, 这个controler将决定各个分区中leader的选举
同时 在/broker/ids 做watch,/broker/topics 也做watch
在创建topic时,controler 在 /broker/ids 中读取broker id 列表,针对每一个分区,在所有的broker 中选取一个做为leader,余下的作为ISR,因为此时刚创建完topic,也没有数据
同时将leader 以及 ISR写到 /broker/topic/[topic_name]/partions/0/state 中去, 其内容大概为 leader 为 brokerA , ISR为[brokerB, brokerC, brokerD],
同时告诉相应的broker,因为有些broker是leader,有些broker是follower,要做一些初始化的工作
详见 http://jianbeike.blogspot.com.au/2016/04/kafka.html
makeLeader过程点评
leader的作用除了接收produce和consume请求,还有一点就是管理ISR以及highwatermark。而makeLeader过程就是为了开启leader的这些功能准备的,首先它要根据topic-partition创建(如果没有)message log目录,然后将自己的endlogoffset作为highwatermark,开启定期检测isr follower是否脱离isr(长时间未发fetch或者落后leaderlogendoffset太多)。
makeFollower过程点评
makeFollower的过程比makeLeader的过程要复杂,刚才说了,leader管理ISR和highwatermark(可以看概念说明那节),那么highwatermark对于Follower可见吗?当然Follower发送fetch请求时会将自身endlogoffset带过去,而返回结果中会有leader返回的
highwatermark。
为什么要有highwatermark?
答:看上图,假设某个topic-partition(比如topic1的partition0)的replicalist分配在4台机器上,A,B,C,D,produce端设置的ack为1,也就是只要leader 接收处理message成功就返回成功,那么这时replica list的endlogoffset会出现分化。
A作为leader肯定是endlogoffset最高,B紧随其后,C机器由于配置比较低,同步较慢,D机器配置最低,已经被A移除了ISR。
假设这个时候某几个机器出现故障,比如A,C宕机,这时B会成为leader,假如没有highwatermark,在A重启时的时候会做makeFollower操作,在宕机时log文件之后直接追加message,而假如B机器的endlogoffset已经达到A的endlogoffset,会产生数据不一致的情况,所以使用highwatermark来避免这种情况。
在A 做makeFollower操作时,将log文件truncate到highwatermark位置,以防止发生数据不一致情况发生。
还有一种情形会导致数据不一致,那就是uncleanleader election,ABC机器都宕机的情况,D机器已经启动,controller会将D作为leader,很明显即便有了highwatermark,也会发生数据不一致,同样消息数据也会丢失。目前kafka 0.8.1.1的版本,没有将unclean election 开关开放给用户,所以这块要做好监控
停止对这些成为follower的partition的拉取线程,把这些partition的Log截断到highWaterMark的位置,并启动对那些成为leader的partition的拉取线程
当 broker 宕机时,controller在/broker/ids做的watch也会触发 详见
跟创建 topic一样,遍历/broker/topic/[topic_name]/partition/0/state ,在ISR中选择一个活着的broker,如果没有,就在所有的备份broker中找一个,
然后再将新的 leadre, ISP 写到上面路径去,同时通知相关broker,做好makeLeader,makeFollower工作
producer发送消息。
producer 可以直接发送到broker对应的leader partition中,不需要经历任何一个中介的转发。为实现这个特性,每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的你leader partition都在哪。现阶段哪些leader partition 是可以直接访问的?
如果访问的不是leader partition 怎么搞? 而且我看是可以指定多个进行访问的。
producer 和 partition 。
producer 可以控制以什么样的将消息推送到客户端。实现方法包括随机,实现一类随机负载均衡的算法,或者指定一些分区算法。kafka 提供了用户自定义分区的方法,用户可以为每一个消息指定一个partitionkey,通过这个key来实现一些hash 分区算法。
http://www.aichengxu.com/view/4683767
副本获取器线程,主要定义了以下几个方法:
1. handlePartitionsWithErrors:处理有错误(leader已经发生编程)的分区,当前什么都不做因为controller会应对这些变更
2. handleOffsetOutOfRange:处理一个位移越界的分区返回新的获取位移值(fetch offset),具体逻辑如下:
获取给定topic分区在该broker上的副本
获取该分区leader的结束位移值,如果leader的结束位移值比该副本的结束位移还小的话,先判断一下是否启用了unclean leader选举。若没有启用,直接报错;否则就将follower副本的位移直接截取成leader的结束位移
若follower位移比leader的还小,直接截取所有位移并设置leader的初始位移处开始读取leader
如果启用了unclean leader选举,那么就有可能出现这样的情景:一个follower宕机了,而同时leader还在不停地写入消息。当这个follower重启回来的时候它需要完整地追上leader的进度。就在这个过程中,ISR中所有的副本都宕掉了。那么此时这个follower就会被unclean leader选举为新的leader,然后它开始写入从客户端发来的消息。之后旧的leader恢复,成为了一个follower,它会发现当前leader的最新位移居然比自己的还要小。这种情况下,只能截断自己的位移使之与当前leader的最新位移保持一致然后继续处理。
3. processPartitionData:处理获取到的数据。主要逻辑就是将给定的response数据解析出来并更新到该broker上的副本对象中,比如获取到的消息集合以及更新高水位值
Replicated Logs
Kafka的partition可以看成是一个replicated log, 每个replica就是这个replicated log其中的一个log。多个replica是为了容忍机器故障,因此同一个partition的不同replica需要被分配到不同的broker上。所以,对于一个partition,broker id即可唯一代表一个replica,也被当作replica id。
为了一致性,Kafka在同一个partition的replicas中选出一个作为leader,由它接受client的所有读写请求,而其它的replica作为follower,从leader处拉取数据,leader作为唯一的"source of truth"。在有些情况下,follower会truncate自己的log(这个log和以下的log都是指"replicated log"这个概念里的log),然后重新从leader处抓取数据,以求与leader一致(下面会讲到)。
leader和follower的角色区分,也主要是ReplicaManager来实现。具体地讲
HW、ISR以及leader对于partition这个多副本系统算是一种元数据。ISR和leader确要在controller和所有replica之间保持一致,HW需要在leader和follower之间保持一致,因为在leader转换的时候,HW是安全线。
下面明确一下high watermark和log end offset在源码里的意义
HW high watermark offset的数据小于被认为是commit的,注意,offset为high watermark的message并不是commit的。
LEO log end offset 这个replica的log里最后一条消息的下一条消息的offset
这些数据根据实际需求,以不同的方式在Kafka中传递:
以外,由于HW是随时变化的,如果即时更新到Zookeeper,会带来效率的问题。而HW是如此重要,因此需要持久化,ReplicaManager就启动了单独的线程定期把所有的partition的HW的值记到文件中,即做highwatermark-checkpoint。
除了leader,ISR之外,在replica系统中还有其它三个对于一致性有重要作用的参数:controller epoch、leader epoch和zookeeper version。
由这三个版本号共同作用,Kafka基本都保证对于leader, ISR, controller的认知在各个broker间不会出现大问题(但是还会有bug和潜在的问题导致认知不一致)。
此外,当broker收到UpdateMetadataRequest时,它会把这个request交给ReplicaManager处理,而ReplicaManager在确定UpdateMetadataRequest的controller epoch有效之后,就会交由MetadataCache来处理。之所以不直接收MetadataCache处理,可能是ReplicaManager处会保存controller epoch, 不过MetadataCache内部也可能获取controller epoch,只是没有做为单独的一个field保存起来。这样做显得有些混乱,不知道是什么原因。
The Producer
负载均衡
1)producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。
2)自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。
异步批量发送
批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。
The Consumer
consumer控制消息的读取。
Push vs Pull
1)producer push data to broker,consumer pull data from broker
2)consumer pull的优点:consumer自己控制消息的读取速度和数量。
3)consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。
Consumer Position
1)大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。
2)Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。
Message Delivery Semantics
三种:
At most once—Messages may be lost but are never redelivered.
At least once—Messages are never lost but may be redelivered.
Exactly once—this is what people actually want, each message is delivered once and only once.
Producer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。
Consumer:
* 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应”At most once“。
* 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。
* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。
Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。
ü controllerEpoch
为了防止先发的请求后到来导致broker数据不一致,所以使用版本管理数据,每次更换controller,epoch加1,所以broker永远只响应本次请求中epoch>=上次请求epoch的请求。
ü leaderEpoch
为了防止先发的请求后到来导致broker数据不一致,所以使用版本管理数据,每次选主更换leader,epoch加1,所以broker永远只响应本次请求中epoch>=上次请求epoch的请求
标签:
原文地址:http://www.cnblogs.com/taek/p/5878666.html