Kafka 源码系列之分组消费的再平衡策略

栏目: 后端 · 发布时间: 6年前

一, Kafka 消费模式

kafka 消费消息, kafka 客户端提供两种模式 : 分区消费,分组消费。

分区消费对应的就是我们的 DirectKafkaInputDStream

分组消费对应的就是我们的 KafkaInputDStream

消费者数目跟分区数目的关系 :

1) ,一个消费者可以消费一个到全部分区数据

2) ,分组消费,同一个分组内所有消费者消费一份完整的数据,此时一个分区数据只能被一个消费者消费,而一个消费者可以消费多个分区数据

3) ,同一个消费组内,消费者数目大于分区数目后,消费者会有空余 = 分区数 - 消费者数

Kafka 源码系列之分组消费的再平衡策略

二,分组消费的再平衡策略

当一个 group , consumer 加入或者离开时 , 会触发 partitions 均衡 partition.assignment.strategy, 决定了 partition 分配给消费者的分配策略,有两种分配策略:

1 org.apache.kafka.clients.consumer.RangeAssignor

默认采用的是这种再平衡方式,这种方式分配只是针对消费者订阅的 topic 的单个 topic 所有分区再分配, Consumer Rebalance 的算法如下:

1), 将目标 Topic 下的所有 Partirtion 排序,存于 TP

2), 对某 Consumer Group 下所有 Consumer 按照名字根据字典排序,存于 CG ,第 i Consumer 记为 Ci

3),N=size(TP)/size(CG)

4),R=size(TP)%size(CG)

5),Ci 获取的分区起始位置 =N*i+min(i,R)

6),Ci 获取的分区总数 =N+(if (i+ 1 > R) 0 else 1)

2 org.apache.kafka.clients.consumer.RoundRobinAssignor

这种分配策略是针对消费者消费的所有 topic的所有分区进行分配。当有新的消费者加入或者有消费者退出,就会触发rebalance。这种方式有两点要求

A) ,在实例化每个消费者时给每个topic指定相同的流数

B) ,每个消费者实例订阅的topic必须相同

Map <String , Integer> topicCountMap = new HashMap<String , Integer>() ;
topicCountMap.put( topic , new Integer( 1 )) ;
Map <String , List<KafkaStream< byte [] , byte []>>> consumerMap = consumer .createMessageStreams(topicCountMap) ;

其中, topic对应的value就是流数目。对应的kafka源码是在

kafka.consumer.ZookeeperConsumerConnector的consume方法里,根据这个参数构建了相同数目的KafkaStream。

这种策略的具体分配步骤 :

1) , 对所有 topic 的所有分区按照 topic+partition string 之后的 hash 进行排序

2) , 对消费者按字典进行排序

3) , 然后轮训的方式将分区分配给消费者

3 ,举例对比

举个例子,比如有两个消费者 (c0,c1) ,两个 topic(t0,t1) ,每个 topic 有三个分区 p(0-2)

那么采用 RangeAssignor ,结果为:

* C0: [t0p0, t0p1, t1p0, t1p1]

* C1: [t0p2, t1p2]

采用 RoundRobinAssignor ,结果为:

* C0: [t0p0, t0p2, t1p1]

* C1: [t0p1, t1p0, t1p2]

三,本节源码设计的重要概念及 zookeeper 相关目录

1 ,本节涉及的 zookeeper 目录

A) , 消费者目录 , 获取子节点就可以获取所有的消费者

/consumers/group.id/ids/

B) ,topic 的目录,可以获取 topic ,分区及副本信息

/brokers/topics/topicName

值:

{"version":1,"partitions":{"0":[5,6],"2":[1,4],"27":[0,4],"1":[7,0]}}

partitions对应值的 key是分区id,value是Broker id列表。

C), 分区所属的消费者线程关系

/consumers/groupId/owners/topic/partitionid

值就是消费者线程 id ,也就是在 A 向获取的消费者后加了一个 id 值。

2 ,涉及的概念

A), 消费者 ID

val consumerIdString = {
   var consumerUuid : String = null
   config. consumerId match {
     case Some ( consumerId ) // for testing only
     => consumerUuid = consumerId
     case None // generate unique consumerId automatically
     => val uuid = UUID. randomUUID ()
    consumerUuid = "%s-%d-%s" .format(
      InetAddress. getLocalHost .getHostName , System. currentTimeMillis ,
       uuid.getMostSignificantBits().toHexString.substring( 0 , 8 ))
  }
  config. groupId + "_" + consumerUuid
}

B), 消费者线程 ID

主要是在消费者 id 的基础上,根据消费者构建指定的 topic Stream 数目,递增加了个数字的值

for ((topic , nConsumers) <- topicCountMap) {
   val consumerSet = new mutable.HashSet[ConsumerThreadId]
   assert (nConsumers >= 1 )
   for (i <- 0 until nConsumers)
    consumerSet += ConsumerThreadId (consumerIdString , i) //ConusmerId的结尾加上一个常量区别 owners 目录下可以看到
   consumerThreadIdsPerTopicMap.put(topic , consumerSet)
}

ConsumerThreadId

"%s-%d" .format(consumer , threadId)

C),TopicAndPartition

topic 名字的表示每个分区,重点关注其 toString 方法,在比较的时候用到了。

TopicAndPartition(topic: String , partition: Int )

override def toString = "[%s,%d]" .format(topic , partition)

四,源码解析

1 AssignmentContext

主要作用是根据指定的消费组,消费者, topic 信息,从 zookeeper 上获取相关数据并解析得到,两种分配策略要用的四个数据结构。解析过程请结合 zookeeper 的相关目录及节点的数据类型和 kafka 源码自行阅读。

class AssignmentContext(group: String , val consumerId: String , excludeInternalTopics: Boolean, zkClient: ZkClient) {
   //(topic,ConsumerThreadIdSet) //指定一个消费者,根据每个topic指定的streams数目,构建相同数目的流
   val myTopicThreadIds : collection.Map[ String , collection.Set[ConsumerThreadId]] = {
     val myTopicCount = TopicCount. constructTopicCount (group , consumerId , zkClient , excludeInternalTopics)
    myTopicCount.getConsumerThreadIdsPerTopic
  }

   //(topic 分区) /brokers/topics/topicname 值
   val partitionsForTopic : collection.Map[ String , Seq [ Int ]] =
    ZkUtils. getPartitionsForTopics (zkClient , myTopicThreadIds .keySet.toSeq)

   //(topic,ConsumerThreadId(当前消费者id)) ///consumers/Groupid/ids 子节点
   val consumersForTopic : collection.Map[ String , List [ConsumerThreadId]] =
    ZkUtils. getConsumersPerTopic (zkClient , group , excludeInternalTopics)

   ///consumers/Groupid/ids的所有的子节点
   val consumers : Seq [ String ] = ZkUtils. getConsumersInGroup (zkClient , group).sorted
}

2 RangeAssignor

class RangeAssignor() extends PartitionAssignor with Logging {

   def assign (ctx: AssignmentContext) = {
     val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition , ConsumerThreadId]()

     for ((topic , consumerThreadIdSet) <- ctx. myTopicThreadIds ) {
       val curConsumers = ctx. consumersForTopic (topic) //当前topic的所有消费者
       val curPartitions: Seq [ Int ] = ctx. partitionsForTopic (topic) //当前topic的所有分区

      //
       val nPartsPerConsumer = curPartitions.size / curConsumers.size
       val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info( "Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
         " for topic " + topic + " with consumers: " + curConsumers)

       for (consumerThreadId <- consumerThreadIdSet) {
         val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //获取当前消费者线程的在集合中的位置
         assert (myConsumerPosition >= 0 )
         //获取分区的起始位置
         val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
         //获取当前消费者消费的分区数目
         val nParts = nPartsPerConsumer + ( if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1 )

         /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
         if (nParts <= 0 )
          warn( "No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
         else {
           //将分区关系描述写入partitionOwnershipDecision
           for (i <- startPart until startPart + nParts) {
             val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
             // record the partition ownership decision
             partitionOwnershipDecision += ( TopicAndPartition (topic , partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
  }
}

3 RoundRobinAssignor

class RoundRobinAssignor() extends PartitionAssignor with Logging {

   def assign (ctx: AssignmentContext) = {
     val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition , ConsumerThreadId]()

     // check conditions (a) and (b)    topic, List[ConsumerThreadId]
     val (headTopic , headThreadIdSet) = (ctx. consumersForTopic .head._1 , ctx. consumersForTopic .head._2.toSet)

     //测试输出
     ctx. consumersForTopic .foreach { case (topic , threadIds) =>
       val threadIdSet = threadIds.toSet
       require (threadIdSet == headThreadIdSet ,
               "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
               "AND if the stream counts across topics are identical for a given consumer instance. \n " +
               "Topic %s has the following available consumer streams: %s \n " .format(topic , threadIdSet) +
               "Topic %s has the following available consumer streams: %s \n " .format(headTopic , headThreadIdSet))
    }
     //为传入的集合创建一个循环迭代器,传入之前 排序 是按照字典排序
     val threadAssignor = Utils. circularIterator (headThreadIdSet.toSeq.sorted)

    info( "Starting round-robin assignment with consumers " + ctx. consumers )

     //TopicAndPartition 按照字符串的hash排序
     val allTopicPartitions = ctx. partitionsForTopic .flatMap { case (topic , partitions) =>
      info( "Consumer %s rebalancing the following partitions for topic %s: %s"
            .format(ctx.consumerId , topic , partitions))

      partitions.map(partition => {
         TopicAndPartition (topic , partition)   //toString = "[%s,%d]".format(topic, partition)
       })

    }.toSeq.sortWith((topicPartition1 , topicPartition2) => {
       /*
       * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
       * up on one consumer (if it has a high enough stream count).
       */
      //按照hash值进行排序
       topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
    })

     //过滤得到当前消费者的线程id
     allTopicPartitions.foreach(topicPartition => {
       val threadId = threadAssignor.next()
       if (threadId.consumer == ctx.consumerId)
        partitionOwnershipDecision += (topicPartition -> threadId)
    })

     //返回得到结果
     partitionOwnershipDecision
  }
}

五,总结

本文主要是讲解分组消费的两种将分区分配给消费者线程的分配策略。结合前面两篇

<Kafka源码系列之Consumer高级API性能分析>和<Kafka源码系列之源码解析SimpleConsumer的消费过程>,大家应该会对kafka的 java 消费者客户端的实现及性能优缺点有彻底的了解了。

分组,分区两种种模型其实跟kafka集群并没有关系,是我们java客户端实现的区别。 生产中可以根据自己的需要选择两种消费模型。建议流量不是很大,也没过分的性能需求,选择分组消费,这样同分组多消费者的话相当于实现了同分组的消费者故障转移。

本文乃原创,希望大家尊重浪尖成功,不足之处请留言指正。

欢迎大家关注浪尖公众号,一起开启分布式之旅。

Kafka 源码系列之分组消费的再平衡策略


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Qt 5.9 C++开发指南

Qt 5.9 C++开发指南

王维波、栗宝鹃、侯春望 / 人民邮电出版社 / 2018-5-1 / 89.00元

本书以Qt 5.9 LTS版本为开发平台,详细介绍了Qt C++开发应用程序的技术,包括Qt应用程序的基本架构、信号与槽工作机制、图形显示的Graphics/View架构、数据编辑和显示的Model/View架构、对话框和多窗口的设计与调用方法等,介绍了常用界面组件、文件读写、绘图、图表、数据可视化、数据库、多线程、网络和多媒体等模块的使用。每个编程主题都精心设计了完整的实例程序。 通过阅读......一起来看看 《Qt 5.9 C++开发指南》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具