Kafka 源码系列之 Broker 的 IO 服务及业务处理

栏目: 后端 · 发布时间: 6年前

内容简介:Kafka从图中可以看到其主要角色:

Kafka 源码系列之 Broker IO 服务及业务处理

一, kafka 角色

Kafka 源码系列主要是以 kafka 0.8.2.2 源码为例。以看 spark 等源码的经验总结除了一个重要的看源码的思路:先了解部件角色和功能角色,然后逐个功能请求序列画图分析,最后再汇总。那么,下面再啰嗦一下, kafka 的角色。 kafka 在生产中的使用,如下图。

Kafka 源码系列之 Broker 的 IO 服务及业务处理

从图中可以看到其主要角色:

1 Zookeeper:Broker 需要通过 ZooKeeper 记录集群的所有 Broker controller 等信息,记录 Consumer 的消费消息的偏移量等信息。

2 Broker: 主要负责管理数据,处理数据的生产、消费请求及副本的同步等信息。

3 Topic: 标识一个类别的消息。

4 Partition: 针对 topic 进行了进一步细分,增加并发度。牵涉到副本及 leader 选举。

5 Producer: 主要与 Broker 进行交互,来生产消息到 broker

6 Consumer: 主要是从 Broker 上获取消息,将自己的消费偏移等信息记录与 zookeeper

从各个角色的功能来看,我们整个数据服务请求的中心就是 Broker ,自然也是由 Broker 来负责各种事件处理及应答各个部件的。

二, Broker 请求及应答机制的实现

JAVA 的网络 IO 模型彻底讲解的那篇文章里,已经彻底讲解了 Java 的各种网络 IO 实现的机制及优缺点。其实, kafka Broker 就是通过 JAVA NIO 来实现监听和请求处理及应答的。

主要牵涉到的类:

1) ,KafkaServer

该类代表了一个 kafka Broker 的生命周期,处理 kafka 启动或者停止所需要的所有功能。

2) ,SocketServer

一个 NIO 服务中心。线程模型是

1 Acceptor 线程,用来处理新的链接请求

N 个加工 Processor 线程。每个线程拥有一个他们自己的 selector ,主要负责 IO 请求及应答。

3) ,KafkaRequestHandler

实际会在 KafkaRequestHandlerPool 中创建多个对象,负责加工处理 request 线程。

会创建 M 个处理 Handler 线程。负责处理 request 请求,将 responses 重新写会加工线程 Processor ,以便于其写回给客户端。

4) ,RequestChannel

该类主要是封装了 requestQueue responseQueues responseListeners ,便于个各类中同时引用并作出自己的处理。

5) ,KafkaApis

Kafka 多样请求的逻辑处理程序。

具体如图:

Kafka 源码系列之 Broker 的 IO 服务及业务处理

下面讲解 1,2,3,4,5 ,具体含义:

1 SocketServer.startup() ,会启动一个后台线程,该线程会持有一个 acceptor ,负责接收新的链接请求,并轮训所有的 Processor ,将新的链接请求加入 Processor 对象的成员变量 ConcurrentLinkedQueue 里, Processor 会在其 run 方法里面处理。

// start accepting connections
this . acceptor = new Acceptor(host , port , processors , sendBufferSize , recvBufferSize , quotas)
Utils. newThread ( "kafka-socket-acceptor" , acceptor , false ).start()
acceptor .awaitStartup

Processor 池的初始化

for (i <- 0 until numProcessorThreads) {
   processors (i) = new Processor(i ,
                                 time ,
                                 maxRequestSize ,
                                 aggregateIdleMeter ,
                                 newMeter( "IdlePercent" , "percent" , TimeUnit. NANOSECONDS , Map( "networkProcessor" -> i.toString)) ,
                                 numProcessorThreads ,
                                 requestChannel ,
                                 quotas ,
                                 connectionsMaxIdleMs)
  Utils. newThread ( "kafka-network-thread-%d-%d" .format(port , i) , processors (i) , false ).start()
}

accepttor轮训Processor

val ready = selector.select(500)
if(ready > 0) {
  val keys = selector.selectedKeys()
  val iter = keys.iterator()
  while(iter.hasNext && isRunning) {
    var key: SelectionKey = null
    try {
      key = iter.next
      iter.remove()
      if(key.isAcceptable)
         accept(key, processors(currentProcessor))
      else
         throw new IllegalStateException("Unrecognized key state for acceptor thread.")

      // round robin to the next processor thread
      currentProcessor = (currentProcessor + 1) % processors.length
    } catch {
      case e: Throwable => error("Error while accepting connection", e)
    }

2 Processor run 方法里面,会针对可读事件调用 read 方法里将 request 请求信息通过 requestChannel.sendRequest(req) 添加到 RequestChannel 的成员变量里面。

requestQueue  = new ArrayBlockingQueue[RequestChannel.Request](queueSize)

3 ,在 KafkaServer startup 方法里面构建 KafkaRequestHandlerPool 对象的时候,会构建若干 handler 线程。

for (i <- 0 until numThreads) {
   runnables (i) = new KafkaRequestHandler(i , brokerId , aggregateIdleMeter , numThreads , requestChannel , apis)
   threads (i) = Utils. daemonThread ( "kafka-request-handler-" + i , runnables (i))
   threads (i).start()
}

KafakRequestHandler 的方法里面会对我们的 request 进行处理

req = requestChannel.receiveRequest( 300 )
apis.handle(req)

实际上,是通过 KafkaApis 对象的 handle 方法进行各种逻辑的处理的。

def handle (request: RequestChannel.Request) {
   try {
    trace( "Handling request: " + request. requestObj + " from client: " + request.remoteAddress)
    request. requestId match {
       case RequestKeys. ProduceKey => handleProducerOrOffsetCommitRequest(request)
       case RequestKeys. FetchKey => handleFetchRequest(request)
       case RequestKeys. OffsetsKey => handleOffsetRequest(request)
       case RequestKeys. MetadataKey => handleTopicMetadataRequest(request)
       case RequestKeys. LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
       case RequestKeys. StopReplicaKey => handleStopReplicaRequest(request)
       case RequestKeys. UpdateMetadataKey => handleUpdateMetadataRequest(request)
       case RequestKeys. ControlledShutdownKey => handleControlledShutdownRequest(request)
       case RequestKeys. OffsetCommitKey => handleOffsetCommitRequest(request)
       case RequestKeys. OffsetFetchKey => handleOffsetFetchRequest(request)
       case RequestKeys. ConsumerMetadataKey => handleConsumerMetadataRequest(request)
       case requestId => throw new KafkaException( "Unknown api code " + requestId)
    }
  } catch {
     case e: Throwable =>
      request. requestObj .handleError(e , requestChannel , request)
      error( "error when handling request %s" .format(request. requestObj ) , e)
  } finally
     request. apiLocalCompleteTimeMs = SystemTime. milliseconds
}

4 ,在每一种请求处理结束之后会产生对应的 response

requestChannel.sendResponse( new RequestChannel.Response(request , new BoundedByteBufferSend(response)))

并将 response 存储到 RequestChannel responseQueues 存储。

5 ,最终,由我们的 Processor 在其 run 方法里面,取出 RequestChannel responseQueues 存储的时间,匹配到写事件,然后通过其 write 方法对具体的 request 进行应答。

else if (key.isWritable)
  write(key)

三,总结

这是一个典型的 Reactor 多线程模型,并且实现了 IO 线程和业务线程进行隔离。这样做的优点有以下几种 :

1, 充分利用资源

可以充分利用 CPU 资源,增加并发度,使业务响应速度加快。

2, 故障隔离 :

业务处理线程,无论是处理耗时,还是发生阻塞,都不会影响 IO 请求线程。保证服务器能在某些业务线程出故障的情况下,正常进行 IO 请求应答。

3, 可维护性

职责单一,可维护性高,方便定位问题。

此处再次建议大家仔细阅读,浪尖关于 JAVA 的网络 IO 模型彻底讲解那篇文章,彻底领会其意境。

此乃,原创。欢迎大家扫描二维码,关注浪尖微信公众号,大家共同进步。

Kafka 源码系列之 Broker 的 IO 服务及业务处理


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript and Ajax for the Web, Sixth Edition

JavaScript and Ajax for the Web, Sixth Edition

Tom Negrino、Dori Smith / Peachpit Press / August 28, 2006 / $24.99

Book Description Need to learn JavaScript fast? This best-selling reference’s visual format and step-by-step, task-based instructions will have you up and running with JavaScript in no time. In thi......一起来看看 《JavaScript and Ajax for the Web, Sixth Edition》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具