Kafka 源码解析:网络交互模型

栏目: 后端 · 发布时间: 6年前

内容简介:由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Proce

由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其 SocketServer#startup 方法执行组件的启动过程。SocketServer 是 kafka 对外提供网络服务的核心实现类,在 kafka 运行过程中用于接收来自客户端和其它 broker 节点的网络请求。考虑到性能上的需求,SocketServer 采用了Reactor 模式,并基于 java NIO 实现。

参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Processor,而 Handler 与 Processor 之间通过请求队列进行通信。

Kafka 源码解析:网络交互模型

一. SocketServer 组件

SocketServer 是整个 kafka server 网络模型的管家类,主要用于构建和启动整个网络模块。SocketServer 类的字段定义如下:

class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

    /** 封装服务器对应的多张网卡,kafka 可以同时监听这些 IP 和端口,每个 EndPoint 对应一个 Acceptor */
    private val endpoints: Map[ListenerName, EndPoint] = config.listeners.map(l => l.listenerName -> l).toMap
    /** 每个 Acceptor 对应的 Processor 对应的线程数 */
    private val numProcessorThreads = config.numNetworkThreads
    /** broker 节点上 Processor 线程总数 */
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    /** 请求队列中缓存的最大请求个数 */
    private val maxQueuedRequests = config.queuedMaxRequests
    /** 每个 IP 允许创建的最大连接数 */
    private val maxConnectionsPerIp = config.maxConnectionsPerIp
    /** 针对特定 IP 指定的允许创建的最大连接数,会覆盖 maxConnectionsPerIp 配置 */
    private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
    /** Processor 线程与 Handler 线程之间交换数据的通道 */
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    /** Acceptor 对象集合,每个 EndPoint 对应一个 Acceptor */
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    /** Processor 对象集合,封装所有的 Processor 对象 */
    private val processors = new Array[Processor](totalProcessorThreads)
    /** 用于控制每个 IP 上的最大连接数 */
    private var connectionQuotas: ConnectionQuotas = _

    // ... 省略方法定义

}

各字段的含义参考注释,其中 EndPoint 类用于封装服务器对应的 host、port,以及网络协议等信息,而 RequestChannel 类定义了 Processor 和 Handler 之间交换数据的通道,该类的字段定义如下:

class RequestChannel(val numProcessors: Int, // Processor 线程总数
                     val queueSize: Int // 请求队列的大小
                    ) extends KafkaMetricsGroup {

    /** 响应监听器列表,当 Handler 往响应队列写回响应数据时唤醒对应的 Processor 线程进行处理 */
    private var responseListeners: List[Int => Unit] = Nil
    /** 请求队列,所有的 Processor 共用一个 */
    private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
    /** 响应队列,每个 Processor 对应一个响应队列 */
    private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)

    // ... 省略方法定义

}

RequestChannel 封装了请求队列和响应队列,这里需要注意的一点是请求队列是 Processor 线程共享的,而响应队列则是每个 Processor 线程专属的。Processor 负责将读取到的请求写入请求队列中,并从自己的响应队列中取出响应对象发送给请求方。Handler 负责从请求队列中读取请求进行处理,并在处理完成之后将响应对象写入到之前读取该请求的 Processor 的响应队列中。关于 Acceptor、Processor 和 Handler 的实现下文会专门进行分析,这里我们先来看一下 SocketServer 的启动逻辑,位于 SocketServer#startup 方法中,实现如下:

def startup() {
    synchronized {

        // 创建控制 IP 最大连接数的 ConnectionQuotas 对象
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

        // 指定 socket send buffer 的大小(对应 socket.send.buffer.bytes 配置)
        val sendBufferSize = config.socketSendBufferBytes
        // 指定 socket receive buffer 的大小(对应 socket.receive.buffer.bytes 配置)
        val recvBufferSize = config.socketReceiveBufferBytes
        // 获取 broker 节点 ID
        val brokerId = config.brokerId

        var processorBeginIndex = 0
        // 遍历为每个 EndPoint,创建并绑定对应的 Acceptor 和 Processor
        config.listeners.foreach { endpoint =>
            val listenerName = endpoint.listenerName
            val securityProtocol = endpoint.securityProtocol
            val processorEndIndex = processorBeginIndex + numProcessorThreads

            // 按照指定的 processor 线程数,为每个 EndPoint 创建对应数量的 Processor 对象,
            // 编号区间 [processorBeginIndex, processorEndIndex)
            for (i <- processorBeginIndex until processorEndIndex)
                processors(i) = this.newProcessor(i, connectionQuotas, listenerName, securityProtocol)

            // 为当前 EndPoint 创建并绑定一个 Acceptor 对象
            val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
                processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
            acceptors.put(endpoint, acceptor)

            // 启动 Acceptor 线程
            Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()

            // 主线程等待 Acceptor 线程启动完成
            acceptor.awaitStartup()

            processorBeginIndex = processorEndIndex
        }
    }

    info("Started " + acceptors.size + " acceptor threads")
}

SocketServer 启动过程中会遍历为当前 broker 节点上的每张网卡创建并绑定对应 Acceptor 对象,然后按照配置的 Processor 线程数(对应 num.network.threads 配置)为每个 Acceptor 创建并绑定对应数量的 Processor 实例,最后启动 Acceptor 线程。

二. Acceptor 组件

Acceptor 主要负责接收来自客户端和其它 broker 节点的请求,并创建对应的 socket 连接交由 Processor 进行处理。Acceptor 类的字段定义如下:

private[kafka] class Acceptor(val endPoint: EndPoint, // 对应的网卡信息
                              val sendBufferSize: Int, // socket send buffer size
                              val recvBufferSize: Int, // socket receive buffer size
                              brokerId: Int, // broker 节点 id
                              processors: Array[Processor], // 绑定的 Processor 线程集合
                              connectionQuotas: ConnectionQuotas // 控制 IP 连接数的对象
                             ) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

    /** NIO Selector */
    private val nioSelector = NSelector.open()
    /** ServerSocketChannel 对象,监听对应网卡的指定端口 */
    val serverChannel: ServerSocketChannel = this.openServerSocket(endPoint.host, endPoint.port)

    // ... 省略方法定义

}

SocketServer 在启动过程中会创建并启动 Acceptor 线程,由上面的定义可以看出 Acceptor 继承自 AbstractServerThread 抽象类,而 AbstractServerThread 实现了 Runnable 接口,并提供了对线程的基本管理方法。Acceptor 的具体执行逻辑位于 Acceptor#run 方法中:

def run() {
    // 注册监听 OP_ACCEPT 事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    // 标记当前线程启动完成,以便 SocketServer 能够继续为其它网卡创建并绑定对应的 Acceptor 线程
    this.startupComplete()
    try {
        var currentProcessor = 0 // 当前生效的 processor 编号
        while (isRunning) {
            try {
                // 等待关注的事件
                val ready = nioSelector.select(500)
                if (ready > 0) {
                    val keys = nioSelector.selectedKeys()
                    val iter = keys.iterator()
                    // 遍历处理接收到的请求
                    while (iter.hasNext && isRunning) {
                        try {
                            val key = iter.next
                            iter.remove()
                            // 如果是 OP_ACCEPT 事件,则调用 accept 方法进行处理
                            if (key.isAcceptable)
                                this.accept(key, processors(currentProcessor))
                            else
                                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                            // 基于轮询算法选择下一个 Processor 处理下一次请求,负载均衡
                            currentProcessor = (currentProcessor + 1) % processors.length
                        } catch {
                            case e: Throwable => error("Error while accepting connection", e)
                        }
                    }
                }
            } catch {
                case e: ControlThrowable => throw e
                case e: Throwable => error("Error occurred", e)
            }
        }
    } finally {
        debug("Closing server socket and selector.")
        this.swallowError(serverChannel.close())
        this.swallowError(nioSelector.close())
        this.shutdownComplete()
    }
}

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    // 创建 SocketChannel 对象
    val socketChannel = serverSocketChannel.accept()
    try {
        // 增加对应 IP 上的连接数,如果连接数超过阈值,则抛 TooManyConnectionsException 异常
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        // 配置 SocketChannel 对象,非阻塞模式
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socketChannel.socket().setSendBufferSize(sendBufferSize)

        // 将 SocketChannel 交给 Processor 进行处理
        processor.accept(socketChannel)
    } catch {
        // 连接数过多,关闭当前通道上的连接,并将连接计数减 1
        case e: TooManyConnectionsException =>
            info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
            this.close(socketChannel)
    }
}

上述方法的执行逻辑是一个典型的 NIO server 的实现。Acceptor 会循环监听 OP_ACCEPT 事件,当有新的连接请求到达时会创建并配置连接对应的 SocketChannel 对象,并交由 Processor 处理(调用 Processor#accept 方法)。我们知道一个 Acceptor 上绑定了多个 Processor 线程,为了保证各个 Processor 的负载均衡,这里使用了简单的轮询算法,逐个选择 Processor 线程处理请求。

对于新进来的请求,Acceptor 首先会使用 ConnectionQuotas 对象管理请求 IP 上的连接数,并在连接数超过配置的阈值(默认对应 max.connections.per.ip 配置,可以通过 max.connections.per.ip.overrides 配置覆盖默认配置)时触发限流机制,关闭当前连接的通道。

三. Processor 组件

Processor 主要负责读取来自请求方的请求,并向请求方发送响应,但是本身不负责对请求进行处理,而是委托给相应的 Handler 线程进行处理。Processor 中几个重要的字段定义如下:

/** Processor 与 Handler 线程之间传递请求数据的队列 */
val requestChannel: RequestChannel
/** 记录分配给当前 Processor 的待处理的 SocketChannel 对象 */
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
/** 缓存未发送给客户端的响应,由于客户端不会进行确认,所以服务端在发送成功之后会将其移除 */
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()

Acceptor 线程在收到连接请求之后,会将请求封装成 SocketChannel 对象,并调用 Processor#accept 方法将其分配给对应的 Processor 线程进行处理,该对象会被记录到 Processor#newConnections 字段中,并唤醒对应的 Processor 线程。方法 Processor#accept 的实现如下:

def accept(socketChannel: SocketChannel) {
    // 将 Acceptor 分配的 SocketChannel 对象缓存到同步队列中
    newConnections.add(socketChannel)
    // 唤醒 Processor 线程处理队列
    this.wakeup() // 本质上调用 NIO Server 的 wakeup 方法
}

Processor 同样继承了 AbstractServerThread 抽象类,所以也是一个线程类实现。在创建 Acceptor 对象的过程中会遍历启动分配给当前 Acceptor 的 Processor 线程。

synchronized {
    // 遍历启动分配给当前 Acceptor 的 Processor 线程
    processors.foreach { processor =>
        Utils.newThread(
            s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
            processor, false).start()
    }
}

Processor 的 Processor#run 方法在线程启动之后,会一直循环处理 Acceptor 分配的请求,读取并封装请求数据到队列中,然后等待 Handler 线程处理。对于已经处理完成的请求对应的响应对象,Processor 线程会依据响应类型分而治之。方法 Processor#run 的实现如下:

override def run() {
    // 标识当前线程启动完成
    this.startupComplete()
    while (isRunning) {
        try {
            // 1. 遍历获取分配给当前 Processor 的 SocketChannel 对象,注册 OP_READ 事件
            this.configureNewConnections()

            // 2. 遍历处理当前 Processor 的响应队列,依据响应类型进行处理
            this.processNewResponses()

            // 3. 发送缓存的响应对象给客户端
            this.poll()

            // 4.
            // 遍历处理 poll 操作放置在 Selector 的 completedReceives 队列中的请求,
            // 封装请求信息为 Request 对象,并记录到请求队列中等待 Handler 线程处理,
            // 同时标记当前 Selector 暂时不再接收新的请求
            this.processCompletedReceives()

            // 5.
            // 遍历处理 poll 操作放置在 Selector 的 completedSends 队列中的请求,
            // 将其从 inflightResponses 集合中移除,并标记当前 Selector 可以继续读取数据
            this.processCompletedSends()

            // 6.
            // 遍历处理 poll 操作放置在 Selector 的 disconnected 集合中的断开的连接,
            // 将连接对应的所有响应从 inflightResponses 中移除,同时更新对应 IP 的连接数
            this.processDisconnected()
        } catch {
            case e: ControlThrowable => throw e
            case e: Throwable =>
                error("Processor got uncaught exception.", e)
        }
    }

    debug("Closing selector - processor " + id)
    // 关闭所有的连接以及选择器
    this.swallowError(closeAll())
    this.shutdownComplete()
}

当 Processor 线程启动完成后会调用 Processor#startupComplete 方法标识当前线程启动完成,然后开始进入循环,依次执行以下操作:

OP_READ

下面对各个步骤逐一进行深入分析,首先来看 步骤 1 ,实现位于 Processor#configureNewConnections 方法中:

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
        // 获取待处理 SocketChannel 对象
        val channel = newConnections.poll()
        try {
            debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
            val localHost = channel.socket().getLocalAddress.getHostAddress
            val localPort = channel.socket().getLocalPort
            val remoteHost = channel.socket().getInetAddress.getHostAddress
            val remotePort = channel.socket().getPort
            val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
            // 注册 OP_READ 事件
            selector.register(connectionId, channel)
        } catch {
            // 对于不致命的异常,则捕获并关闭对应的通道
            case NonFatal(e) =>
                val remoteAddress = channel.getRemoteAddress
                this.close(channel)
                error(s"Processor $id closed connection from $remoteAddress", e)
        }
    }
}

前面我们曾介绍过 Acceptor 会将请求对应的 SocketChannel 对象记录到 Processor#newConnections 字段中,而这一步的主要任务就是遍历处理这些 SocketChannel 对象,分别将 Processor 对应的 Selector 注册到这些通道上(对应 OP_READ 事件),用于读取请求数据。

步骤 2会遍历消费当前 Processor 的响应队列,按照响应的类型分别处理,实现位于 Processor#processNewResponses 方法中:

private def processNewResponses() {
    // 获取当前 Processor 的响应队列
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
        try {
            // 依据响应类型对响应进行处理
            curr.responseAction match {
                // 暂时没有响应需要发送,如果对应的通道未被关闭,则继续注册 OP_READ 事件读取请求数据
                case RequestChannel.NoOpAction =>
                    curr.request.updateRequestMetrics()
                    trace("Socket server received empty response to send, registering for read: " + curr)
                    val channelId = curr.request.connectionId
                    if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                        selector.unmute(channelId) // 注册 OP_READ 事件
                // 当前响应需要发送给请求方
                case RequestChannel.SendAction =>
                    // 发送该响应,并将响应对象记录到 inflightResponses 集合中
                    this.sendResponse(curr)
                // 需要关闭当前连接
                case RequestChannel.CloseConnectionAction =>
                    curr.request.updateRequestMetrics()
                    trace("Closing socket connection actively according to the response code.")
                    // 关闭连接
                    this.close(selector, curr.request.connectionId)
            }
        } finally {
            // 获取下一个待处理的响应
            curr = requestChannel.receiveResponse(id)
        }
    }
}

我们知道 Processor 本身不负责处理请求,它只是封装请求交由 Handler 线程进行处理,同时每一个 Processor 会维护一个响应队列,Handler 线程在处理完请求之后会将对应的响应对象放置到对应 Processor 的响应队列中,而这一步会遍历处理该响应队列,并依据响应类型分而治之:

  1. 如果当前没有响应需要处理,那么会重新在对应的通道上注册 OP_READ 事件,以继续读取新的请求数据。
  2. 如果当前的响应需要发送给请求方,则会调用 Processor#sendResponse 方法发送响应,并将响应对象记录到 Processor#inflightResponses 字段中,表示该响应对象正在被发送。
  3. 如果当前的响应类型表示需要关闭对应的连接,则会调用 Processor#close 方法关闭对应的通道,并更新对应 IP 上的连接数。

步骤 3会发送步骤 2 缓存的响应请求,并将读取到的请求、已经发送成功的请求,以及断开的连接分别放置到 Selector 的 completedReceives、completedSends 和 disconnected 集合中,而步骤 4 至 6 的逻辑则分别对应处理这 3 个集合。首先看一下 步骤 4 ,相应实现位于 Processor#processCompletedReceives 方法中:

private def processCompletedReceives() {
    // 遍历处理接收到的请求
    selector.completedReceives.asScala.foreach { receive =>
        try {
            // 获取请求对应的通道
            val openChannel = selector.channel(receive.source)
            // 创建通道对应的 Session 对象,用于权限控制
            val session = {
                // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
                val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
                RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
            }
            // 封装请求信息为 Request 对象
            val req = RequestChannel.Request(
                processor = id,
                connectionId = receive.source,
                session = session,
                buffer = receive.payload,
                startTimeMs = time.milliseconds,
                listenerName = listenerName,
                securityProtocol = securityProtocol)
            // 将请求对象放入请求队列中,等待 Handler 线程处理
            requestChannel.sendRequest(req)
            // 取消注册的 OP_READ 事件,处理期间不再接收新的请求(即不读取新的请求数据)
            selector.mute(receive.source)
        } catch {
            case e@(_: InvalidRequestException | _: SchemaException) =>
                // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
                error(s"Closing socket for ${receive.source} because of error", e)
                close(selector, receive.source)
        }
    }
}

这一步会遍历处理 Selector 的 completedReceives 集合,对于收到的请求对象会读取请求数据,并封装成 Request 对象记录到请求队列 Processor#requestChannel 中,等待 Handler 线程处理,同时取消之前注册到对应通道的 OP_READ 事件,在处理完成之前不再读取新的请求数据。这里调用了 RequestChannel#sendRequest 方法将 Request 对象放置到一个被 Processor 共享的请求队列中,后续 Handler 线程会消费该队列处理对应的请求。

步骤 5会遍历处理 Selector 的 completedSends 集合,其中存放了已经发送成功的响应,对于这些响应可以从 Processor#inflightResponses 中移除,实现如下:

private def processCompletedSends() {
    // 遍历处理已经完全发送出去的请求
    selector.completedSends.asScala.foreach { send =>
        // 因为当前响应已经发送成功,从 inflightResponses 中移除,不需要客户端确认
        val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        // 注册 OP_READ 事件,继续读取请求数据
        selector.unmute(send.destination)
    }
}

步骤 6会遍历处理 Selector 的 disconnected 集合,对于已经断开的连接,将本地记录的待发送完成的响应对象从 Processor#inflightResponses 中移除,同时更新对应 IP 上的连接数,实现如下:

private def processDisconnected() {
    // 遍历处理已经断开的连接
    selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        // 将连接对应的所有响应从 inflightResponses 中移除
        inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
        // 对应的通道已经被关闭,所以需要减少对应 IP 上的连接数
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
}

四. Handler 组件

Processor 在将对应的 Request 请求对象记录到全局共享的请求队列之后,Handler 线程会消费该队列并处理对应的请求,同时将处理完成的请求对应的响应对象写入到之前读取该请求的 Processor 的响应队列中。Handler 的实现由 KafkaRequestHandler 和 KafkaRequestHandlerPool 两个类构成,其中 KafkaRequestHandlerPool 是对 KafkaRequestHandler 的封装,提供了对 Handler 线程的管理。KafkaRequestHandlerPool 的实现比较简单,我们主要来看一下 KafkaRequestHandler 的实现,KafkaRequestHandler 实现了 Runnable 接口,其 KafkaRequestHandler#run 方法实现如下:

override def run() {
    while (true) {
        try {
            var req: RequestChannel.Request = null
            while (req == null) {
                val startSelectTime = time.nanoseconds
                // 从请求队列中获取 Processor 封装的请求
                req = requestChannel.receiveRequest(300)
                val idleTime = time.nanoseconds - startSelectTime
                aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
            }

            // 如果是 AllDone 请求,则退出当前线程
            if (req eq RequestChannel.AllDone) {
                debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
                return
            }
            req.requestDequeueTimeMs = time.milliseconds
            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
            // 处理请求,将响应写回到对应 Processor 的响应队列中,并唤醒 Processor 线程
            apis.handle(req)
        } catch {
            case e: Throwable => error("Exception when handling request", e)
        }
    }
}

上述方法诠释了 Handler 的全部运行逻辑,首先调用 RequestChannel#receiveRequest 方法超时等待从全局请求队列中获取请求对象,如果获取到的请求对象是 RequestChannel.AllDone 类型,则说明当前请求退出相应线程,否则 Handler 线程会调用 KafkaApis#handle 方法对请求进行处理,并将响应结果写入到对应 Processor 的响应队列中。

KafkaApis 类是 kafka 中的一个核心类实现,用于分发各种类型的请求给到相应的组件,针对每一种请求都定义了相应的方法进行处理,上面调用 KafkaApis#handle 方法实现如下:

def handle(request: RequestChannel.Request) {
    try {
        trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
                format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
        // 依据请求类型分发请求
        ApiKeys.forId(request.requestId) match {
            // 处理 ProduceRequest 请求
            case ApiKeys.PRODUCE => handleProducerRequest(request)
            // 处理 FetchRequest 请求
            case ApiKeys.FETCH => handleFetchRequest(request)
            // 处理 ListOffsetRequest 请求
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
            // 处理 MetadataRequest 请求
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            // 处理 LeaderAndIsrRequest 请求
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            // 处理 StopReplicaRequest 请求
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            // 处理 UpdateMetadataRequest 请求
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
            // 处理 ControlledShutdownRequest 请求
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            // 处理 OffsetCommitRequest 请求
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            // 处理 OffsetFetchRequest 请求
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            // 处理 GroupCoordinatorRequest 请求
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
            // 处理 JoinGroupRequest 请求
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            // 处理 HeartbeatRequest 请求
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            // 处理 LeaveGroupRequest 请求
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            // 处理 SyncGroupRequest 请求
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            // 处理 DescribeGroupsRequest 请求
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            // 处理 ListGroupsRequest 请求
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            // 处理 SaslHandshakeRequest 请求
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            // 处理 ApiVersionsRequest 请求
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            // 处理 CreateTopicsRequest 请求
            case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
            // 处理 DeleteTopicsRequest 请求
            case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
    } catch {
        // ... 省略异常处理
    } finally
        request.apiLocalCompleteTimeMs = time.milliseconds
}

枚举类 ApiKeys 为每一种请求类型定义了一个唯一的标识,KafkaApis 会依据具体的请求类型,将请求委托给对应的 handle* 方法进行处理,这些方法基本的执行逻辑可以概括为:

  1. 解析获取相应类型的请求对象;
  2. 权限校验;
  3. 委托对应的组件处理请求;
  4. 发送响应,或定义响应回调函数,并由具体的组件回调执行。

相应的实现这里先不展开,后续分析具体组件时再针对性分析。

五. 总结

本文我们分析了 kafka 的网络交互模型设计与实现,考虑到客户端与集群之间,以及 broker 节点之间的交互均基于请求进行通信,所以必须保证网络交互这一块的低延迟和高性能。相比于传统的“thread-per-connection”线程模型,Kafka 采用了 reactor 模式以满足实际的需求,并借助于 java NIO 进行实现。整个网络交互模型主要分为 Acceptor、Processor 和 Handler 三大组件,其中 Acceptor 负责接收请求,Processor 负责解析请求并发送响应,而具体的请求处理过程则交由 Handler 负责,其中的设计思想值得我们在开发自己的项目中借鉴。

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Writing Windows VxDs and Device Drivers, Second Edition

Writing Windows VxDs and Device Drivers, Second Edition

Karen Hazzah / CMP / 1996-01-12 / USD 54.95

Software developer and author Karen Hazzah expands her original treatise on device drivers in the second edition of "Writing Windows VxDs and Device Drivers." The book and companion disk include the a......一起来看看 《Writing Windows VxDs and Device Drivers, Second Edition》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具