码迷,mamicode.com
首页 > 其他好文 > 详细

kafka 心跳和 reblance

时间:2019-01-17 15:17:01      阅读:1337      评论:0      收藏:0      [点我收藏+]

标签:for   dup   rtb   ddl   esc   心跳线   ext   session   apach   

kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳。

consumer 和 reblance 相关的 2 个配置参数:

参数名                --> MemberMetadata 字段
session.timeout.ms   --> MemberMetadata.sessionTimeoutMs
max.poll.interval.ms --> MemberMetadata.rebalanceTimeoutMs

 

broker 端,sessionTimeoutMs 参数

broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 reblance。

 1   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
 2     // complete current heartbeat expectation
 3     member.latestHeartbeat = time.milliseconds()
 4     val memberKey = MemberKey(member.groupId, member.memberId)
 5     heartbeatPurgatory.checkAndComplete(memberKey)
 6 
 7     // reschedule the next heartbeat expiration deadline
 8     // 计算心跳截止时刻
 9     val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
10     val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
11     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
12   } 
13   
14   // 心跳过期
15   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
16     group.inLock {
17       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
18         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
19         removeMemberAndUpdateGroup(group, member)
20       }
21     }
22   }
23   
24   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
25     member.awaitingJoinCallback != null ||
26       member.awaitingSyncCallback != null ||
27       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

 

consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数

如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 reblance

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 代码片段:

if (coordinatorUnknown()) {
    if (findCoordinatorFuture != null || lookupCoordinator().failed())
        // the immediate future check ensures that we backoff properly in the case that no
        // brokers are available to connect to.
        AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
    // the session timeout has expired without seeing a successful heartbeat, so we should
    // probably make sure the coordinator is still healthy.
    markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
    // poll again after waiting for the retry backoff in case the heartbeat failed or the
    // coordinator disconnected
    AbstractCoordinator.this.wait(retryBackoffMs);
} else {
    heartbeat.sentHeartbeat(now);

    sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            synchronized (AbstractCoordinator.this) {
                heartbeat.receiveHeartbeat(time.milliseconds());
            }
        }

        @Override
        public void onFailure(RuntimeException e) {
            synchronized (AbstractCoordinator.this) {
                if (e instanceof RebalanceInProgressException) {
                    // it is valid to continue heartbeating while the group is rebalancing. This
                    // ensures that the coordinator keeps the member in the group for as long
                    // as the duration of the rebalance timeout. If we stop sending heartbeats,
                    // however, then the session timeout may expire before we can rejoin.
                    heartbeat.receiveHeartbeat(time.milliseconds());
                } else {
                    heartbeat.failHeartbeat();

                    // wake up the thread if it‘s sleeping to reschedule the heartbeat
                    AbstractCoordinator.this.notify();
                }
            }
        }
    });
}

 

org.apache.kafka.clients.consumer.internals.Heartbeat#pollTimeoutExpired:

//maxPollInterval 即 rebalanceTimeoutMs 
public boolean pollTimeoutExpired(long now) {
    return now - lastPoll > maxPollInterval;
}

 

kafka 心跳和 reblance

标签:for   dup   rtb   ddl   esc   心跳线   ext   session   apach   

原文地址:https://www.cnblogs.com/allenwas3/p/10278998.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!