Kafka 源码系列之 topic 创建分区分配及 leader 选举

栏目: 服务器 · 发布时间: 7年前

内容简介:Topic1

一,基本介绍

本文讲解依然是基于 kafka 源码 0.8.2.2 。假如阅读过前面的文章应该知道,用户的 admin 指令都是通过 Zookeeper 发布给 kafka Controller ,然后由 Controller 发布给具体的 Broker

Topic 的创建过程亦是如此。本文主要是关注一下几点:

1 ,分区和副本是在何处,以怎样的方式分配给 Broker

2 kafka Controller 接收到 Zookeeper 的通知后做了哪些处理。

3 ,分区的 leader follower 是如何选举的。

二,重要类介绍

1 TopicCommand

Topic 相关操作的入口类,职责:创建,修改,更新配置,删除,查看都是经由它来向 Zookeeper 发布相关策略的。

2 KafkaApis

业务处理线程要使用的对象,其 handle 方法相当于将各种请求,交由相应的处理函数进行处理。

3 KafkaController

KafkaController 作为 kafka 集群的控制者,有且存在一个 leader ,若干个 follower Leader 能够发送具体的指令给 follower ,具体指令如: RequestKeys.LeaderAndIsrKey RequestKeys.StopReplicaKey RequestKeys.UpdateMetadataKey

4 PartitionStateMachine

分区的状态机,决定者分区的当前状态及状态转移过程。

NonExistentPartition :不存在。该状态的前状态假如有的话,只能是 OfflinePartition

NewPartition: 分区创建后的状态,前状态是 NonExistentPartition 。改状态说明分区已经有副本且不存在 leader/isr

OnlinePartition: 选举过 leader 后,处于该状态,前状态可是 :OfflinePartition/NewPartition

OfflinePartition: 选举过 leader 以后, leader 挂掉,分区就会处于当前状态,前状态可能是 NewPartition/OnlinePartition

三,源码实现介绍

主要是分三个步骤:

A) ,command 创建时 Partition 均匀分布于 Broker 的策略

副本分配有两个目标:

1 ,尽可能将副本均匀分配到 Broker

2 ,每个分区的副本都分配到不同的 Broker

为了实现这个目标 kafka 采取下面两个策略:

1 ,随机选取一个 Broker 位置作为分配 Partition 的起始位置,将 Partition 的第一个副本进行轮询分配

2 ,将其它副本以一个递增的位移分配到不同的 Broker 上去

源码执行的具体过程

TopicCommand.main

if (opts. options .has(opts. createOpt ))
   createTopic(zkClient , opts)

AdminUtils . createTopic (zkClient , topic , partitions , replicas , configs)

进行 partition Replicas 的均匀分配

val replicaAssignment = AdminUtils. assignReplicasToBrokers (brokerList , partitions , replicationFactor)

具体内容是如下:

val   ret = new mutable.HashMap[Int , List[Int]]()
//随机选取一个Broker位置作为startIndex
val startIndex = if (fixedStartIndex >= 0 ) fixedStartIndex else rand .nextInt(brokerList.size)
//当前分区Id赋值为0
var currentPartitionId = if (startPartitionId >= 0 ) startPartitionId else 0

//随机选取Broker数目范围内的位移
var nextReplicaShift = if (fixedStartIndex >= 0 ) fixedStartIndex else rand .nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
   //只有在所有遍历过Broker数目个分区后才将位移加一
   if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0 ))
    nextReplicaShift += 1
   //当前分区id加上起始位置,对Brokersize取余得到第一个副本的位置
   val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
   var replicaList = List (brokerList(firstReplicaIndex))
   for (j <- 0 until replicationFactor - 1 )
     //计算出每个副本的位置 计算方法是replicaIndex:
    //val shift = 1 + (nextReplicaShift + j) % ( brokerList.size - 1)
    //(firstReplicaIndex + shift) %  brokerList.size
     replicaList ::= brokerList( replicaIndex (firstReplicaIndex , nextReplicaShift , j , brokerList.size))
  ret.put(currentPartitionId , replicaList.reverse)
   //分区id加一
   currentPartitionId = currentPartitionId + 1
}
ret.toMap

将配置和分配策略写到 Zookeeper 上去

AdminUtils. createOrUpdateTopicPartitionAssignmentPathInZK (zkClient , topic , replicaAssignment , topicConfig)

该方法的具体内容如下:

写配置, Zookeeper 的目录是: /config/topics/TopicName

writeTopicConfig (zkClient , topic , config)

写分配策略, Zookeeper 的目录是: /brokers/topics/TopicName

writeTopicPartitionAssignment (zkClient , topic , partitionReplicaAssignment , update)

B) ,kafka Controller 监听到 topic 创建事件后的处理

KafkaController PartitionStateMachine 对象内部有一个 Zookeeper listener 专门监听新增 topic 事件。 TopicChangeListener

获取新增 topic

val newTopics = currentChildren -- controllerContext . allTopics

获取分区副本分配策略 HashMap[TopicAndPartition, Seq[Int]]

val   addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient , newTopics.toSeq)

进入具体的操作

if (newTopics.size > 0 )
   //进入具体的操作
   controller.onNewTopicCreation(newTopics , addedPartitionReplicaAssignment.keySet.toSet)

订阅新增 topic 的分区变动事件

// subscribe to partition changes 注册指定topic的分区变动事件监听器
topics.foreach(topic => partitionStateMachine .registerPartitionChangeListener(topic))

处理新增分区 onNewPartitionCreation

该方法主要做两件事:

1 将新建分区的状态转化为 NewPartition 状态

partitionStateMachine .handleStateChanges(newPartitions , NewPartition)

进入处理函数得到

partitions .foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic , topicAndPartition.partition , targetState , leaderSelector , callbacks)
}

case   NewPartition =>
   //指定TopicAndPartition 获取副本
   assignReplicasToPartitions(topic , partition)
   partitionState .put(topicAndPartition , NewPartition)
   val assignedReplicas = controllerContext . partitionReplicaAssignment (topicAndPartition).mkString( "," )

AssgnReplicasToPartition 方法的具体内容,主要是先获取分区所在的 Brokerid 序列,然后

val assignedReplicas = ZkUtils. getReplicasForPartition ( controllerContext .zkClient , topic , partition)
controllerContext . partitionReplicaAssignment += TopicAndPartition (topic , partition) -> assignedReplicas

2 将新建分区的状态从 NewPartition OnlinePartition 状态

partitionStateMachine .handleStateChanges(newPartitions , OnlinePartition , offlinePartitionSelector )

handleStateChange ,中具体处理是

case   OnlinePartition =>
  assertValidPreviousStates(topicAndPartition , List (NewPartition , OnlinePartition , OfflinePartition) , OnlinePartition)
   partitionState (topicAndPartition) match {
     case NewPartition =>
       // initialize leader and isr path for new partition
       initializeLeaderAndIsrForPartition(topicAndPartition)

initializeLeaderAndIsrForPartition. 第一个 seq 中的 Broker 当做 leader

val   leader = liveAssignedReplicas.head   //第一个副本作为leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch( new LeaderAndIsr(leader , liveAssignedReplicas.toList) ,
   controller.epoch)

更新具体分区的状态信息

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state
//        {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}
           ZkUtils. createPersistentPath ( controllerContext .zkClient ,
             ZkUtils. getTopicPartitionLeaderAndIsrPath (topicAndPartition.topic , topicAndPartition.partition) ,
             ZkUtils. leaderAndIsrZkData (leaderIsrAndControllerEpoch.leaderAndIsr , controller.epoch))

topic 分区 副本 放入 leaderAndIsrRequestMap ,以便我们可以通过 Brokerid 找到

brokerRequestBatch .addLeaderAndIsrRequestForBrokers(liveAssignedReplicas , topicAndPartition.topic ,
   topicAndPartition.partition , leaderIsrAndControllerEpoch , replicaAssignment)

发信息给需要的 BrokerID

leaderAndIsrRequestMap .foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = controllerContext .liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
       val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos , leaders , controllerId , controllerEpoch , correlationId , clientId )
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
         stateChangeLogger .trace(( "Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
                                  "for partition [%s,%d]" ).format( controllerId , controllerEpoch , typeOfRequest ,
                                                                  p._2.leaderIsrAndControllerEpoch , correlationId , broker ,
                                                                  p._1._1 , p._1._2))
      }
//      给具体的Broker发送LeaderAndIsrRequest
       controller.sendRequest(broker , leaderAndIsrRequest , null )
    }

C) , Broker leader follower 的产生过程

Broker 接收到 Controller LeaderAndIsrRequest 消息后,交由 kafkaApis handle 处理

case RequestKeys. LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

当前 Broker 成为副本的 leader 或者 follower 的入口函数

val (response , error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest , offsetManager)

当前 Broker 能不能成为 Broker ,取决于 Brokerid 是否与 leader 分配的 Brokerid 一致,一致就会成为 leader ,否则 follower

val partitionsTobeLeader = partitionState
  .filter{ case (partition , partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config. brokerId }
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

真正的进入 leader 或者 follower 的过程

if   (!partitionsTobeLeader.isEmpty)
  makeLeaders(controllerId , controllerEpoch , partitionsTobeLeader , leaderAndISRRequest.correlationId , responseMap , offsetManager)
if (!partitionsToBeFollower.isEmpty)
  makeFollowers(controllerId , controllerEpoch , partitionsToBeFollower , leaderAndISRRequest.leaders , leaderAndISRRequest.correlationId , responseMap , offsetManager)

在接收到第一个 leaderisrrequest 后初始化 highwatermark 线程。这可以保证所有的分区都被填充,通过避免恶性竞争启动 Checkpointing 之前。

if   (! hwThreadInitialized ) {
  startHighWaterMarksCheckPointThread()
   hwThreadInitialized = true
}

下面具体讲解 makeleaders makeFollowers 方法

使当前 Broker 成为给定分区的 leader ,需要做以下几个处理:

* 1 ,停止掉这些分区的 fetchers

* 2 ,更新缓存的当前分区的元数据

* 3 ,将分区加入 leader 分区集合

// First stop fetchers for all the partitions
replicaFetcherManager .removeFetcherForPartitions(partitionState.keySet.map( new TopicAndPartition(_)))

// Update the partition information to be the leader
partitionState.foreach{ case (partition , partitionStateInfo) =>
  partition.makeLeader(controllerId , partitionStateInfo , correlationId , offsetManager)}

Makeleader 方法具体的操作了一个副本成为 leader 的过程:

主要做了以下几件事情:

*   记录 LeaderShip 决议的时代。在更新 isr 并维护 Zookeeperpath 的中的 Controller 时代

*   增加新的副本

*   移除已经被 Controller 移除的已分配副本

*   为新的 leader 副本构建高水位元数据

*   为远程副本重置 logendoffset

*   由于 isr 可能将为 1 ,我们需要增加高水位

具体源码如下:

def makeLeader (controllerId: Int,
                partitionStateInfo: PartitionStateInfo , correlationId: Int,
                offsetManager: OffsetManager): Boolean = {
   inWriteLock ( leaderIsrUpdateLock ) {
     val allReplicas = partitionStateInfo.allReplicas
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
     // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
     controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
     // add replicas that are new
     allReplicas.foreach(replica => getOrCreateReplica(replica))
     val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
     // remove assigned replicas that have been removed by the controller
     (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
     inSyncReplicas = newInSyncReplicas
     leaderEpoch = leaderAndIsr.leaderEpoch
     zkVersion = leaderAndIsr.zkVersion
     leaderReplicaIdOpt = Some ( localBrokerId )
     // construct the high watermark metadata for the new leader replica
     val newLeaderReplica = getReplica().get
    newLeaderReplica.convertHWToLocalOffsetMetadata()
     // reset log end offset for remote replicas
     assignedReplicas.foreach(r => if (r.brokerId != localBrokerId ) r.logEndOffset = LogOffsetMetadata. UnknownOffsetMetadata )
     // we may need to increment high watermark since ISR could be down to 1
     maybeIncrementLeaderHW(newLeaderReplica)
     if (topic == OffsetManager. OffsetsTopicName )
      offsetManager.loadOffsetsFromLog(partitionId)
     true
   }
}

当前 Broker 成为给定分区的 follower 要做要做以下几个处理:

* 1 ,将分区从 leader partition 集合中移除

* 2 ,将副本标记为 follower ,目的是不让生产者继续往该副本生产消息

* 3 ,停止掉该分区的所有 fetcher ,目的是不让副本 fetcher 线程往该副本写数据。

* 4 ,清空当前分区的 log Checkpoint offsets

* 5 ,假如 Broker 没有挂掉,增加从新 leader 获取数据的副本 fetcher 线程

具体代码如下:

将分区从 leader partition 集合中移除

将副本标记为 follower ,目的是不让生产者继续往该副本生产消息

partitionState.foreach{ case ( partition , partitionStateInfo) =>
   val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
   val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
  leaders.find(_.id == newLeaderBrokerId) match {
     // Only change partition state when the leader is available
     case Some (leaderBroker) =>
       if ( partition .makeFollower(controllerId , partitionStateInfo , correlationId , offsetManager))
        partitionsToMakeFollower += partition

当前分区的 log Checkpoint offsets

replicaFetcherManager .removeFetcherForPartitions( partitionsToMakeFollower .map( new TopicAndPartition(_)))

清空当前分区的 log Checkpoint offsets

logManager .truncateTo(partitionsToMakeFollower.map(partition => ( new TopicAndPartition(partition) , partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)

假如 Broker 没有挂掉,增加从新 leader 获取数据的副本 fetcher 线程

val   partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
   new TopicAndPartition(partition) -> BrokerAndInitialOffset(
    leaders.find(_.id == partition.leaderReplicaIdOpt.get).get ,
     partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager .addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

具体的 makeFollower 方法中

通过设置 leader ISR 为空,使本地副本成为 Follower

主要做了以下几件事情:

*   记录 LeaderShip 决议的时代。在更新 isr 并维护 Zookeeperpath 的中的 Controller 时代

*   增加新的副本

*   移除已经被 Controller 移除的已分配副本

val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r))
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set. empty [Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion

leaderReplicaIdOpt .foreach { leaderReplica =>
   if (topic == OffsetManager. OffsetsTopicName &&
      /* if we are making a leader->follower transition */
      leaderReplica == localBrokerId )
    offsetManager.clearOffsetsInPartition(partitionId)
}

if ( leaderReplicaIdOpt .isDefined && leaderReplicaIdOpt .get == newLeaderBrokerId) {
   false
}
else {
   leaderReplicaIdOpt = Some (newLeaderBrokerId)
   true
}

四,总结

本文主要是以 topic 的创建过程,讲解分区及副本在集群 Broker 上的分布的实现,顺便讲解新建 topic 的话分区的 leader 的选举方法,及我们的副本成为 leader Follower 的要素。

这个过程实际上也是基于 Zookeeper 实现了订阅发布系统,发布者是 TopicCommand 类,订阅者是 kafka Controller 类。再由 kafka Controller 进行分区 leader 选举 ( 副本列表第一个 ) ,然后给 TopicCommand 已经指定的各个 Broker Follower 发送 LeaderAndIsrRequest ,由根据我们 TopicCommand 中分区的分配的具体 Broker 去启动副本为 leader(leader 的被分配的 Brokerid 和当前 Broker id 相等 ) 或者 Follower

假如,你对kafka,hbase,spark源码感兴趣,请关注浪尖公众号。

Kafka 源码系列之 topic 创建分区分配及 leader 选举


以上所述就是小编给大家介绍的《Kafka 源码系列之 topic 创建分区分配及 leader 选举》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

More Eric Meyer on CSS (Voices That Matter)

More Eric Meyer on CSS (Voices That Matter)

Eric A. Meyer / New Riders Press / 2004-04-08 / USD 45.00

Ready to commit to using more CSS on your sites? If you are a hands-on learner who has been toying with CSS and want to experiment with real-world projects that will enable you to see how CSS......一起来看看 《More Eric Meyer on CSS (Voices That Matter)》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具