Kafka控制器选举流程剖析

栏目: 编程工具 · 发布时间: 7年前

内容简介:第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;其他的
1.概述

平时在使用Kafka的时候,可能关注的更多的是Kafka系统层面的。今天来给大家剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。

2.内容

Kafka控制器,其实就是一个Kafka系统的Broker。它除了具有一般Broker的功能之外,还具有选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。

如果在Kafka系统运行过程中,当前的控制器出现故障导致不可用,那么Kafka系统会从其他正常运行的Broker中重新选举出新的控制器。

2.1 控制器启动顺序

在Kafka集群中,每个Broker在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点,步骤如下:

第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;

其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;

其他的代理节点,会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。

2.2 如何查看控制器优先级 ?

控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。之后,可以在Zookeeper系统中查看/controller临时节点的内容,例如:

进入Zookeeper集群

[hadoop@dn1 bin]$ zkCli.sh -server dn1:2181

执行查看命令

[zk: dn1:2181(CONNECTED) 1] get /controller

成功执行命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器,如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

当前启动顺序为:dn1、dn2、dn3,修改启动顺序为:dn3、dn1、dn2。再次查看Zookeeper系统中执行“get /controller”命令,输出结果如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

2.3 切换控制器所属的代理节点

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。

3.主题分区Leader节点的选举过程

选举控制器的核心思路是:各个代理节点公平竞争抢占Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。选举流程如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。

KafkaController:在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数,即onControllerFailover和onControllerResignation;

ZookeeperLeaderElector:实现主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现的会话超时”这种情况,它主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑;

LeaderChangeListener:如果节点数据发送变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader函数。当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举,通过调用onResigningAsLeader函数重新选择其他正常运行的代理节点成为新的Leader;

SessionExpirationListener:当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession函数会被调用,对于Zookeeper系统中会话过期的连接,会先进行一次判断。

4.注册分区和副本状态机

Kafka系统的控制器主要负责管理主题、分区和副本。 Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。 使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。

代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。代码如下所示:

def onControllerFailover() {

if(isRunning) {

info("Broker %d starting become controller state

transition".format(config.brokerId))

readControllerEpochFromZookeeper()

incrementControllerEpoch(zkUtils.zkClient)

// 在/brokers/topics节点注册监听器

registerReassignedPartitionsListener()

registerIsrChangeNotificationListener()

registerPreferredReplicaElectionListener()

partitionStateMachine.registerListeners() // 注册分区状态机

replicaStateMachine.registerListeners() // 注册副本状态机

initializeControllerContext()

// 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

replicaStateMachine.startup()                  // 启动副本状态机
  partitionStateMachine.startup()                // 启动分区状态机
  // 在自动故障转移中为所有主题注册分区更改监听器
  controllerContext.allTopics.foreach(topic => partitionStateMachine.
          registerPartitionChangeListener(topic))
  info("Broker %d is ready to serve as the new controller with epoch %d".
          format(config.brokerId, epoch))
  maybeTriggerPartitionReassignment()
  maybeTriggerPreferredReplicaElection()
  if (config.autoLeaderRebalanceEnable) {
    info("starting the partition rebalance scheduler")
    autoRebalanceScheduler.startup()
    autoRebalanceScheduler.schedule("partition-rebalance-thread", 
        checkAndTriggerPartitionRebalance,
          5, 
          config.leaderImbalanceCheckIntervalSeconds.toLong, 
          TimeUnit.SECONDS)
  }
  deleteTopicManager.start()
}
else
  info("Controller has been shut down, aborting startup/failover")

}

主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这就会触发分区和副本状态机注册监听器。

5.总结

Kafka系统整体来说,调试还算方便。下载Kafka源代码,导入到IDE中,就可以启动整个Kafka系统了,可以通过DEBUG的方式来亲自了解控制器的执行流程。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Data Mining

Web Data Mining

Bing Liu / Springer / 2006-12-28 / USD 59.95

Web mining aims to discover useful information and knowledge from the Web hyperlink structure, page contents, and usage data. Although Web mining uses many conventional data mining techniques, it is n......一起来看看 《Web Data Mining》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具