Kafka 源码系列之 Broker 的 IO 服务及业务处理

栏目: 后端 · 发布时间: 6年前

内容简介:Kafka从图中可以看到其主要角色:

Kafka 源码系列之 Broker IO 服务及业务处理

一, kafka 角色

Kafka 源码系列主要是以 kafka 0.8.2.2 源码为例。以看 spark 等源码的经验总结除了一个重要的看源码的思路:先了解部件角色和功能角色,然后逐个功能请求序列画图分析,最后再汇总。那么,下面再啰嗦一下, kafka 的角色。 kafka 在生产中的使用,如下图。

Kafka 源码系列之 Broker 的 IO 服务及业务处理

从图中可以看到其主要角色:

1 Zookeeper:Broker 需要通过 ZooKeeper 记录集群的所有 Broker controller 等信息,记录 Consumer 的消费消息的偏移量等信息。

2 Broker: 主要负责管理数据,处理数据的生产、消费请求及副本的同步等信息。

3 Topic: 标识一个类别的消息。

4 Partition: 针对 topic 进行了进一步细分,增加并发度。牵涉到副本及 leader 选举。

5 Producer: 主要与 Broker 进行交互,来生产消息到 broker

6 Consumer: 主要是从 Broker 上获取消息,将自己的消费偏移等信息记录与 zookeeper

从各个角色的功能来看,我们整个数据服务请求的中心就是 Broker ,自然也是由 Broker 来负责各种事件处理及应答各个部件的。

二, Broker 请求及应答机制的实现

JAVA 的网络 IO 模型彻底讲解的那篇文章里,已经彻底讲解了 Java 的各种网络 IO 实现的机制及优缺点。其实, kafka Broker 就是通过 JAVA NIO 来实现监听和请求处理及应答的。

主要牵涉到的类:

1) ,KafkaServer

该类代表了一个 kafka Broker 的生命周期,处理 kafka 启动或者停止所需要的所有功能。

2) ,SocketServer

一个 NIO 服务中心。线程模型是

1 Acceptor 线程,用来处理新的链接请求

N 个加工 Processor 线程。每个线程拥有一个他们自己的 selector ,主要负责 IO 请求及应答。

3) ,KafkaRequestHandler

实际会在 KafkaRequestHandlerPool 中创建多个对象,负责加工处理 request 线程。

会创建 M 个处理 Handler 线程。负责处理 request 请求,将 responses 重新写会加工线程 Processor ,以便于其写回给客户端。

4) ,RequestChannel

该类主要是封装了 requestQueue responseQueues responseListeners ,便于个各类中同时引用并作出自己的处理。

5) ,KafkaApis

Kafka 多样请求的逻辑处理程序。

具体如图:

Kafka 源码系列之 Broker 的 IO 服务及业务处理

下面讲解 1,2,3,4,5 ,具体含义:

1 SocketServer.startup() ,会启动一个后台线程,该线程会持有一个 acceptor ,负责接收新的链接请求,并轮训所有的 Processor ,将新的链接请求加入 Processor 对象的成员变量 ConcurrentLinkedQueue 里, Processor 会在其 run 方法里面处理。

// start accepting connections
this . acceptor = new Acceptor(host , port , processors , sendBufferSize , recvBufferSize , quotas)
Utils. newThread ( "kafka-socket-acceptor" , acceptor , false ).start()
acceptor .awaitStartup

Processor 池的初始化

for (i <- 0 until numProcessorThreads) {
   processors (i) = new Processor(i ,
                                 time ,
                                 maxRequestSize ,
                                 aggregateIdleMeter ,
                                 newMeter( "IdlePercent" , "percent" , TimeUnit. NANOSECONDS , Map( "networkProcessor" -> i.toString)) ,
                                 numProcessorThreads ,
                                 requestChannel ,
                                 quotas ,
                                 connectionsMaxIdleMs)
  Utils. newThread ( "kafka-network-thread-%d-%d" .format(port , i) , processors (i) , false ).start()
}

accepttor轮训Processor

val ready = selector.select(500)
if(ready > 0) {
  val keys = selector.selectedKeys()
  val iter = keys.iterator()
  while(iter.hasNext && isRunning) {
    var key: SelectionKey = null
    try {
      key = iter.next
      iter.remove()
      if(key.isAcceptable)
         accept(key, processors(currentProcessor))
      else
         throw new IllegalStateException("Unrecognized key state for acceptor thread.")

      // round robin to the next processor thread
      currentProcessor = (currentProcessor + 1) % processors.length
    } catch {
      case e: Throwable => error("Error while accepting connection", e)
    }

2 Processor run 方法里面,会针对可读事件调用 read 方法里将 request 请求信息通过 requestChannel.sendRequest(req) 添加到 RequestChannel 的成员变量里面。

requestQueue  = new ArrayBlockingQueue[RequestChannel.Request](queueSize)

3 ,在 KafkaServer startup 方法里面构建 KafkaRequestHandlerPool 对象的时候,会构建若干 handler 线程。

for (i <- 0 until numThreads) {
   runnables (i) = new KafkaRequestHandler(i , brokerId , aggregateIdleMeter , numThreads , requestChannel , apis)
   threads (i) = Utils. daemonThread ( "kafka-request-handler-" + i , runnables (i))
   threads (i).start()
}

KafakRequestHandler 的方法里面会对我们的 request 进行处理

req = requestChannel.receiveRequest( 300 )
apis.handle(req)

实际上,是通过 KafkaApis 对象的 handle 方法进行各种逻辑的处理的。

def handle (request: RequestChannel.Request) {
   try {
    trace( "Handling request: " + request. requestObj + " from client: " + request.remoteAddress)
    request. requestId match {
       case RequestKeys. ProduceKey => handleProducerOrOffsetCommitRequest(request)
       case RequestKeys. FetchKey => handleFetchRequest(request)
       case RequestKeys. OffsetsKey => handleOffsetRequest(request)
       case RequestKeys. MetadataKey => handleTopicMetadataRequest(request)
       case RequestKeys. LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
       case RequestKeys. StopReplicaKey => handleStopReplicaRequest(request)
       case RequestKeys. UpdateMetadataKey => handleUpdateMetadataRequest(request)
       case RequestKeys. ControlledShutdownKey => handleControlledShutdownRequest(request)
       case RequestKeys. OffsetCommitKey => handleOffsetCommitRequest(request)
       case RequestKeys. OffsetFetchKey => handleOffsetFetchRequest(request)
       case RequestKeys. ConsumerMetadataKey => handleConsumerMetadataRequest(request)
       case requestId => throw new KafkaException( "Unknown api code " + requestId)
    }
  } catch {
     case e: Throwable =>
      request. requestObj .handleError(e , requestChannel , request)
      error( "error when handling request %s" .format(request. requestObj ) , e)
  } finally
     request. apiLocalCompleteTimeMs = SystemTime. milliseconds
}

4 ,在每一种请求处理结束之后会产生对应的 response

requestChannel.sendResponse( new RequestChannel.Response(request , new BoundedByteBufferSend(response)))

并将 response 存储到 RequestChannel responseQueues 存储。

5 ,最终,由我们的 Processor 在其 run 方法里面,取出 RequestChannel responseQueues 存储的时间,匹配到写事件,然后通过其 write 方法对具体的 request 进行应答。

else if (key.isWritable)
  write(key)

三,总结

这是一个典型的 Reactor 多线程模型,并且实现了 IO 线程和业务线程进行隔离。这样做的优点有以下几种 :

1, 充分利用资源

可以充分利用 CPU 资源,增加并发度,使业务响应速度加快。

2, 故障隔离 :

业务处理线程,无论是处理耗时,还是发生阻塞,都不会影响 IO 请求线程。保证服务器能在某些业务线程出故障的情况下,正常进行 IO 请求应答。

3, 可维护性

职责单一,可维护性高,方便定位问题。

此处再次建议大家仔细阅读,浪尖关于 JAVA 的网络 IO 模型彻底讲解那篇文章,彻底领会其意境。

此乃,原创。欢迎大家扫描二维码,关注浪尖微信公众号,大家共同进步。

Kafka 源码系列之 Broker 的 IO 服务及业务处理


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

新物种爆炸

新物种爆炸

吴声 / 中信出版社 / 2017-7-30 / 58.00元

宝马为什么要重点发展共享汽车 Airbnb正试图成为内容和社交平台 不排队、不结账、没有收银员的颠覆传统超市 茑屋书店要打造全新生活方式 基于新的商业环境与技术条件的变化,必须会产生新的品类和商业模式,这就是新物种! 大数据与人工智能等技术正在创建新的商业话语体系,创建新的权力架构,引领第四新物种爆炸。商业规则正在快速发生变化,新的模式与业态层出不穷。 要么成为......一起来看看 《新物种爆炸》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码

SHA 加密
SHA 加密

SHA 加密工具