标签:time 线程 通信 request请求 位置 选择 避免 技术 sync
假设某 topic 有4个分区,消费者组中只有一个消费者,那么这个消费者将消费全部 partition 中的数据。
如果消费者组中有两个消费者,那么每个消费者消费两个 partition。
如果消费者组中有4个消费者,那么每个消费者消费一个partition。
如果消费者组中有5个消费者,那么有一个消费者就是空闲的。
注意:在同一个消费者组中,不要让消费者的数量大于分区的数量
多个消费者组之间不会互相影响。
在 kafka-0.10 版本,Kafka 在服务端引入了组协调器(GroupCoordinator),每个 Kafka Server 启动时都会创建一个 GroupCoordinator 实例,用于管理部分消费者组和该消费者组下的每个消费者的消费偏移量。
在客户端引入了消费者协调器(ConsumerCoordinator),实例化一个消费者就会实例化一个 ConsumerCoordinator 对象,ConsumerCoordinator 负责同一个消费者组下各消费者与服务端的 GroupCoordinator 进行通信。
ConsumerCoordinator 定义的位置:
public class KafkaConsumer<K, V> implements Consumer<K, V> { private final ConsumerCoordinator coordinator; }
ConsumerCoordinator 是 KafkaConsumer 的一个私有的成员变量,因此 ConsumerCoordinator 中存储的信息也只有与之对应的消费者可见,不同消费者之间是看不到彼此的 ConsumerCoordinator 中的信息的
ConsumerCoordinator 的作用:
ConsumerCoordinator 实现上述功能的组件是 ConsumerCoordinator 类的私有成员或者是其父类的私有成员:
1 public final class ConsumerCoordinator extends AbstractCoordinator { 2 private final List<PartitionAssignor> assignors; 3 private final OffsetCommitCallback defaultOffsetCommitCallback; 4 private final SubscriptionState subscriptions; 5 private final ConsumerInterceptors<?, ?> interceptors; 6 private boolean isLeader = false; 7 private MetadataSnapshot metadataSnapshot; 8 private MetadataSnapshot assignmentSnapshot; 9 10 省略了部分代码.... 11 } 12 13 14 public abstract class AbstractCoordinator implements Closeable { 15 private enum MemberState { 16 UNJOINED, // the client is not part of a group 17 REBALANCING, // the client has begun rebalancing 18 STABLE, // the client has joined and is sending heartbeats 19 } 20 21 private final Heartbeat heartbeat; 22 protected final ConsumerNetworkClient client; 23 private HeartbeatThread heartbeatThread = null; 24 private MemberState state = MemberState.UNJOINED; 25 private RequestFuture<ByteBuffer> joinFuture = null; 26 27 省略了部分代码.... 28 }
GroupCoordinator 的作用:
GroupCoordinator 依赖的组件及其作用
消费者协调器通过和组协调器发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询获取消息或提交偏移量时发送心跳。
如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为它已经死亡,就会触发一次再均衡。
在 0.10 版本里,心跳任务由一个独立的心跳线程来执行,可以在轮询获取消息的空档发送心跳。这样一来,发送心跳的频率(也就是组协调器群检测消费者运行状态的时间)与消息轮询的频率(由处理消息所花费的时间来确定)之间就是相互独立的。在0.10 版本的 Kafka 里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livelock),比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配置与 session.timeout.ms 是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。
发生分区再均衡的3种情况:
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡。
再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为leader消费者。leader消费者从组协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。
每个消费者的消费者协调器在向组协调器请求加入组时,都会把自己支持的分区分配策略报告给组协调器(轮询或者是按跨度分配或者其他),组协调器选出该消费组下所有消费者都支持的的分区分配策略发送给leader消费者,leader消费者根据这个分区分配策略进行分配。
完毕之后,leader消费者把分配情况列表发送给组协调器,消费者协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有leader消费者知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
标签:time 线程 通信 request请求 位置 选择 避免 技术 sync
原文地址:https://www.cnblogs.com/hyunbar/p/12527014.html