内容简介:Topic1
一,基本介绍
本文讲解依然是基于 kafka 源码 0.8.2.2 。假如阅读过前面的文章应该知道,用户的 admin 指令都是通过 Zookeeper 发布给 kafka 的 Controller ,然后由 Controller 发布给具体的 Broker 。
Topic 的创建过程亦是如此。本文主要是关注一下几点:
1 ,分区和副本是在何处,以怎样的方式分配给 Broker 。
2 , kafka 的 Controller 接收到 Zookeeper 的通知后做了哪些处理。
3 ,分区的 leader 和 follower 是如何选举的。
二,重要类介绍
1 , TopicCommand
Topic 相关操作的入口类,职责:创建,修改,更新配置,删除,查看都是经由它来向 Zookeeper 发布相关策略的。
2 , KafkaApis
业务处理线程要使用的对象,其 handle 方法相当于将各种请求,交由相应的处理函数进行处理。
3 , KafkaController
KafkaController 作为 kafka 集群的控制者,有且存在一个 leader ,若干个 follower 。 Leader 能够发送具体的指令给 follower ,具体指令如: RequestKeys.LeaderAndIsrKey , RequestKeys.StopReplicaKey , RequestKeys.UpdateMetadataKey 。
4 , PartitionStateMachine
分区的状态机,决定者分区的当前状态及状态转移过程。
NonExistentPartition :不存在。该状态的前状态假如有的话,只能是 OfflinePartition
NewPartition: 分区创建后的状态,前状态是 NonExistentPartition 。改状态说明分区已经有副本且不存在 leader/isr 。
OnlinePartition: 选举过 leader 后,处于该状态,前状态可是 :OfflinePartition/NewPartition 。
OfflinePartition: 选举过 leader 以后, leader 挂掉,分区就会处于当前状态,前状态可能是 NewPartition/OnlinePartition
三,源码实现介绍
主要是分三个步骤:
A) ,command 创建时 Partition 均匀分布于 Broker 的策略
副本分配有两个目标:
1 ,尽可能将副本均匀分配到 Broker 上
2 ,每个分区的副本都分配到不同的 Broker 上
为了实现这个目标 kafka 采取下面两个策略:
1 ,随机选取一个 Broker 位置作为分配 Partition 的起始位置,将 Partition 的第一个副本进行轮询分配
2 ,将其它副本以一个递增的位移分配到不同的 Broker 上去
源码执行的具体过程
TopicCommand.main
if (opts. options .has(opts. createOpt ))
createTopic(zkClient , opts)
AdminUtils . createTopic (zkClient , topic , partitions , replicas , configs)
进行 partition 和 Replicas 的均匀分配
val replicaAssignment = AdminUtils. assignReplicasToBrokers (brokerList , partitions , replicationFactor)
具体内容是如下:
val ret = new mutable.HashMap[Int , List[Int]]()
//随机选取一个Broker位置作为startIndex
val startIndex = if (fixedStartIndex >= 0 ) fixedStartIndex else rand .nextInt(brokerList.size)
//当前分区Id赋值为0
var currentPartitionId = if (startPartitionId >= 0 ) startPartitionId else 0
//随机选取Broker数目范围内的位移
var nextReplicaShift = if (fixedStartIndex >= 0 ) fixedStartIndex else rand .nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
//只有在所有遍历过Broker数目个分区后才将位移加一
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0 ))
nextReplicaShift += 1
//当前分区id加上起始位置,对Brokersize取余得到第一个副本的位置
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List (brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1 )
//计算出每个副本的位置 计算方法是replicaIndex:
//val shift = 1 + (nextReplicaShift + j) % ( brokerList.size - 1)
//(firstReplicaIndex + shift) % brokerList.size
replicaList ::= brokerList( replicaIndex (firstReplicaIndex , nextReplicaShift , j , brokerList.size))
ret.put(currentPartitionId , replicaList.reverse)
//分区id加一
currentPartitionId = currentPartitionId + 1
}
ret.toMap
将配置和分配策略写到 Zookeeper 上去
AdminUtils. createOrUpdateTopicPartitionAssignmentPathInZK (zkClient , topic , replicaAssignment , topicConfig)
该方法的具体内容如下:
写配置, Zookeeper 的目录是: /config/topics/TopicName
writeTopicConfig (zkClient , topic , config)
写分配策略, Zookeeper 的目录是: /brokers/topics/TopicName
writeTopicPartitionAssignment (zkClient , topic , partitionReplicaAssignment , update)
B) ,kafka Controller 监听到 topic 创建事件后的处理
KafkaController 的 PartitionStateMachine 对象内部有一个 Zookeeper 的 listener 专门监听新增 topic 事件。 TopicChangeListener 。
获取新增 topic
val newTopics = currentChildren -- controllerContext . allTopics
获取分区副本分配策略 HashMap[TopicAndPartition, Seq[Int]]
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient , newTopics.toSeq)
进入具体的操作
if (newTopics.size > 0 )
//进入具体的操作
controller.onNewTopicCreation(newTopics , addedPartitionReplicaAssignment.keySet.toSet)
订阅新增 topic 的分区变动事件
// subscribe to partition changes 注册指定topic的分区变动事件监听器
topics.foreach(topic => partitionStateMachine .registerPartitionChangeListener(topic))
处理新增分区 onNewPartitionCreation
该方法主要做两件事:
1 , 将新建分区的状态转化为 NewPartition 状态
partitionStateMachine .handleStateChanges(newPartitions , NewPartition)
进入处理函数得到
partitions .foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic , topicAndPartition.partition , targetState , leaderSelector , callbacks)
}
case NewPartition =>
//指定TopicAndPartition 获取副本
assignReplicasToPartitions(topic , partition)
partitionState .put(topicAndPartition , NewPartition)
val assignedReplicas = controllerContext . partitionReplicaAssignment (topicAndPartition).mkString( "," )
AssgnReplicasToPartition 方法的具体内容,主要是先获取分区所在的 Brokerid 序列,然后
val assignedReplicas = ZkUtils. getReplicasForPartition ( controllerContext .zkClient , topic , partition)
controllerContext . partitionReplicaAssignment += TopicAndPartition (topic , partition) -> assignedReplicas
2 , 将新建分区的状态从 NewPartition 到 OnlinePartition 状态
partitionStateMachine .handleStateChanges(newPartitions , OnlinePartition , offlinePartitionSelector )
在 handleStateChange ,中具体处理是
case OnlinePartition =>
assertValidPreviousStates(topicAndPartition , List (NewPartition , OnlinePartition , OfflinePartition) , OnlinePartition)
partitionState (topicAndPartition) match {
case NewPartition =>
// initialize leader and isr path for new partition
initializeLeaderAndIsrForPartition(topicAndPartition)
在 initializeLeaderAndIsrForPartition. 第一个 seq 中的 Broker 当做 leader
val leader = liveAssignedReplicas.head //第一个副本作为leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch( new LeaderAndIsr(leader , liveAssignedReplicas.toList) ,
controller.epoch)
更新具体分区的状态信息
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state
// {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}
ZkUtils. createPersistentPath ( controllerContext .zkClient ,
ZkUtils. getTopicPartitionLeaderAndIsrPath (topicAndPartition.topic , topicAndPartition.partition) ,
ZkUtils. leaderAndIsrZkData (leaderIsrAndControllerEpoch.leaderAndIsr , controller.epoch))
topic 分区 副本 放入 leaderAndIsrRequestMap ,以便我们可以通过 Brokerid 找到
brokerRequestBatch .addLeaderAndIsrRequestForBrokers(liveAssignedReplicas , topicAndPartition.topic ,
topicAndPartition.partition , leaderIsrAndControllerEpoch , replicaAssignment)
发信息给需要的 BrokerID
leaderAndIsrRequestMap .foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext .liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos , leaders , controllerId , controllerEpoch , correlationId , clientId )
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger .trace(( "Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
"for partition [%s,%d]" ).format( controllerId , controllerEpoch , typeOfRequest ,
p._2.leaderIsrAndControllerEpoch , correlationId , broker ,
p._1._1 , p._1._2))
}
// 给具体的Broker发送LeaderAndIsrRequest
controller.sendRequest(broker , leaderAndIsrRequest , null )
}
C) , Broker leader 和 follower 的产生过程
在 Broker 接收到 Controller 的 LeaderAndIsrRequest 消息后,交由 kafkaApis 的 handle 处理
case RequestKeys. LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
当前 Broker 成为副本的 leader 或者 follower 的入口函数
val (response , error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest , offsetManager)
当前 Broker 能不能成为 Broker ,取决于 Brokerid 是否与 leader 分配的 Brokerid 一致,一致就会成为 leader ,否则 follower
val partitionsTobeLeader = partitionState
.filter{ case (partition , partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config. brokerId }
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
真正的进入 leader 或者 follower 的过程
if (!partitionsTobeLeader.isEmpty)
makeLeaders(controllerId , controllerEpoch , partitionsTobeLeader , leaderAndISRRequest.correlationId , responseMap , offsetManager)
if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId , controllerEpoch , partitionsToBeFollower , leaderAndISRRequest.leaders , leaderAndISRRequest.correlationId , responseMap , offsetManager)
在接收到第一个 leaderisrrequest 后初始化 highwatermark 线程。这可以保证所有的分区都被填充,通过避免恶性竞争启动 Checkpointing 之前。
if (! hwThreadInitialized ) {
startHighWaterMarksCheckPointThread()
hwThreadInitialized = true
}
下面具体讲解 makeleaders 和 makeFollowers 方法
使当前 Broker 成为给定分区的 leader ,需要做以下几个处理:
* 1 ,停止掉这些分区的 fetchers
* 2 ,更新缓存的当前分区的元数据
* 3 ,将分区加入 leader 分区集合
// First stop fetchers for all the partitions
replicaFetcherManager .removeFetcherForPartitions(partitionState.keySet.map( new TopicAndPartition(_)))
// Update the partition information to be the leader
partitionState.foreach{ case (partition , partitionStateInfo) =>
partition.makeLeader(controllerId , partitionStateInfo , correlationId , offsetManager)}
Makeleader 方法具体的操作了一个副本成为 leader 的过程:
主要做了以下几件事情:
* 记录 LeaderShip 决议的时代。在更新 isr 并维护 Zookeeperpath 的中的 Controller 时代
* 增加新的副本
* 移除已经被 Controller 移除的已分配副本
* 为新的 leader 副本构建高水位元数据
* 为远程副本重置 logendoffset
* 由于 isr 可能将为 1 ,我们需要增加高水位
具体源码如下:
def makeLeader (controllerId: Int,
partitionStateInfo: PartitionStateInfo , correlationId: Int,
offsetManager: OffsetManager): Boolean = {
inWriteLock ( leaderIsrUpdateLock ) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = newInSyncReplicas
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some ( localBrokerId )
// construct the high watermark metadata for the new leader replica
val newLeaderReplica = getReplica().get
newLeaderReplica.convertHWToLocalOffsetMetadata()
// reset log end offset for remote replicas
assignedReplicas.foreach(r => if (r.brokerId != localBrokerId ) r.logEndOffset = LogOffsetMetadata. UnknownOffsetMetadata )
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(newLeaderReplica)
if (topic == OffsetManager. OffsetsTopicName )
offsetManager.loadOffsetsFromLog(partitionId)
true
}
}
当前 Broker 成为给定分区的 follower 要做要做以下几个处理:
* 1 ,将分区从 leader partition 集合中移除
* 2 ,将副本标记为 follower ,目的是不让生产者继续往该副本生产消息
* 3 ,停止掉该分区的所有 fetcher ,目的是不让副本 fetcher 线程往该副本写数据。
* 4 ,清空当前分区的 log 和 Checkpoint offsets
* 5 ,假如 Broker 没有挂掉,增加从新 leader 获取数据的副本 fetcher 线程
具体代码如下:
将分区从 leader partition 集合中移除
将副本标记为 follower ,目的是不让生产者继续往该副本生产消息
partitionState.foreach{ case ( partition , partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some (leaderBroker) =>
if ( partition .makeFollower(controllerId , partitionStateInfo , correlationId , offsetManager))
partitionsToMakeFollower += partition
当前分区的 log 和 Checkpoint offsets
replicaFetcherManager .removeFetcherForPartitions( partitionsToMakeFollower .map( new TopicAndPartition(_)))
清空当前分区的 log 和 Checkpoint offsets
logManager .truncateTo(partitionsToMakeFollower.map(partition => ( new TopicAndPartition(partition) , partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
假如 Broker 没有挂掉,增加从新 leader 获取数据的副本 fetcher 线程
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
new TopicAndPartition(partition) -> BrokerAndInitialOffset(
leaders.find(_.id == partition.leaderReplicaIdOpt.get).get ,
partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager .addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
具体的 makeFollower 方法中
通过设置 leader 和 ISR 为空,使本地副本成为 Follower
主要做了以下几件事情:
* 记录 LeaderShip 决议的时代。在更新 isr 并维护 Zookeeperpath 的中的 Controller 时代
* 增加新的副本
* 移除已经被 Controller 移除的已分配副本
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r))
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set. empty [Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt .foreach { leaderReplica =>
if (topic == OffsetManager. OffsetsTopicName &&
/* if we are making a leader->follower transition */
leaderReplica == localBrokerId )
offsetManager.clearOffsetsInPartition(partitionId)
}
if ( leaderReplicaIdOpt .isDefined && leaderReplicaIdOpt .get == newLeaderBrokerId) {
false
}
else {
leaderReplicaIdOpt = Some (newLeaderBrokerId)
true
}
四,总结
本文主要是以 topic 的创建过程,讲解分区及副本在集群 Broker 上的分布的实现,顺便讲解新建 topic 的话分区的 leader 的选举方法,及我们的副本成为 leader 和 Follower 的要素。
这个过程实际上也是基于 Zookeeper 实现了订阅发布系统,发布者是 TopicCommand 类,订阅者是 kafka 的 Controller 类。再由 kafka 的 Controller 进行分区 leader 选举 ( 副本列表第一个 ) ,然后给 TopicCommand 已经指定的各个 Broker Follower 发送 LeaderAndIsrRequest ,由根据我们 TopicCommand 中分区的分配的具体 Broker 去启动副本为 leader(leader 的被分配的 Brokerid 和当前 Broker 的 id 相等 ) 或者 Follower 。
假如,你对kafka,hbase,spark源码感兴趣,请关注浪尖公众号。
以上所述就是小编给大家介绍的《Kafka 源码系列之 topic 创建分区分配及 leader 选举》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 以太坊2.0:公开单一领导人选举(PSLE)+秘密概率后备选举(SPBE)研究
- 俄罗斯再度利用网络攻击试图干扰乌克兰选举 (附俄国历年干扰选举案例汇总)
- zookeeper选举算法
- Raft leader 选举
- zookeeper-选举源码分析
- Kubernetes 实战:Leader 选举
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
企业应用架构模式
Martin Fowler、王怀民、周斌 / 王怀民、周斌 / 机械工业出版社 / 2004-7 / 49.00元
本书作者是当今面向对象软件开发的权威,他在一组专家级合作者的帮助下,将40多种经常出现的解决方案转化成模式,最终写成这本能够应用于任何一种企业应用平台的、关于解决方案的、不可或缺的手册。本书获得了2003年度美国软件开发杂志图书类的生产效率奖和读者选择奖。本书分为两大部分。第一部分是关于如何开发企业应用的简单介绍。第二部分是本书的主体,是关于模式的详细参考手册,每个模式都给出使用方法和实现信息,并一起来看看 《企业应用架构模式》 这本书的介绍吧!