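(Based on version 0.10)
Kafka's coordinator is responsible for group management, that is, managing the members of a team (or group). Group management has to do the following:
Kafka designed a protocol for this, called the Group Management Protocol.
Clearly, what a consumer group needs to do can be achieved with the group management protocol. The coordinator and this protocol were in fact proposed and implemented in order to build a high-level consumer that does not depend on ZooKeeper. Kafka, however, abstracted the membership-management behavior of the high-level consumer, extracted the logic common to any group management feature, and designed the Group Management Protocol around it. As a result the protocol is not limited to the Kafka consumer (Kafka Connect currently uses it too) and can serve as the management protocol for other kinds of "group".
So which common group management logic does this protocol abstract? The comment on the Kafka consumer's AbstractCoordinator gives some answers.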
AbstractCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage.
From a high level, Kafka's group management protocol consists of the following sequence of actions:
- Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
- Group/Leader Selection: The coordinator select the members of the group and chooses one member as the leader.
- State Assignment: The leader collects the metadata from all the members of the group and assigns state.
- Group Stabilization: Each member receives the state assigned by the leader and begins processing.
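First, AbstractCoordinator is the client of the coordinator that lives on the broker side. Every "the coordinator" in this comment refers to that broker-side coordinator, not to AbstractCoordinator. The division of labor between AbstractCoordinator and the broker-side coordinator can be roughly inferred from the comment, which says that Kafka's group management protocol consists of the four actions listed above: group registration, group/leader selection, state assignment, and group stabilization.
Three roles are involved here: the coordinator, the group member, and the group leader.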
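The four steps and three roles can be pictured with a toy, single-process simulation. This is only a sketch: the class ToyCoordinator, its methods, and the rule that the first member to register becomes the leader are illustrative assumptions, not Kafka's broker code, and metadata and state are plain strings rather than serialized bytes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the four protocol steps; hypothetical, not Kafka's implementation. */
public class ToyCoordinator {
    // memberId -> metadata supplied at registration, kept in join order
    private final Map<String, String> members = new LinkedHashMap<>();

    /** Step 1, group registration: a member joins and supplies its metadata. */
    public void register(String memberId, String metadata) {
        members.put(memberId, metadata);
    }

    /** Step 2, leader selection (assumption: the first member to register). */
    public String leader() {
        return members.keySet().iterator().next();
    }

    /**
     * Steps 3 and 4: pass every member's metadata to the leader's assignment
     * function and return the per-member state each member would receive.
     */
    public Map<String, String> rebalance(
            Function<Map<String, String>, Map<String, String>> leaderAssign) {
        return leaderAssign.apply(new LinkedHashMap<>(members));
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.register("member-a", "topics=t0");
        coordinator.register("member-b", "topics=t0");

        // The leader's assignment logic: give every member a numbered slot.
        Map<String, String> state = coordinator.rebalance(all -> {
            Map<String, String> out = new LinkedHashMap<>();
            int slot = 0;
            for (String id : all.keySet()) {
                out.put(id, "slot-" + slot++);
            }
            return out;
        });
        System.out.println("leader=" + coordinator.leader() + " state=" + state);
    }
}
```

The point of the sketch is the split of responsibilities: the coordinator only collects metadata and relays state, while the assignment logic itself is supplied by the leader.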
This raises a few questions:
Answering these questions requires reading the code. The AbstractCoordinator comment is not finished yet; it goes on to say:
To leverage this protocol, an implementation must define the format of metadata provided by each member for group registration in metadata() and the format of the state assignment provided by the leader in performAssignment(String, String, Map) and becomes available to members in onJoinComplete(int, String, String, ByteBuffer).
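This means that an implementation of AbstractCoordinator must implement three methods: metadata(), performAssignment(String, String, Map), and onJoinComplete(int, String, String, ByteBuffer).
Starting from these three methods, we can learn some details of the Group Management Protocol.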
metadata()
protected abstract List<ProtocolMetadata> metadata();
Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
Returns:
Non-empty map of supported protocols and metadata
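This method returns the protocols supported by this group member, together with the metadata that applies to each protocol. This data is submitted to the coordinator, which takes the protocols supported by all members into account and selects one protocol common to them.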
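To make "selecting a common protocol from ordered preferences" concrete, here is a minimal sketch. It assumes a simple voting rule (each member votes for its most preferred protocol among those every member supports, and the most-voted protocol wins); the class and method names are hypothetical, and the broker's actual selection logic is not shown in this article.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of picking one protocol that every member supports. */
public class ProtocolSelectionSketch {

    /**
     * @param preferencesPerMember one preference-ordered protocol list per member
     *                             (most preferred first); must not be empty
     */
    public static String select(List<List<String>> preferencesPerMember) {
        // Candidates are the protocols that appear in every member's list.
        Set<String> candidates = new LinkedHashSet<>(preferencesPerMember.get(0));
        for (List<String> prefs : preferencesPerMember) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no protocol is supported by all members");
        }

        Map<String, Integer> votes = new HashMap<>();
        String winner = null;
        for (List<String> prefs : preferencesPerMember) {
            for (String protocol : prefs) {
                if (candidates.contains(protocol)) {
                    int count = votes.merge(protocol, 1, Integer::sum);
                    // On a tie, the protocol that reached the count first stays the winner.
                    if (winner == null || count > votes.get(winner)) {
                        winner = protocol;
                    }
                    break; // each member casts exactly one vote
                }
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<List<String>> prefs = Arrays.asList(
                Arrays.asList("range", "roundrobin"),
                Arrays.asList("range", "roundrobin"),
                Arrays.asList("roundrobin", "range"));
        System.out.println(select(prefs));
    }
}
```

In the usage example two of the three members prefer "range" and all three support it, so "range" is selected.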
Here is how ConsumerCoordinator implements metadata():
@Override
public List<ProtocolMetadata> metadata() {
    List<ProtocolMetadata> metadataList = new ArrayList<>();
    for (PartitionAssignor assignor : assignors) {
        Subscription subscription = assignor.subscription(subscriptions.subscription());
        ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
        metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
    }
    return metadataList;
}
Here the metadata the consumer supplies for every protocol is the same: the data contained in a Subscription object. Subscription is a nested class of PartitionAssignor with two fields:
class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    ...
}
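In other words, the consumer gives the coordinator two pieces of information: (1) the topics it subscribes to and (2) userData. For the consumer, userData is effectively an empty array. PartitionAssignor defines Subscription this way on purpose, though, so what is userData for? Let's look at the PartitionAssignor comment, which also helps explain where the assignors used in ConsumerCoordinator#metadata() come from.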
This interface is used to define custom partition assignment for use in org.apache.kafka.clients.consumer.KafkaConsumer. Members of the consumer group subscribe to the topics they are interested in and forward their subscriptions to a Kafka broker serving as the group coordinator. The coordinator selects one member to perform the group assignment and propagates the subscriptions of all members to it. Then assign(Cluster, Map) is called to perform the assignment and the results are forwarded back to each respective members. In some cases, it is useful to forward additional metadata to the assignor in order to make assignment decisions. For this, you can override subscription(Set) and provide custom userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation can use this user data to forward the rackId belonging to each member.
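This comment also answers some of the questions raised earlier while analyzing the AbstractCoordinator comment. It provides the following information:
I think some resource-scheduling frameworks could benefit from a custom PartitionAssignor: besides rack awareness, they could balance load better by taking into account the number of consumers placed on each machine and each machine's capacity. It could also be used to make partition assignment "sticky", so that a given consumer keeps being assigned specific partitions and can maintain local state.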
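As an illustration of what a custom userData payload might look like, the sketch below packs a rack id into a ByteBuffer and reads it back. The class name, the method names, and the length-prefixed UTF-8 layout are all assumptions made for this example; a real custom assignor is free to choose any encoding, and nothing here is part of Kafka's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical userData codec for a rack-aware assignor. */
public class RackUserDataSketch {

    /** Encodes a rack id as a 4-byte length followed by UTF-8 bytes. */
    public static ByteBuffer encodeRack(String rackId) {
        byte[] bytes = rackId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bytes.length);
        buffer.putInt(bytes.length).put(bytes);
        buffer.flip(); // switch from writing to reading, starting at position 0
        return buffer;
    }

    /** Decodes a rack id without moving the caller's buffer position. */
    public static String decodeRack(ByteBuffer userData) {
        ByteBuffer view = userData.duplicate();
        byte[] bytes = new byte[view.getInt()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer userData = encodeRack("rack-7");
        System.out.println(decodeRack(userData));
    }
}
```

In this picture each member would produce such a buffer when building its subscription, and the leader would decode every member's buffer before making assignment decisions.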
performAssignment()
protected abstract Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata)
Perform assignment for the group. This is used by the leader to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)
Parameters:
leaderId - The id of the leader (which is this member)
allMemberMetadata - Metadata from all members of the group
Returns:
A map from each member to their state assignment
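Here leaderId and allMemberMetadata are both sent to the leader by the coordinator in the JoinGroupResponse. The leader performs the assignment based on this information and sends the result back to the coordinator in a SyncGroupRequest; the coordinator then sends each member the state assigned to it.
Here is how ConsumerCoordinator implements this method: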
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                    String assignmentStrategy,
                                                    Map<String, ByteBuffer> allSubscriptions) {
    // pick the PartitionAssignor that matches the protocol selected by the coordinator
    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    // collect every topic the group subscribes to, and the topics each member subscribes to
    Set<String> allSubscribedTopics = new HashSet<>();
    Map<String, Subscription> subscriptions = new HashMap<>();
    for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
        Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
        subscriptions.put(subscriptionEntry.getKey(), subscription);
        allSubscribedTopics.addAll(subscription.topics());
    }

    // the leader will begin watching for changes to any of the topics the group is interested in,
    // which ensures that all metadata changes will eventually be seen
    // (that is, the leader watches the metadata of every topic the group subscribes to)
    this.subscriptions.groupSubscribe(allSubscribedTopics);
    metadata.setTopics(this.subscriptions.groupSubscription());

    // update metadata (if needed) and keep track of the metadata used for assignment so that
    // we can check after rebalance completion whether anything has changed
    // (the metadata used for the assignment is recorded in assignmentSnapshot)
    client.ensureFreshMetadata();
    assignmentSnapshot = metadataSnapshot;

    log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
            groupId, assignor.name(), subscriptions);

    // perform the assignment. metadata.fetch() returns the current metadata; because KafkaConsumer
    // is single-threaded, it is consistent with the snapshot saved above
    Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

    log.debug("Finished assignment for group {}: {}", groupId, assignment);

    // build groupAssignment, which states which group member should consume which TopicPartition
    Map<String, ByteBuffer> groupAssignment = new HashMap<>();
    for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
        groupAssignment.put(assignmentEntry.getKey(), buffer);
    }
    return groupAssignment;
}
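There are two assignors here: RangeAssignor and RoundRobinAssignor.
Both hand out the partitions of a topic, in order, to the consumers subscribed to that topic. Below, t denotes a topic, c a consumer, and p a partition; the number after each letter is the id of the topic, partition, or consumer.
The difference between RangeAssignor and RoundRobinAssignor is whether the assignment of one topic's partitions is affected by the assignment of other topics' partitions.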
RangeAssignor
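For every topic, RangeAssignor starts assigning from consumer0. For example, if topic0 has 3 partitions and two consumers subscribe to it, consumer0 gets t0p0 and t0p1, and consumer1 gets t0p2.
If these two consumers also both subscribe to another topic with three partitions, topic1, then consumer0 additionally gets t1p0 and t1p1, and consumer1 gets t1p2. The exact algorithm is described in RangeAssignor's JavaDoc.
So RangeAssignor is unfair in some situations. In the example above, if the two consumers subscribe to more topics with three partitions each, consumer0 will always be assigned twice as many partitions as consumer1.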
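The per-topic behavior described above can be reproduced with a short sketch. It is a simplified model for one topic, written from the description in this article (consumers sorted by id, the first few consumers receive one extra partition when the division is uneven); the class and method names are hypothetical and this is not the RangeAssignor source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical range-style assignment of a single topic's partitions. */
public class RangeSketch {

    public static Map<String, List<String>> assignTopic(String topic,
                                                        int numPartitions,
                                                        List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);

        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size(); // the first `extra` consumers get one more

        Map<String, List<String>> result = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(topic + "p" + p);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        consumers.add("c0");
        consumers.add("c1");
        // Every topic is assigned independently, always starting from c0.
        System.out.println(assignTopic("t0", 3, consumers)); // {c0=[t0p0, t0p1], c1=[t0p2]}
        System.out.println(assignTopic("t1", 3, consumers)); // {c0=[t1p0, t1p1], c1=[t1p2]}
    }
}
```

Because each topic is handled in isolation, the extra partition lands on c0 every time, which is exactly the imbalance noted above.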
RoundRobinAssignor
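RoundRobinAssignor first sorts all TopicPartitions the group subscribes to: by topic first, then by partition id within a topic. The exact algorithm is described in RoundRobinAssignor's JavaDoc. For example, with two topics of three partitions each, the sorted TopicPartitions are t0p0 t0p1 t0p2 t1p0 t1p1 t1p2.
During assignment, this sorted TopicPartition list is handed out one by one to the consumers subscribed to them. For example, if c0 and c1 both subscribe to these two topics, the result is: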
t0p0 | t0p1 | t0p2 | t1p0 | t1p1 | t1p2 |
c0 | c1 | c0 | c1 | c0 | c1 |
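So c0 gets t0p0, t0p2, t1p1, and c1 gets t0p1, t1p0, t1p2.
Now suppose there are three consumers:
c0 subscribes to t0, t1, t3.
c1 subscribes to t0, t2, t4.
c2 subscribes to t0, t2, t4.
t0 has two partitions, and every other topic has only one.
Then the sorted TopicPartitions and the resulting assignment are: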
t0p0 | t0p1 | t1p0 | t2p0 | t3p0 | t4p0 |
c0 | c1 | c0 | c1 | c0 | c1 |
As you can see, c2 gets no partitions at all. So RoundRobinAssignor cannot guarantee absolute fairness either, although this is a fairly extreme example.
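The three-consumer example can be checked with a small sketch of the round-robin idea: walk the sorted partitions, and for each one advance a circular pointer over the sorted consumers until it reaches a consumer subscribed to that topic. This is a simplified model written from the description above with hypothetical names, not the RoundRobinAssignor source; topics are sorted as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical round-robin assignment across all subscribed topics. */
public class RoundRobinSketch {

    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   Map<String, List<String>> subscriptions) {
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        Map<String, List<String>> result = new TreeMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }

        int next = 0; // circular pointer shared by all topics
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            boolean subscribed = false;
            for (List<String> topics : subscriptions.values()) {
                if (topics.contains(topic)) {
                    subscribed = true;
                }
            }
            if (!subscribed) {
                continue; // nobody wants this topic; skipping avoids spinning forever
            }
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                while (!subscriptions.get(consumers.get(next)).contains(topic)) {
                    next = (next + 1) % consumers.size();
                }
                result.get(consumers.get(next)).add(topic + "p" + p);
                next = (next + 1) % consumers.size();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("t0", 2);
        for (String t : Arrays.asList("t1", "t2", "t3", "t4")) {
            partitions.put(t, 1);
        }
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("c0", Arrays.asList("t0", "t1", "t3"));
        subs.put("c1", Arrays.asList("t0", "t2", "t4"));
        subs.put("c2", Arrays.asList("t0", "t2", "t4"));
        // prints {c0=[t0p0, t1p0, t3p0], c1=[t0p1, t2p0, t4p0], c2=[]}
        System.out.println(assign(partitions, subs));
    }
}
```

The pointer always reaches c1 before c2 for the topics they share, which is why c2 ends up empty in this particular layout.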
onJoinComplete()
/**
 * Invoked when a group member has successfully joined a group.
 * @param generation The generation that was joined
 * @param memberId The identifier for the local member in the group
 * @param protocol The protocol selected by the coordinator
 * @param memberAssignment The assignment propagated from the group leader
 */
protected abstract void onJoinComplete(int generation,
                                       String memberId,
                                       String protocol,
                                       ByteBuffer memberAssignment);
ConsumerCoordinator implements it as follows:
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // if we were the assignor, then we need to make sure that there have been no metadata updates
    // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
        subscriptions.needReassignment();
        return;
    }

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // set the flag to refresh last committed offsets
    subscriptions.needRefreshCommits();

    // update partition assignment
    subscriptions.assignFromSubscribed(assignment.partitions());

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment);

    // reschedule the auto commit starting from now
    if (autoCommitEnabled)
        autoCommitTask.reschedule();

    // execute the user's callback after rebalance
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
}
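First, the leader checks whether the metadata it used for the assignment still matches the current metadata. If it does not, it flags that the assignment has to be coordinated again.
If that is not the case, it does the following, as the code shows: it looks up the assignor for the selected protocol, deserializes the assignment, flags that the last committed offsets need refreshing, updates the partition assignment, gives the assignor a chance to update its internal state through onAssignment, reschedules the auto-commit task if auto commit is enabled, and finally invokes the user's ConsumerRebalanceListener.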
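Note that all KafkaConsumer operations run on a single thread, and most of them happen inside the poll call. So in the code above, the methods
subscriptions.needReassignment() and subscriptions.needRefreshCommits()
only change the state of the subscriptions object; they do not actually perform the reassignment or the commit refresh. When KafkaConsumer executes poll, it checks the state of this subscriptions object and then performs whatever operations are required. So for these two statements in the code:
// set the flag to refresh last committed offsets
subscriptions.needRefreshCommits();
// update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());
by the time the commit refresh actually runs, the second statement, assignFromSubscribed, has already executed, so the refresh fetches the last committed offsets of all partitions assigned to this consumer.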
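The "set a flag now, act on it in poll" pattern can be sketched in a few lines. The class below is a hypothetical stand-in that only borrows the method names from the code above; it is not the real SubscriptionState or KafkaConsumer, and the "refresh" here simply records which partitions it would look up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of deferring work to the next poll() call. */
public class DeferredStateSketch {
    private boolean needsCommitRefresh;
    private final List<String> assigned = new ArrayList<>();
    private final List<String> refreshed = new ArrayList<>();

    /** Only records that a refresh is needed; does no work. */
    public void needRefreshCommits() {
        needsCommitRefresh = true;
    }

    /** Replaces the current assignment immediately. */
    public void assignFromSubscribed(List<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    /** The deferred work happens here, against whatever is assigned by now. */
    public void poll() {
        if (needsCommitRefresh) {
            refreshed.clear();
            refreshed.addAll(assigned);
            needsCommitRefresh = false;
        }
    }

    /** Partitions whose committed offsets the last refresh covered. */
    public List<String> refreshed() {
        return refreshed;
    }

    public static void main(String[] args) {
        DeferredStateSketch state = new DeferredStateSketch();
        state.needRefreshCommits();                                  // flag set first
        state.assignFromSubscribed(Arrays.asList("t0p0", "t0p1"));  // assignment updated second
        System.out.println(state.refreshed()); // still empty: nothing has run yet
        state.poll();
        System.out.println(state.refreshed()); // now covers the new assignment
    }
}
```

Because everything runs on one thread, the order of the two calls does not matter as long as both happen before the next poll.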
For the detailed behavior of the Kafka coordinator, refer to this wiki.
Original article: http://www.cnblogs.com/devos/p/5656232.html