标签:移除 ted pac assigned broker 就是 lse offline 初始化
Controller 是从Kafka集群中选取一个的broker,负责管理topic分区和副本的状态的变化,通过上篇我们知道了controller的启动流程,这篇我们学习一下分区和副本状态机。
分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。
PartitionStateMachine 记录着集群所有 Partition 的状态信息,它决定着一个 Partition 处在什么状态以及它在什么状态下可以转变为什么状态,Kafka 中 Partition 的状态总共有以下四种类型:
分区状态机转移图如下所示:
先看下 KafkaController 在启动时,调用 PartitionStateMachine 的 startup()
方法初始化的处理过程。
PartitionStateMachine 的初始化方法如下所示:
在这个方法中,PartitionStateMachine 先调用 initializePartitionState() 方法初始化集群中所有 Partition 的状态信息:
这里只是将 Partition 的状态信息更新分区状态机的缓存 partitionState 中,并没有真正进行状态的转移。
在初始化的第二步,将会调用 triggerOnlinePartitionStateChange()
方法,为所有的状态为 NewPartition/OnlinePartition 的 Partition 进行 leader 选举,选举成功后的话,其状态将会设置为 OnlinePartition。
上面方法的目的是为尝试将所有的状态为 NewPartition/OnlinePartition 的 Partition 状态转移到 OnlinePartition,这个方法主要是做了两件事:
这里以要转移的 TargetState 区分做详细详细讲解,当 TargetState 分别是 NewPartition、OfflinePartition、NonExistentPartition 或者 OnlinePartition 时,副本状态机所做的事情。
NewPartition 是 Partition 刚创建时的一个状态,其处理逻辑如下:
实现逻辑:
l 校验其前置状态,它有效的前置状态为 NonExistentPartition;
l 将该 Partition 的状态转移为 NewPartition 状态,并且更新到缓存中。
OnlinePartition 是一个 Partition 正常工作时的状态,这个状态下的 Partition 已经成功选举出了 leader 和 isr 信息,其实现逻辑如下:
实现逻辑:
对于以上这几种情况,无论前置状态是什么,最后都会触发这个 Partition 的 leader 选举,leader 成功后,都会触发向这个 Partition 的所有 replica 发送 LeaderAndIsr 请求。
OfflinePartition 是这个 Partition 的 leader 挂掉时转移的一个状态,如果 Partition 转移到这个状态,那么就意味着这个 Partition 没有了可用 leader。
实现逻辑:
NonExistentPartition 代表了已经处于 OfflinePartition 状态的 Partition 已经从 metadata 和 zk 中删除后进入的状态。
实现逻辑:
这里主要是看一下上面 Partition 各种转移的触发的条件,整理的结果如下表所示,部分内容会在后续文章讲解。
TargetState |
触发方法 |
作用 |
OnlinePartition |
Controller 的 shutdownBroker() |
优雅关闭 Broker 时调用,因为要下线的节点是 leader,所以需要触发 leader 选举 |
OnlinePartition |
Controller 的 onNewPartitionCreation() |
Partition 新建时,这个是在 Replica 已经变为 NewPartition 状态后进行的,为新建的 Partition 初始化 leader 和 isr |
OnlinePartition |
controller 的 onPreferredReplicaElection() |
对 Partition 进行最优 leader 选举,目的是触发 leader 选举 |
OnlinePartition |
controller 的 moveReassignedPartitionLeaderIfRequired() |
分区副本迁移完成后,1. 当前的 leader 不在 RAR 中,需要触发 leader 选举;2. 当前 leader 在 RAR 但是掉线了,也需要触发 leader 选举 |
OnlinePartition |
PartitionStateMachine 的 triggerOnlinePartitionStateChange() |
当 Controller 重新选举出来或 broker 有变化时,目的为了那些状态为 NewPartition/OfflinePartition 的 Partition 重新选举 leader,选举成功后状态变为 OnlinePartition |
OnlinePartition |
PartitionStateMachine 的 initializePartitionState() |
Controller 初始化时,遍历 zk 的所有的分区,如果有 LeaderAndIsr 信息并且 leader 在 alive broker 上,那么就将状态转为 OnlinePartition。 |
OfflinePartition |
controller 的 onBrokerFailure() |
当有 broker 掉线时,将 leader 在这个机器上的 Partition 设置为 OfflinePartition |
OfflinePartition |
TopicDeletionManager 的 completeDeleteTopic() |
Topic 删除成功后,中间会将该 Partition 的状态先转变为 OfflinePartition |
NonExistentPartition |
TopicDeletionManager 的 completeDeleteTopic() |
Topic 删除成功后,最后会将该 Partition 的状态转移为 NonExistentPartition |
NewPartition |
Controller 的 onNewPartitionCreation() |
Partition 刚创建时的一个中间状态 ,此时还没选举 leader 和设置 isr 信息 |
ReplicaStateMachine 记录着集群所有 Replica 的状态信息,它决定着一个 replica 处在什么状态以及它在什么状态下可以转变为什么状态,Kafka 中副本的状态总共有以下七种类型:
上面的状态中其中后面4是专门为 Replica 删除而服务的,副本状态机转移图如下所示:
在之前介绍KafkaController 在启动时,会调用 ReplicaStateMachine 的 startup()
方法初始化的处理过程。
在这个方法中,ReplicaStateMachine 先调用 initializeReplicaState()
方法初始化集群中所有 Replica 的状态信息,如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica,否则设置为 ReplicaDeletionIneligible 状态,这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState
中,并没有真正进行状态转移的操作。
接着第二步调用 handleStateChanges()
将所有存活的副本状态转移为 OnlineReplica 状态,这里才是真正进行状态转移的地方,其具体实现如下:
这里是副本状态机 startup() 方法的最后一步,它的目的是将所有 alive 的 Replica 状态转移到 OnlineReplica 状态,由于前面已经这些 alive replica 的状态设置成了 OnlineReplica,所以这里 Replica 的状态转移情况是:OnlineReplica –> OnlineReplica,这个方法主要是做了两件事:
NewReplica 这个状态是 Replica 准备开始创建是的一个状态,其实现逻辑如下:
当想要把 Replica 的状态转移为 NewReplica 时,副本状态机的处理逻辑如下:
这是 Replica 开始删除时的状态,Replica 转移到这种状态的处理实现如下:
这部分的实现逻辑:
ReplicaDeletionIneligible 是副本删除失败时的状态,Replica 转移到这种状态的处理实现如下:
实现逻辑:
ReplicaDeletionSuccessful 是副本删除成功时的状态,Replica 转移到这种状态的处理实现如下:
实现逻辑:
NonExistentReplica 是副本完全删除、不存在这个副本的状态,Replica 转移到这种状态的处理实现如下:
实现逻辑:
OnlineReplica 是副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 follower,Replica 转移到这种状态的处理实现如下:
从前面的状态转移图中可以看出,当 Replica 处在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 状态时,Replica 是可以转移到 OnlineReplica 状态的,下面分两种情况讲述:
NewReplica –> OnlineReplica 的处理逻辑如下:
OnlineReplica/OfflineReplica/ReplicaDeletionIneligible –> OnlineReplica 的处理逻辑如下:
OfflineReplica 是 Replica 所在 Broker 掉线时 Replica 的状态,转移到这种状态的处理逻辑如下:
处理逻辑如下:
这里主要是看一下上面 Replica 各种转移的触发的条件,整理的结果如下表所示
TargetState |
触发方法 |
作用 |
OnlineReplica |
KafkaController 的 onBrokerStartup() |
Broker 启动时,目的是将在该节点的 Replica 状态设置为 OnlineReplica |
OnlineReplica |
KafkaController 的 onNewPartitionCreation() |
新建 Partition 时,Replica 初始化及 Partition 状态变成 OnlinePartition 后,新创建的 Replica 状态也变为 OnlineReplica; |
OnlineReplica |
KafkaController 的 onPartitionReassignment() |
副本迁移完成后,RAR 中的副本设置为 OnlineReplica 状态 |
OnlineReplica |
ReplicaStateMachine 的 startup() |
副本状态机刚初始化启动时,将存活的副本状态设置为 OnlineReplica |
OfflineReplica |
TopicDeletionManager 的 markTopicForDeletionRetry() |
将删除失败的 Replica 设置为 OfflineReplica,重新进行删除 |
OfflineReplica |
TopicDeletionManager 的 startReplicaDeletion() |
开始副本删除时,先将副本设置为 OfflineReplica |
OfflineReplica |
KafkaController 的 shutdownBroker() 方法 |
优雅关闭 broker 时,目的是把下线节点上的副本状态设置为 OfflineReplica |
OfflineReplica |
KafkaController 的 onBrokerFailure() |
broker 掉线时,目的是把下线节点上的副本状态设置为 OfflineReplica |
NewReplica |
KafkaController 的 onNewPartitionCreation() |
Partition 新建时,当 Partition 状态变为 NewPartition 后,副本的状态变为 NewReplica |
NewReplica |
KafkaController 的 startNewReplicasForReassignedPartition() |
Partition 副本迁移时,将新分配的副本状态设置为 NewReplica; |
ReplicaDeletionStarted |
TopicDeletionManager 的 startReplicaDeletion() |
下线副本时,将成功设置为 OfflineReplica 的 Replica 设置为 ReplicaDeletionStarted 状态,开始物理上删除副本数据(也是发送 StopReplica) |
ReplicaDeletionStarted |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本迁移时,目的是下线那些 old replica,新的 replica 已经迁移到新分配的副本上了 |
ReplicaDeletionSuccessful |
TopicDeletionManager 的 completeReplicaDeletion() |
物理将数据成功删除的 Replica 状态会变为这个 |
ReplicaDeletionSuccessful |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本迁移时,在下线那些旧 Replica 时的一个状态,删除成功 |
ReplicaDeletionIneligible |
TopicDeletionManager 的 startReplicaDeletion() |
开始副本删除时,删除失败的副本会设置成这个状态 |
ReplicaDeletionIneligible |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 副本迁移时,在下线那些旧的 Replica 时的一个状态,删除失败 |
NonExistentReplica |
TopicDeletionManager 的 completeReplicaDeletion() |
副本删除成功后(状态为 ReplicaDeletionSuccessful),从状态机和 Controller 的缓存中清除该副本的记录; |
NonExistentReplica |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本成功迁移、旧副本成功删除后,从状态机和 Controller 的缓存中清除旧副本的记录 |
参考资料:
https://blog.csdn.net/lizhitao/article/details/28108919
https://www.maiyewang.com/?p=7855
http://matt33.com/2018/06/16/controller-state-machine/
标签:移除 ted pac assigned broker 就是 lse offline 初始化
原文地址:https://www.cnblogs.com/zhy-heaven/p/10994193.html