Kafka控制器选举流程剖析

栏目: 编程工具 · 发布时间: 6年前

内容简介:第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;其他的
1.概述

平时在使用Kafka的时候,可能关注的更多的是Kafka系统层面的。今天来给大家剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。

2.内容

Kafka控制器,其实就是一个Kafka系统的Broker。它除了具有一般Broker的功能之外,还具有选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。

如果在Kafka系统运行过程中,当前的控制器出现故障导致不可用,那么Kafka系统会从其他正常运行的Broker中重新选举出新的控制器。

2.1 控制器启动顺序

在Kafka集群中,每个Broker在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点,步骤如下:

第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;

其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;

其他的代理节点,会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。

2.2 如何查看控制器优先级 ?

控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。之后,可以在Zookeeper系统中查看/controller临时节点的内容,例如:

进入Zookeeper集群

[hadoop@dn1 bin]$ zkCli.sh -server dn1:2181

执行查看命令

[zk: dn1:2181(CONNECTED) 1] get /controller

成功执行命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器,如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

当前启动顺序为:dn1、dn2、dn3,修改启动顺序为:dn3、dn1、dn2。再次查看Zookeeper系统中执行“get /controller”命令,输出结果如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

2.3 切换控制器所属的代理节点

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。

3.主题分区Leader节点的选举过程

选举控制器的核心思路是:各个代理节点公平竞争抢占Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。选举流程如下图所示:

Kafka控制器选举流程剖析

图片描述(最多50字)

当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。

KafkaController:在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数,即onControllerFailover和onControllerResignation;

ZookeeperLeaderElector:实现主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现的会话超时”这种情况,它主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑;

LeaderChangeListener:如果节点数据发送变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader函数。当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举,通过调用onResigningAsLeader函数重新选择其他正常运行的代理节点成为新的Leader;

SessionExpirationListener:当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession函数会被调用,对于Zookeeper系统中会话过期的连接,会先进行一次判断。

4.注册分区和副本状态机

Kafka系统的控制器主要负责管理主题、分区和副本。 Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。 使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。

代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。代码如下所示:

def onControllerFailover() {

if(isRunning) {

info("Broker %d starting become controller state

transition".format(config.brokerId))

readControllerEpochFromZookeeper()

incrementControllerEpoch(zkUtils.zkClient)

// 在/brokers/topics节点注册监听器

registerReassignedPartitionsListener()

registerIsrChangeNotificationListener()

registerPreferredReplicaElectionListener()

partitionStateMachine.registerListeners() // 注册分区状态机

replicaStateMachine.registerListeners() // 注册副本状态机

initializeControllerContext()

// 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

replicaStateMachine.startup()                  // 启动副本状态机
  partitionStateMachine.startup()                // 启动分区状态机
  // 在自动故障转移中为所有主题注册分区更改监听器
  controllerContext.allTopics.foreach(topic => partitionStateMachine.
          registerPartitionChangeListener(topic))
  info("Broker %d is ready to serve as the new controller with epoch %d".
          format(config.brokerId, epoch))
  maybeTriggerPartitionReassignment()
  maybeTriggerPreferredReplicaElection()
  if (config.autoLeaderRebalanceEnable) {
    info("starting the partition rebalance scheduler")
    autoRebalanceScheduler.startup()
    autoRebalanceScheduler.schedule("partition-rebalance-thread", 
        checkAndTriggerPartitionRebalance,
          5, 
          config.leaderImbalanceCheckIntervalSeconds.toLong, 
          TimeUnit.SECONDS)
  }
  deleteTopicManager.start()
}
else
  info("Controller has been shut down, aborting startup/failover")

}

主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这就会触发分区和副本状态机注册监听器。

5.总结

Kafka系统整体来说,调试还算方便。下载Kafka源代码,导入到IDE中,就可以启动整个Kafka系统了,可以通过DEBUG的方式来亲自了解控制器的执行流程。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

设计心理学

设计心理学

Donald Norman / 梅琼 / 中信出版社 / 2003-10-1 / 23.00

设计心理学,ISBN:9787800739255,作者:(美)唐纳德·A.·诺曼(Donald A. Norman)著;梅琼译;梅琼译一起来看看 《设计心理学》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具