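PartitionStateMachine implements the state transitions for the partitions of a topic. A partition can be in the following states:

| State | When the partition is in this state | Valid previous states |
|---|---|---|
| NonExistentPartition | 1. The partition has never been created. 2. The partition was created and later deleted. | OfflinePartition |
| NewPartition | 1. The partition has been created and assigned replicas, but has no leader/ISR yet. | NonExistentPartition |
| OnlinePartition | 1. One of the partition's replicas has been elected leader. | NewPartition/OfflinePartition |
| OfflinePartition | 1. The leader replica has gone offline and no new leader has been elected yet. 2. The partition was taken offline right after being created. | NewPartition/OnlinePartition |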
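The "valid previous states" column is what the state machine checks before every transition. A minimal sketch of that check as a lookup table follows. The lists are transcribed from the assertValidPreviousStates calls in the handleStateChange code later in this article, which additionally accept OnlinePartition -> OnlinePartition and OfflinePartition -> OfflinePartition. The names PartitionStates, validPreviousStates and isValidTransition are hypothetical and are not part of Kafka.

```scala
object PartitionStates {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Target state -> states it may be entered from (hypothetical helper,
  // mirroring the lists passed to assertValidPreviousStates).
  val validPreviousStates: Map[PartitionState, List[PartitionState]] =
    Map[PartitionState, List[PartitionState]](
      NewPartition         -> List(NonExistentPartition),
      OnlinePartition      -> List(NewPartition, OnlinePartition, OfflinePartition),
      OfflinePartition     -> List(NewPartition, OnlinePartition, OfflinePartition),
      NonExistentPartition -> List(OfflinePartition)
    )

  // True when moving from `from` to `to` passes the previous-state check.
  def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates.getOrElse(to, Nil).contains(from)
}
```

For example, `PartitionStates.isValidTransition(PartitionStates.NonExistentPartition, PartitionStates.OnlinePartition)` is false: a partition has to pass through NewPartition first.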
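The partition state transitions proceed as follows:

| State transition | What happens |
|---|---|
| NonExistentPartition -> NewPartition | 1. The assigned replicas are loaded from ZooKeeper into the KafkaController's in-memory cache. |
| NewPartition -> OnlinePartition | 1. The first live replica is made leader and all live replicas form the ISR; this information is written to ZooKeeper. |
| OnlinePartition, OfflinePartition -> OnlinePartition | 1. A new leader and ISR are elected for the partition and written to ZooKeeper. |
| NewPartition, OnlinePartition, OfflinePartition -> OfflinePartition | 1. The KafkaController merely marks the partition as OfflinePartition. |
| OfflinePartition -> NonExistentPartition | 1. The KafkaController merely marks the partition as NonExistentPartition. |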
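The NewPartition -> OnlinePartition step is the only transition that picks a leader without a PartitionLeaderSelector. A minimal sketch of that rule as a pure function is given here, assuming the live broker set is passed in directly. In Kafka's initializeLeaderAndIsrForPartition the ISR is the full list of live assigned replicas, leader included. The LeaderAndIsr case class is a simplified stand-in (the real one also carries leaderEpoch and zkVersion), and InitialLeaderElection and initialLeaderAndIsr are hypothetical names.

```scala
object InitialLeaderElection {
  // Simplified stand-in for Kafka's LeaderAndIsr (epoch and zkVersion omitted).
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // First live assigned replica becomes leader; every live assigned replica
  // is placed in the ISR. Returns None when no assigned replica is alive.
  def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[LeaderAndIsr] = {
    val liveAssigned = assignedReplicas.filter(r => liveBrokerIds.contains(r)).toList
    liveAssigned.headOption.map(leader => LeaderAndIsr(leader, liveAssigned))
  }
}
```

With assigned replicas `Seq(3, 1, 2)` and live brokers `Set(1, 2)`, broker 1 becomes leader and the ISR is `List(1, 2)`; with no live broker the function returns `None`, which corresponds to the partition being unable to go online.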
The function to focus on is therefore handleStateChange in PartitionStateMachine:

private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector,
                              callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                          "the partition state machine has not started")
                                            .format(controllerId, controller.epoch, topicAndPartition, targetState))
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    targetState match {
      case NewPartition =>
        // Check the previous state
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        // Update partitionReplicaAssignment in controllerContext
        assignReplicasToPartitions(topic, partition)
        // Change the partition state
        partitionState.put(topicAndPartition, NewPartition)
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                          assignedReplicas))
      case OnlinePartition =>
        // Check the previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        partitionState(topicAndPartition) match {
          case NewPartition => // NewPartition -> OnlinePartition
            /* 1. From partitionReplicaAssignment, pick the first live replica as leader; all live replicas form the ISR
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            initializeLeaderAndIsrForPartition(topicAndPartition)
          case OfflinePartition => // OfflinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here it is usually OfflinePartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector)
          case OnlinePartition => // OnlinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here it is usually ReassignedPartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
        // Update the partition state
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
      case OfflinePartition =>
        // Check the previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // Update the partition state
        partitionState.put(topicAndPartition, OfflinePartition)
      case NonExistentPartition =>
        // Check the previous state
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // Update the partition state
        partitionState.put(topicAndPartition, NonExistentPartition)
        // post: partition state is deleted from all brokers and zookeeper
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  }
}

12.4 KafkaController PartitionLeaderSelector
When a partition changes state, in particular on the OfflinePartition -> OnlinePartition and OnlinePartition -> OnlinePartition transitions, a PartitionLeaderSelector is invoked to determine the leader and ISR. Five selectors are currently supported: NoOpLeaderSelector, OfflinePartitionLeaderSelector, ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector and ControlledShutdownLeaderSelector.

12.4.1 NoOpLeaderSelector
/**
 * Essentially does nothing. Returns the current leader and ISR, and the current
 * set of replicas assigned to a given topic/partition.
 */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}

It does essentially nothing: it simply returns currentLeaderAndIsr together with the set of replicas assigned to the given topic/partition.

12.4.2 OfflinePartitionLeaderSelector
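The class that follows boils down to a three-way decision: prefer a live ISR member, otherwise fall back to any live assigned replica if unclean leader election is allowed, otherwise fail. A minimal sketch of that decision as a pure function is given here under simplifying assumptions: the live broker set and the unclean-election flag are plain parameters rather than being read from ControllerContext and the topic config, and failures are returned as Left values instead of thrown exceptions. OfflineElectionSketch and this selectLeader signature are hypothetical.

```scala
object OfflineElectionSketch {
  // Same four fields the selector code reads and writes.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def selectLeader(assignedReplicas: Seq[Int],
                   liveBrokerIds: Set[Int],
                   current: LeaderAndIsr,
                   uncleanLeaderElectionEnable: Boolean): Either[String, LeaderAndIsr] = {
    val liveAssigned = assignedReplicas.filter(r => liveBrokerIds.contains(r))
    val liveIsr = current.isr.filter(r => liveBrokerIds.contains(r))

    if (liveIsr.nonEmpty) {
      // Clean election: first assigned replica that is both live and in the ISR.
      // (The original calls .head here; this sketch reports the empty case instead.)
      liveAssigned.find(r => liveIsr.contains(r)) match {
        case Some(newLeader) =>
          Right(LeaderAndIsr(newLeader, current.leaderEpoch + 1, liveIsr, current.zkVersion + 1))
        case None =>
          Left("live ISR members are not among the assigned replicas")
      }
    } else if (!uncleanLeaderElectionEnable) {
      Left("no broker in ISR is alive and unclean leader election is disabled")
    } else {
      // Unclean election: possible data loss, the ISR shrinks to the new leader only.
      liveAssigned.headOption match {
        case Some(newLeader) =>
          Right(LeaderAndIsr(newLeader, current.leaderEpoch + 1, List(newLeader), current.zkVersion + 1))
        case None =>
          Left("no assigned replica is alive")
      }
    }
  }
}
```

Returning Either keeps the sketch testable without the exception types; the real selector throws NoReplicaOnlineException in the failing branches.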
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true => // Every broker in the ISR is offline, so the leader must come from the assigned replicas (AR)
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true => // Every assigned replica is offline as well: this topic/partition is unavailable
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false => // Some assigned replicas are still online
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head // Take the first one as leader
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                  .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false => // Some brokers in the ISR are still online
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head // Pick the first live replica that is in the ISR
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}

12.4.3 ReassignedPartitionLeaderSelector
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // The replicas the partition is being reassigned to
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    // Keep only the reassigned replicas whose broker is live and that are in the current ISR
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
                                                                             currentLeaderAndIsr.isr.contains(r))
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match {
      // A replica satisfies both conditions: make the first such replica the leader
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None => // Otherwise the reassignment fails
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}

12.4.4 PreferredReplicaPartitionLeaderSelector
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // The preferred replica is the first assigned replica
    val preferredReplica = assignedReplicas.head
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) { // The preferred replica already leads: nothing to do
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
                                                   .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Trigerring preferred replica leader election")
      // If the preferred replica is in the ISR and its broker is live, restore it as leader.
      // This path is mainly used for rebalancing leadership across brokers.
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}

12.4.5 ControlledShutdownLeaderSelector
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector
        with Logging {

  this.logIdent = "[ControlledShutdownLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val currentLeader = currentLeaderAndIsr.leader
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // Keep the assigned replicas whose broker is live or shutting down
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
    // Drop the brokers that are shutting down from the ISR
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    val newLeaderOpt = newIsr.headOption
    newLeaderOpt match {
      case Some(newLeader) => // A candidate remains in the ISR: make it the leader
        debug("Partition %s : current leader = %d, new leader = %d"
              .format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
         liveAssignedReplicas)
      case None =>
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition,
                                              controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}

12.5 KafkaController ReplicaStateMachine
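ReplicaStateMachine implements the state transitions for the replicas of a topic's partitions. A replica can be in the following states:

| State | When the replica is in this state | Valid previous states |
|---|---|---|
| NewReplica | 1. The replica has just been assigned. It is not serving yet and can only take the follower role. | NonExistentReplica |
| OnlineReplica | 1. The replica has started serving, as either leader or follower. | NewReplica/OnlineReplica/OfflineReplica |
| OfflineReplica | 1. The replica is down, for example because the broker hosting it went offline. | NewReplica, OnlineReplica |
| ReplicaDeletionStarted | 1. Deletion of the replica has started. | OfflineReplica |
| ReplicaDeletionSuccessful | 1. The replica has responded successfully to the delete request. The KafkaController still keeps the replica's information in memory at this point. | ReplicaDeletionStarted |
| ReplicaDeletionIneligible | 1. Deletion of the replica failed. | ReplicaDeletionStarted |
| NonExistentReplica | 1. The replica's information has been removed from the KafkaController's memory. | ReplicaDeletionSuccessful |

The replica state transitions proceed as follows: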
| State transition | What happens |
|---|---|
| NonExistentReplica -> NewReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the new replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica -> OnlineReplica | 1. The KafkaController adds the new replica to the assigned replicas (AR) if it is not already there. In practice this transition is very quick and the NewReplica state is short-lived; it happens in onNewPartitionCreation. |
| OnlineReplica, OfflineReplica -> OnlineReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica | 1. The KafkaController sends a StopReplicaRequest to the replica (without deletion). 2. The KafkaController removes this replica from the ISR and sends a LeaderAndIsr request (with the new ISR) to the leader replica, and an UpdateMetadata request for the partition to every live broker. |
| OfflineReplica -> ReplicaDeletionStarted | 1. The KafkaController sends a StopReplicaRequest to the replica (with deletion). |
| ReplicaDeletionStarted -> ReplicaDeletionSuccessful | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionStarted -> ReplicaDeletionIneligible | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionSuccessful -> NonExistentReplica | 1. The KafkaController removes the replica from the in-memory partition replica assignment cache. |
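Taken together, the transitions above describe a full replica lifecycle, including the deletion path. A minimal sketch that checks a whole sequence of states against the allowed transitions follows. The lists are transcribed from the assertValidPreviousStates calls in the ReplicaStateMachine code in this article, which also allow ReplicaDeletionIneligible -> OnlineReplica. ReplicaLifecycle, validPreviousStates and firstIllegalStep are hypothetical names, not Kafka APIs.

```scala
object ReplicaLifecycle {
  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Target state -> states it may be entered from (hypothetical helper).
  val validPreviousStates: Map[ReplicaState, List[ReplicaState]] =
    Map[ReplicaState, List[ReplicaState]](
      NewReplica                -> List(NonExistentReplica),
      OnlineReplica             -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      OfflineReplica            -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      ReplicaDeletionStarted    -> List(OfflineReplica),
      ReplicaDeletionSuccessful -> List(ReplicaDeletionStarted),
      ReplicaDeletionIneligible -> List(ReplicaDeletionStarted),
      NonExistentReplica        -> List(ReplicaDeletionSuccessful)
    )

  // Returns the first (from, to) pair in the path that is not allowed,
  // or None when every consecutive step is a legal transition.
  def firstIllegalStep(path: List[ReplicaState]): Option[(ReplicaState, ReplicaState)] =
    path.zip(path.drop(1)).find { case (from, to) =>
      !validPreviousStates.getOrElse(to, Nil).contains(from)
    }
}
```

A replica cannot jump from OnlineReplica straight to ReplicaDeletionStarted: it must be taken offline first, which is why `firstIllegalStep(List(OnlineReplica, ReplicaDeletionStarted))` reports that pair.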
The function to focus on is therefore handleStateChange in ReplicaStateMachine:

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                          "to %s failed because replica state machine has not started")
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      case NewReplica => // Reached through the controller's onNewPartitionCreation callback right after a client creates a topic
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              // A NewReplica cannot be the partition's leader; only an online replica can lead
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // Build the LeaderAndIsrRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                topic, partition, leaderIsrAndControllerEpoch,
                                                                replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // Set the state to NewReplica
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                          targetState))
      case ReplicaDeletionStarted =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        // Set the state to ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Build the StopReplicaRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch.
        // When the response arrives, deleteTopicStopReplicaCallback in TopicDeletionManager is invoked: replicas that
        // were deleted move to ReplicaDeletionSuccessful, replicas that failed move to ReplicaDeletionIneligible.
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionIneligible =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Set the state to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionSuccessful =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // Set the state to ReplicaDeletionSuccessful
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case NonExistentReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // Remove the replica from the partition's replica assignment
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // Drop the replica's state
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case OnlineReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          case NewReplica => // Almost nothing to do
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if(!currentAssignedReplicas.contains(replicaId)) // Add the replica to the assignment if it is missing
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                              targetState))
          case _ => // The replica may already have existed: send it the current leader and ISR
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                // Set the state to OnlineReplica (this looks redundant, since it is set again below)
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None =>
            }
        }
        // Set the state to OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica =>
        // Check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // Build the StopReplicaRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              // Remove this replica from the ISR
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // The partition's ISR has shrunk, so the other replicas must be notified
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  // Set the state to OfflineReplica
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty) // Leader and ISR information must exist
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
            .format(replicaId, topicAndPartition))
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
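Two branches of this function edit the controller's replica assignment cache directly: NewReplica -> OnlineReplica appends the replica if it is missing, and the move to NonExistentReplica filters it out. A minimal sketch of those two updates in isolation is given here, assuming a plain mutable map in place of controllerContext.partitionReplicaAssignment. AssignmentCacheSketch, AssignmentCache and its method names are hypothetical; TopicAndPartition is redefined locally so the block is self-contained.

```scala
import scala.collection.mutable

object AssignmentCacheSketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  // Stand-in for the controller's in-memory partition -> replicas cache.
  final class AssignmentCache {
    private val assignment = mutable.Map.empty[TopicAndPartition, Seq[Int]]

    def replicasFor(tp: TopicAndPartition): Seq[Int] =
      assignment.getOrElse(tp, Seq.empty)

    // NewReplica -> OnlineReplica: append only when the replica is not assigned yet.
    def addIfMissing(tp: TopicAndPartition, replicaId: Int): Unit = {
      val current = replicasFor(tp)
      if (!current.contains(replicaId)) assignment(tp) = current :+ replicaId
    }

    // ReplicaDeletionSuccessful -> NonExistentReplica: drop the replica from the assignment.
    def remove(tp: TopicAndPartition, replicaId: Int): Unit =
      assignment(tp) = replicasFor(tp).filterNot(_ == replicaId)
  }
}
```

Because addIfMissing is idempotent, replaying the OnlineReplica transition for an already-assigned replica leaves the cache unchanged, which matches the "add only if absent" check in the code above.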
Kafka Source Code Analysis, Part 12: KafkaController (middle part)
Original article: http://blog.csdn.net/wl044090432/article/details/51119060