kafka集群Controller竞选与责任设计思路架构详解-kafka 商业环境实战

栏目: 后端 · 发布时间: 6年前

内容简介:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

kafka集群Controller竞选与责任设计思路架构详解-kafka 商业环境实战

1 无所不能的Controller

  • 某一个broker被选举出来承担特殊的角色,就是控制器Controller。

  • Leader会向zookeeper上注册Watcher,其他broker几乎不用监听zookeeper的状态变化。

  • Controller集群就是用来管理和协调Kafka集群的,具体就是管理集群中所有分区的状态和分区对应副本的状态。

  • 每一个Kafka集群任意时刻都只能有一个controller,当集群启动的时候,所有的broker都会参与到controller的竞选,最终只能有一个broker胜出。

  • Controller维护的状态分为两类:1:管理每一台Broker上对应的分区副本。2:管理每一个Topic分区的状态。

  • KafkaController 核心代码,其中包含副本状态机和分区状态机

    class KafkaController(val config : KafkaConfig, zkClient: ZkClient, 
      val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
          this.logIdent = "[Controller " + config.brokerId + "]: "
          private var isRunning = true
          private val stateChangeLogger = KafkaController.stateChangeLogger
          val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    
          val partitionStateMachine = new PartitionStateMachine(this)
          val replicaStateMachine = new ReplicaStateMachine(this)
          
          private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
          onControllerResignation, config.brokerId)
          // have a separate scheduler for the controller to be able to start and stop independently of the
          // kafka server
          private val autoRebalanceScheduler = new KafkaScheduler(1)
          var deleteTopicManager: TopicDeletionManager = null
          val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
          private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
          private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
          private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
          private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
          
          private val partitionReassignedListener = new PartitionsReassignedListener(this)
          private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
    复制代码
  • KafkaController中共定义了五种selector选举器

    1、ReassignedPartitionLeaderSelector
      从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
      2、PreferredReplicaPartitionLeaderSelector
      如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。
      3、ControlledShutdownLeaderSelector
      将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。
      4、NoOpLeaderSelector
      原则上不做任何事情,返回当前的leader和isr。
      5、OfflinePartitionLeaderSelector
      从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。
    复制代码
  • kafka修改分区和副本数

    ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe  --topic test1
      
      Topic:test1       PartitionCount:3        ReplicationFactor:2     Configs:
      Topic: test1      Partition: 0    Leader: 2       Replicas: 2,4   Isr: 2,4
      Topic: test1      Partition: 1    Leader: 3       Replicas: 3,5   Isr: 3,5
      Topic: test1      Partition: 2    Leader: 4       Replicas: 4,1   Isr: 4,1
    复制代码
  • topic 分区扩容

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
    复制代码

2 ReplicaStateMachine (ZK持久化副本分配方案)

  • Replica有7种状态:

    1 NewReplica: 在partition reassignment期间KafkaController创建New replica
      
      2 OnlineReplica: 当一个replica变为一个parition的assingned replicas时
      其状态变为OnlineReplica, 即一个有效的OnlineReplica
      
      3 Online状态的parition才能转变为leader或isr中的一员
      
      4 OfflineReplica: 当一个broker down时, 上面的replica也随之die, 其状态转变为Onffline;
      ReplicaDeletionStarted: 当一个replica的删除操作开始时,其状态转变为ReplicaDeletionStarted
      
      5 ReplicaDeletionSuccessful: Replica成功删除后,其状态转变为ReplicaDeletionSuccessful
      
      6 ReplicaDeletionIneligible: Replica成功失败后,其状态转变为ReplicaDeletionIneligible
      
      7 NonExistentReplica:  Replica成功删除后, 从ReplicaDeletionSuccessful状态转变为NonExistentReplica状态
    复制代码
  • ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

  • startup: 启动ReplicaStateMachine

  • initializeReplicaState: 初始化每个replica的状态, 如果replica所在的broker是live状态,则此replica的状态为OnlineReplica。

  • 处理可以转换到Online状态的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 并且发送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)

  • 当创建某个topic时,该topic下所有分区的所有副本都是NonExistent。

  • 当controller加载Zookeeper中该topic每一个分区的所有副本信息到内存中,同时将副本的状态变更为New。

  • 之后controller选择该分区副本列表中的第一个副本作为分区的leader副本并设置所有副本进入ISR,然后在Zookeeper中持久化该决定。

  • 一旦确定了分区的Leader和ISR之后,controller会将这些消息以请求的方式发送给所有的副本。

  • 同时将这些副本状态同步到集群的所有broker上以便让他们知晓。

  • 最后controller 会把分区的所有副本状态设置为Online。

3 partitionStateMachine (根据副本分配方案创建分区)

  • Partition有如下四种状态

    NonExistentPartition: 这个partition还没有被创建或者是创建后又被删除了;
      NewPartition: 这个parition已创建, replicas也已分配好,但leader/isr还未就绪;
      OnlinePartition: 这个partition的leader选好;
      OfflinePartition: 这个partition的leader挂了,这个parition状态为OfflinePartition;
    复制代码
  • 当创建Topic时,controller负责创建分区对象,它首先会短暂的将所有分区状态设置为NonExistent。

  • 之后读取Zookeeper读取副本分配方案,然后令分区状态设置为NewPartion。

  • 处于NewPartion状态的分区尚未有leader和ISR,因此Controller会初始化leader和ISR信息并设置分区状态为OnlinePartion,此时分区正常工作。


以上所述就是小编给大家介绍的《kafka集群Controller竞选与责任设计思路架构详解-kafka 商业环境实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Node.js开发实战

Node.js开发实战

[美] Jim R. Wilson / 梅晴光、杜万智、陈琳、纪清华、段鹏飞 / 华中科技大学出版社 / 2018-11-10 / 99.90元

2018年美国亚马逊书店排名第一的Node.js开发教程。 . Node.js是基于Chrome V8引擎的JavaScript运行环境,它采用事件驱动、非阻塞式I/O模型,具有轻量、高效的特点。Node.j s 工作在前端代码与 数据存储层之间,能够提高web应用的工作效率和 响应速度。本书以最新版Node.js 8为基础,从实际案例出发 讲解Node.js的核心工作原理和实用开发技......一起来看看 《Node.js开发实战》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换