jerry · 2019-05-10 16:23:54 · 热度: 202

kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳。

consumer 和 reblance 相关的 2 个配置参数:

参数名                --> MemberMetadata 字段
session.timeout.ms   --> MemberMetadata.sessionTimeoutMs
max.poll.interval.ms --> MemberMetadata.rebalanceTimeoutMs

broker 端,sessionTimeoutMs 参数

broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 reblance。

private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)

    // reschedule the next heartbeat expiration deadline
    // 计算心跳截止时刻
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))

  // 心跳过期
  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group.inLock {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member)

  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
    member.awaitingJoinCallback != null ||
      member.awaitingSyncCallback != null ||
      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数

如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 reblance

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 代码片段:

if (coordinatorUnknown()) {
    if (findCoordinatorFuture != null || lookupCoordinator().failed())
        // the immediate future check ensures that we backoff properly in the case that no
        // brokers are available to connect to.
} else if (heartbeat.sessionTimeoutExpired(now)) {
    // the session timeout has expired without seeing a successful heartbeat, so we should
    // probably make sure the coordinator is still healthy.
} else if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll(), so we explicitly leave the group.
} else if (!heartbeat.shouldHeartbeat(now)) {
    // poll again after waiting for the retry backoff in case the heartbeat failed or the
    // coordinator disconnected
} else {

    sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
        public void onSuccess(Void value) {
            synchronized (AbstractCoordinator.this) {

        public void onFailure(RuntimeException e) {
            synchronized (AbstractCoordinator.this) {
                if (e instanceof RebalanceInProgressException) {
                    // it is valid to continue heartbeating while the group is rebalancing. This
                    // ensures that the coordinator keeps the member in the group for as long
                    // as the duration of the rebalance timeout. If we stop sending heartbeats,
                    // however, then the session timeout may expire before we can rejoin.
                } else {

                    // wake up the thread if it's sleeping to reschedule the heartbeat


//maxPollInterval 即 rebalanceTimeoutMs 
public boolean pollTimeoutExpired(long now) {
    return now - lastPoll > maxPollInterval;

