Kafka Source Code Series: Partition Assignment and Leader Election During Topic Creation

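This article is again based on the Kafka source code. If you have read the previous articles in this series, you will know that user admin commands are published to the Kafka Controller through Zookeeper, and the Controller then dispatches them to the specific Brokers.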

Topic creation works the same way. This article focuses on the following points:

1, where and how partitions and replicas are assigned to Brokers;

2, what the Kafka Controller does after being notified by Zookeeper;

3, how the leader and followers of a partition are elected.


1 TopicCommand

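The entry class for topic operations. Creating, altering, updating the configuration of, deleting and listing topics all go through it, and it publishes the corresponding changes to Zookeeper.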

2 KafkaApis

The object used by the request-handler threads. Its handle method dispatches each kind of request to the corresponding handler function.

3 KafkaController

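KafkaController acts as the controller of the Kafka cluster. There is exactly one leader (the active controller) and several followers. The leader sends concrete commands to the followers, such as RequestKeys.LeaderAndIsrKey, RequestKeys.StopReplicaKey and RequestKeys.UpdateMetadataKey.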

4 PartitionStateMachine

The partition state machine. A partition is always in one of the following four states:
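NonExistentPartition: the partition does not exist. If it has a previous state at all, that state can only be OfflinePartition.

NewPartition: the state right after the partition is created; the previous state is NonExistentPartition. In this state the partition already has replicas assigned but no leader/ISR yet.

OnlinePartition: the state after a leader has been elected; the previous state can be OfflinePartition/NewPartition.

OfflinePartition: if the leader dies after having been elected, the partition moves into this state; the previous state can be NewPartition/OnlinePartition.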
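The four states and their legal predecessors fit in a few lines. The sketch below is a hypothetical model, not Kafka's PartitionStateMachine. It encodes only the predecessor rules listed above, plus OnlinePartition → OnlinePartition, which the assertValidPreviousStates call in handleStateChange also accepts, and it rejects every other transition.

```scala
// Hypothetical model of the partition states; names mirror Kafka's, the code does not.
sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// For each target state, the states a partition may come from.
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NonExistentPartition -> Set[PartitionState](OfflinePartition),
  NewPartition         -> Set[PartitionState](NonExistentPartition),
  // handleStateChange also accepts OnlinePartition -> OnlinePartition
  OnlinePartition      -> Set[PartitionState](NewPartition, OfflinePartition, OnlinePartition),
  OfflinePartition     -> Set[PartitionState](NewPartition, OnlinePartition)
)

// Move to `target`, failing with IllegalArgumentException on an illegal transition.
def transition(current: PartitionState, target: PartitionState): PartitionState = {
  require(validPreviousStates(target).contains(current),
    s"Illegal transition from $current to $target")
  target
}

// Life of a new topic's partition: created, elected, leader lost, re-elected.
val lifecycle: Seq[PartitionState] = Seq(NewPartition, OnlinePartition, OfflinePartition, OnlinePartition)
val finalState = lifecycle.foldLeft(NonExistentPartition: PartitionState)(transition)
println(finalState)
```

A new topic's partitions take exactly the NonExistentPartition → NewPartition → OnlinePartition path that the two handleStateChanges calls in part B perform.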



A) How the topic command spreads partitions evenly across Brokers at creation time

The assignment has two goals:
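1, spread the replicas as evenly as possible across the Brokers;

2, place the replicas of each partition on different Brokers.

To achieve these goals, Kafka uses two strategies:

1, pick a random Broker position as the starting point and assign the first replica of each partition round-robin from there;

2, place the remaining replicas on other Brokers using an increasing shift.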


TopicCommand dispatches the create option to createTopic:

if (opts.options.has(opts.createOpt))
  createTopic(zkClient, opts)

which in turn calls:

AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)

It first distributes the partition replicas evenly across the Brokers:

val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)

The core of assignReplicasToBrokers:

val ret = new mutable.HashMap[Int, List[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0

var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
  // after each full pass over the brokers, increase the shift used for the remaining replicas
  if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
    nextReplicaShift += 1
  // the first replica is assigned round-robin, starting at startIndex
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
  var replicaList = List(brokerList(firstReplicaIndex))
  // position of each remaining replica; replicaIndex computes:
  //   val shift = 1 + (nextReplicaShift + j) % (brokerList.size - 1)
  //   (firstReplicaIndex + shift) % brokerList.size
  for (j <- 0 until replicationFactor - 1)
    replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
  ret.put(currentPartitionId, replicaList.reverse)
  currentPartitionId = currentPartitionId + 1
}
ret.toMap
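Reproducing the loop as a standalone function shows the two strategies at work. This is a sketch, not the AdminUtils code itself. The random start index and initial shift become parameters so the result is deterministic, and the argument checks are reduced to a single require.

```scala
// Same formula as the replicaIndex comment above: shift is in 1..n-1,
// so a follower never lands on the broker holding the first replica.
def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, j: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + j) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

// startIndex and initialShift replace rand.nextInt(brokerList.size) so the output is reproducible.
def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
                            startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
  require(nPartitions > 0 && replicationFactor > 0 && replicationFactor <= brokerList.size)
  val n = brokerList.size
  var nextReplicaShift = initialShift
  (0 until nPartitions).map { partitionId =>
    // after every full round over the brokers, push the followers one step further
    if (partitionId > 0 && partitionId % n == 0) nextReplicaShift += 1
    val firstReplicaIndex = (partitionId + startIndex) % n
    val followers = (0 until replicationFactor - 1).map { j =>
      brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, n))
    }
    partitionId -> (brokerList(firstReplicaIndex) +: followers)
  }.toMap
}

val assignment = assignReplicasToBrokers(Seq(0, 1, 2, 3, 4), nPartitions = 10,
  replicationFactor = 3, startIndex = 0, initialShift = 0)
(0 until 10).foreach(p => println(s"p$p -> ${assignment(p).mkString(",")}"))
```

With 5 brokers, 10 partitions and a replication factor of 3, partitions p0 to p4 get [0,1,2], [1,2,3], [2,3,4], [3,4,0] and [4,0,1]. From p5 on, the shift has grown by one, so p5 gets [0,2,3] instead of repeating [0,1,2]. Each broker leads exactly two partitions, and no partition has two replicas on the same broker.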

The configuration and the assignment are then written to Zookeeper:

AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)


The configuration is written to the Zookeeper path /config/topics/TopicName:

writeTopicConfig(zkClient, topic, config)

The assignment is written to the Zookeeper path /brokers/topics/TopicName:

writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
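The following sketch shows roughly what the assignment written to /brokers/topics/TopicName looks like. It assumes the JSON layout {"version":1,"partitions":{...}}, which maps each partition id to its replica list. topicAssignmentJson is a hypothetical helper that builds the string by hand. Kafka uses its own JSON encoder, so the field order and whitespace in a real znode may differ.

```scala
// Hypothetical helper: renders partition id -> replica list in the assumed znode layout.
def topicAssignmentJson(assignment: Map[Int, Seq[Int]]): String = {
  val partitions = assignment.toSeq.sortBy(_._1).map { case (partition, replicas) =>
    "\"" + partition + "\":[" + replicas.mkString(",") + "]"
  }
  "{\"version\":1,\"partitions\":{" + partitions.mkString(",") + "}}"
}

println(topicAssignmentJson(Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3))))
```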

B) What the Kafka Controller does after detecting a topic-creation event

Inside KafkaController, the PartitionStateMachine object registers a Zookeeper listener, TopicChangeListener, that watches specifically for newly created topics.

Get the newly added topics:

val newTopics = currentChildren -- controllerContext.allTopics

Get the replica assignment of their partitions, a HashMap[TopicAndPartition, Seq[Int]]:

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

If any topics were added, they are handed to the controller:

if (newTopics.size > 0)
  controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
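New-topic detection comes down to a set difference between the children of /brokers/topics and the topics the controller already knows, followed by a lookup of those topics' assignments. The sketch models this with plain collections. onTopicChange, zkAssignments and the local TopicAndPartition case class are hypothetical stand-ins for the listener and its Zookeeper reads.

```scala
// Hypothetical stand-in for Kafka's TopicAndPartition.
case class TopicAndPartition(topic: String, partition: Int)

// zkAssignments models the data under /brokers/topics: topic -> (partition -> replicas).
def onTopicChange(currentChildren: Set[String], knownTopics: Set[String],
                  zkAssignments: Map[String, Map[Int, Seq[Int]]])
    : (Set[String], Map[TopicAndPartition, Seq[Int]]) = {
  // topics present in Zookeeper but not yet known to the controller
  val newTopics = currentChildren -- knownTopics
  // replica assignment of every partition of the new topics only
  val added = for {
    topic <- newTopics.toSeq
    (partition, replicas) <- zkAssignments.getOrElse(topic, Map.empty[Int, Seq[Int]])
  } yield TopicAndPartition(topic, partition) -> replicas
  (newTopics, added.toMap)
}

val (topics, added) = onTopicChange(
  currentChildren = Set("a", "b"),
  knownTopics = Set("a"),
  zkAssignments = Map("a" -> Map(0 -> Seq(1)), "b" -> Map(0 -> Seq(1, 2), 1 -> Seq(2, 1))))
if (topics.nonEmpty) println(s"new topics: $topics, partitions: ${added.keySet}")
```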

Subscribe to partition-change events of the new topics:

// subscribe to partition changes: register a partition-change listener for each given topic
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))

Then the new partitions are handled in onNewPartitionCreation:


1, move the newly created partitions to the NewPartition state:

partitionStateMachine.handleStateChanges(newPartitions, NewPartition)

handleStateChanges calls handleStateChange for each partition:

partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}

For the NewPartition target state, handleStateChange does the following:

case NewPartition =>
  // fetch the replicas assigned to the given TopicAndPartition
  assignReplicasToPartitions(topic, partition)
  partitionState.put(topicAndPartition, NewPartition)
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")

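The assignReplicasToPartitions method first reads from Zookeeper the sequence of Broker ids that hold the partition's replicas, and then caches it in controllerContext.partitionReplicaAssignment: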

val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
controllerContext.partitionReplicaAssignment += TopicAndPartition(topic, partition) -> assignedReplicas

2, move the new partitions from the NewPartition state to the OnlinePartition state:

partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)

Inside handleStateChange, this is handled as follows:

case OnlinePartition =>
  assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  partitionState(topicAndPartition) match {
    case NewPartition =>
      // initialize leader and isr path for new partition

initializeLeaderAndIsrForPartition makes the first live Broker in the assigned replica sequence the leader:

val leader = liveAssignedReplicas.head   // the first live replica becomes the leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch( new LeaderAndIsr(leader , liveAssignedReplicas.toList) ,

The leader and ISR are then written to the partition's state path in Zookeeper. The comment shows what such a node looks like on a running cluster:

// [zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state
// {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}
ZkUtils.createPersistentPath(controllerContext.zkClient,
  ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
  ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
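The leader choice for a brand-new partition is simple enough to write as a function: keep the assigned replicas that are alive, take the first one as leader, and put all of them in the ISR. The sketch below is a hypothetical model of that rule. Two details are assumptions of the sketch: the leader epoch starts at 0, and the function fails when no assigned replica is alive. stateJson only mimics the field layout of the Zookeeper example above.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

def initLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Either[String, LeaderAndIsr] = {
  // keep assignment order, drop replicas on dead brokers
  val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
  if (liveAssignedReplicas.isEmpty) Left("no live assigned replica, cannot elect a leader")
  else Right(LeaderAndIsr(leader = liveAssignedReplicas.head, leaderEpoch = 0, isr = liveAssignedReplicas.toList))
}

// Same field order as the state znode shown above.
def stateJson(l: LeaderAndIsr, controllerEpoch: Int): String =
  s"""{"controller_epoch":$controllerEpoch,"leader":${l.leader},"version":1,"leader_epoch":${l.leaderEpoch},"isr":[${l.isr.mkString(",")}]}"""

val r = initLeaderAndIsr(Seq(7, 6, 5), liveBrokerIds = Set(6, 5))
println(r.map(stateJson(_, 6)))
```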

Add the topic, partition and replicas to leaderAndIsrRequestMap, keyed by Broker id, so that the pending request for each Broker can be looked up:

brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
  topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)

Then send the request to every Broker that needs it:

leaderAndIsrRequestMap.foreach { m =>
  val broker = m._1
  val partitionStateInfos = m._2.toMap
  val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
  val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
  val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
  for (p <- partitionStateInfos) {
    val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
    stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
                             "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                             p._2.leaderIsrAndControllerEpoch, correlationId, broker,
                                                             p._1._1, p._1._2))
  }
  // send the LeaderAndIsrRequest to this Broker
  controller.sendRequest(broker, leaderAndIsrRequest, null)
}
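Because of this batching, each broker receives a single request that covers all of its partitions. For each partition, the broker learns whether to become leader or follower by comparing its own id with the leader id. The following hypothetical model reproduces only that grouping and the become-leader/become-follower decision. PartitionLeader and requestsPerBroker are illustrative names.

```scala
// Hypothetical per-partition decision made by the controller.
case class PartitionLeader(topic: String, partition: Int, leader: Int, replicas: Seq[Int])

// Broker id -> list of (topic, partition, role) it will find in its LeaderAndIsrRequest.
def requestsPerBroker(states: Seq[PartitionLeader]): Map[Int, Seq[(String, Int, String)]] =
  states
    .flatMap { s =>
      s.replicas.map { broker =>
        val role = if (broker == s.leader) "become-leader" else "become-follower"
        (broker, (s.topic, s.partition, role))
      }
    }
    .groupBy(_._1)
    .map { case (broker, entries) => broker -> entries.map(_._2) }

val reqs = requestsPerBroker(Seq(
  PartitionLeader("t", 0, leader = 1, replicas = Seq(1, 2)),
  PartitionLeader("t", 1, leader = 2, replicas = Seq(2, 1))))
reqs.foreach { case (broker, entries) => println(s"broker $broker: $entries") }
```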

C) How leaders and followers come into being on the Brokers

When a Broker receives a LeaderAndIsrRequest from the Controller, KafkaApis.handle routes it to the matching handler:

case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

The entry point through which the current Broker becomes leader or follower for its replicas:

val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)

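Whether the current Broker becomes the leader depends on whether its broker id equals the leader's broker id assigned by the Controller. If they match, it becomes the leader; otherwise it becomes a follower.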

val partitionsTobeLeader = partitionState
  .filter { case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId }
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
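On the broker side, the same decision looks like this: partitions whose leader id equals config.brokerId go to makeLeaders, and the rest go to makeFollowers. The sketch below uses Scala's Map.partition, which does the original's filter and -- in a single pass. Keying partitions by a string such as "t-0" is a simplification for illustration, and splitRoles is a hypothetical name.

```scala
// leaderOf: partition -> leader broker id, as carried by the LeaderAndIsrRequest.
def splitRoles(leaderOf: Map[String, Int], brokerId: Int): (Map[String, Int], Map[String, Int]) =
  leaderOf.partition { case (_, leader) => leader == brokerId }

val (toLead, toFollow) = splitRoles(Map("t-0" -> 1, "t-1" -> 2, "t-2" -> 1), brokerId = 1)
println(s"lead: ${toLead.keySet}, follow: ${toFollow.keySet}")
```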

Then the actual transition to leader or follower takes place:

if (!partitionsTobeLeader.isEmpty)
  makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!partitionsToBeFollower.isEmpty)
  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)

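The high-watermark checkpoint thread is initialized only after the first LeaderAndIsrRequest has been received. This ensures that all partitions are fully populated before checkpointing starts, which avoids subtle race conditions.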

if (!hwThreadInitialized) {
  // start the high-watermark checkpoint thread (call omitted in this excerpt)
  hwThreadInitialized = true
}

Next, let us look at makeLeaders and makeFollowers in detail.

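Making the current Broker the leader of the given partitions involves the following steps:

* 1, stop the fetchers for these partitions

* 2, update the cached metadata of these partitions

* 3, add the partitions to the set of leader partitions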

// First stop fetchers for all the partitions
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))

// Update the partition information to be the leader
partitionState.foreach { case (partition, partitionStateInfo) =>
  partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager) }

The makeLeader method carries out the steps by which a replica becomes the leader:


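*   Record the epoch of the controller that made the leadership decision; when the ISR is updated, this keeps the deciding controller's epoch in the Zookeeper path

*   Add the new replicas

*   Remove assigned replicas that the Controller has removed

*   Build the high-watermark metadata for the new leader replica

*   Reset the log end offset of the remote replicas

*   Increment the high watermark if needed, since the ISR may have shrunk to 1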


def makeLeader(controllerId: Int,
               partitionStateInfo: PartitionStateInfo, correlationId: Int,
               offsetManager: OffsetManager): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.allReplicas
    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
    // add replicas that are new
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
    // remove assigned replicas that have been removed by the controller
    (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
    inSyncReplicas = newInSyncReplicas
    leaderEpoch = leaderAndIsr.leaderEpoch
    zkVersion = leaderAndIsr.zkVersion
    leaderReplicaIdOpt = Some(localBrokerId)
    // construct the high watermark metadata for the new leader replica
    val newLeaderReplica = getReplica().get
    // reset log end offset for remote replicas
    assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
    // we may need to increment high watermark since ISR could be down to 1
    if (topic == OffsetManager.OffsetsTopicName)
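Why can a shrinking ISR raise the high watermark? Assume the usual rule: the leader's high watermark is the smallest log end offset among the in-sync replicas, and it never moves backwards. Under that rule, dropping a lagging replica from the ISR removes the smallest value from the minimum, so the high watermark can advance. The sketch below illustrates the rule with a hypothetical ReplicaView type. It is not Kafka's implementation.

```scala
// Hypothetical view of a replica: broker id and its log end offset (LEO).
case class ReplicaView(brokerId: Int, logEndOffset: Long)

// New HW = min LEO over the ISR, but the HW is never allowed to go backwards.
def maybeIncrementHW(currentHW: Long, isr: Seq[ReplicaView]): Long = {
  require(isr.nonEmpty, "the ISR always contains at least the leader")
  val candidate = isr.map(_.logEndOffset).min
  math.max(currentHW, candidate)
}

// Two in-sync replicas: the slower one (LEO 7) bounds the HW.
println(maybeIncrementHW(5L, Seq(ReplicaView(1, 10L), ReplicaView(2, 7L))))
// ISR shrunk to just the leader: the HW can jump to the leader's own LEO.
println(maybeIncrementHW(7L, Seq(ReplicaView(1, 10L))))
```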

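Making the current Broker a follower of the given partitions involves the following steps:

* 1, remove the partitions from the set of leader partitions

* 2, mark the replicas as followers so that producers can no longer produce to them

* 3, stop all fetchers for these partitions so that replica fetcher threads no longer write to these replicas

* 4, truncate the logs of these partitions and checkpoint the offsets

* 5, if the Broker is not shutting down, add replica fetcher threads that fetch from the new leader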


Remove the partitions from the set of leader partitions, and mark the replicas as followers so that producers can no longer produce to them:

partitionState.foreach { case (partition, partitionStateInfo) =>
  val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
  val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
  leaders.find(_.id == newLeaderBrokerId) match {
    // Only change partition state when the leader is available
    case Some(leaderBroker) =>
      if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
        partitionsToMakeFollower += partition
    case None =>
      // leader not available: the partition is not added to partitionsToMakeFollower (handling omitted in this excerpt)
  }
}

Stop all fetchers for these partitions:

replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))

Truncate the log of each of these partitions to its high watermark:

logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)

If the Broker is not shutting down, add replica fetcher threads that fetch from the new leader:

val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
  new TopicAndPartition(partition) -> BrokerAndInitialOffset(
    leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
    // ... (initial offset argument and closing parentheses omitted in this excerpt)
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
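Per partition, the follower-side steps reduce to two operations. First, cut the local log back to the high watermark, discarding messages that may not exist on the new leader. Then fetch from the leader, starting at the truncated end of the log. The sketch assumes that fetching resumes at the local log end offset after truncation, since the excerpt above elides the exact offset argument. LocalLog, FetchTarget and becomeFollower are hypothetical.

```scala
// Hypothetical in-memory log holding offsets 0 until logEndOffset.
case class LocalLog(logEndOffset: Long) {
  // Drop everything at or beyond `offset`; truncation never extends the log.
  def truncateTo(offset: Long): LocalLog = LocalLog(math.min(logEndOffset, offset))
}

case class FetchTarget(leaderBrokerId: Int, initialOffset: Long)

// Truncate to the high watermark, then fetch from the leader at the new log end.
def becomeFollower(log: LocalLog, highWatermark: Long, leaderId: Int): (LocalLog, FetchTarget) = {
  val truncated = log.truncateTo(highWatermark)
  (truncated, FetchTarget(leaderId, truncated.logEndOffset))
}

// Messages 100..119 were never committed, so they are dropped and refetched from broker 3.
val (log, target) = becomeFollower(LocalLog(120L), highWatermark = 100L, leaderId = 3)
println(s"$log, $target")
```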

Inside the makeFollower method itself:

The local replica becomes a follower by recording the new leader and setting the ISR to empty:


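*   Record the epoch of the controller that made the leadership decision; when the ISR is updated, this keeps the deciding controller's epoch in the Zookeeper path

*   Add the new replicas

*   Remove assigned replicas that the Controller has removed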

val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r))
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion

leaderReplicaIdOpt.foreach { leaderReplica =>
  if (topic == OffsetManager.OffsetsTopicName &&
      /* if we are making a leader->follower transition */
      leaderReplica == localBrokerId)

if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
  false   // the leader is unchanged
} else {
  leaderReplicaIdOpt = Some(newLeaderBrokerId)
  true    // the leader changed
}
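Taken as a whole, makeFollower reports whether the leader actually changed. makeFollowers adds a partition to partitionsToMakeFollower, and so restarts its fetcher, only when that result is true. The sketch assumes the two branches of the final if return false and true respectively, as the caller's `if (partition.makeFollower(...))` suggests. FollowerState is a hypothetical, stripped-down stand-in for Partition.

```scala
// Hypothetical partition state holding only the field makeFollower compares.
final class FollowerState(var leaderReplicaIdOpt: Option[Int]) {
  // true when the leader changed, i.e. the caller must (re)start fetching from it
  def makeFollower(newLeaderBrokerId: Int): Boolean =
    if (leaderReplicaIdOpt.contains(newLeaderBrokerId)) false
    else {
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
}

val p = new FollowerState(None)
println(p.makeFollower(2)) // first leader assignment
println(p.makeFollower(2)) // same leader again: nothing to do
```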


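This article used topic creation to explain how partitions and replicas are distributed across the Brokers of a cluster. Along the way, it covered how the leader of each partition of a newly created topic is elected, and what determines whether a replica becomes a leader or a follower.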

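The whole process is essentially a publish/subscribe system built on Zookeeper: the publisher is the TopicCommand class and the subscriber is the Kafka Controller. The Kafka Controller elects each partition's leader (the first live replica in the replica list). It then sends a LeaderAndIsrRequest to the leader and follower Brokers that TopicCommand assigned to the partition. Each of those Brokers then starts its replica as the leader (if the assigned leader's broker id equals its own id) or as a follower.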


