内容简介:由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Proce
由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其 SocketServer#startup
方法执行组件的启动过程。SocketServer 是 kafka 对外提供网络服务的核心实现类,在 kafka 运行过程中用于接收来自客户端和其它 broker 节点的网络请求。考虑到性能上的需求,SocketServer 采用了Reactor 模式,并基于 java NIO 实现。
参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Processor,而 Handler 与 Processor 之间通过请求队列进行通信。
一. SocketServer 组件
SocketServer 是整个 kafka server 网络模型的管家类,主要用于构建和启动整个网络模块。SocketServer 类的字段定义如下:
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup { /** 封装服务器对应的多张网卡,kafka 可以同时监听这些 IP 和端口,每个 EndPoint 对应一个 Acceptor */ private val endpoints: Map[ListenerName, EndPoint] = config.listeners.map(l => l.listenerName -> l).toMap /** 每个 Acceptor 对应的 Processor 对应的线程数 */ private val numProcessorThreads = config.numNetworkThreads /** broker 节点上 Processor 线程总数 */ private val totalProcessorThreads = numProcessorThreads * endpoints.size /** 请求队列中缓存的最大请求个数 */ private val maxQueuedRequests = config.queuedMaxRequests /** 每个 IP 允许创建的最大连接数 */ private val maxConnectionsPerIp = config.maxConnectionsPerIp /** 针对特定 IP 指定的允许创建的最大连接数,会覆盖 maxConnectionsPerIp 配置 */ private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides /** Processor 线程与 Handler 线程之间交换数据的通道 */ val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests) /** Acceptor 对象集合,每个 EndPoint 对应一个 Acceptor */ private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() /** Processor 对象集合,封装所有的 Processor 对象 */ private val processors = new Array[Processor](totalProcessorThreads) /** 用于控制每个 IP 上的最大连接数 */ private var connectionQuotas: ConnectionQuotas = _ // ... 省略方法定义 }
各字段的含义参考注释,其中 EndPoint 类用于封装服务器对应的 host、port,以及网络协议等信息,而 RequestChannel 类定义了 Processor 和 Handler 之间交换数据的通道,该类的字段定义如下:
class RequestChannel(val numProcessors: Int, // Processor 线程总数 val queueSize: Int // 请求队列的大小 ) extends KafkaMetricsGroup { /** 响应监听器列表,当 Handler 往响应队列写回响应数据时唤醒对应的 Processor 线程进行处理 */ private var responseListeners: List[Int => Unit] = Nil /** 请求队列,所有的 Processor 共用一个 */ private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) /** 响应队列,每个 Processor 对应一个响应队列 */ private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) // ... 省略方法定义 }
RequestChannel 封装了请求队列和响应队列,这里需要注意的一点是请求队列是 Processor 线程共享的,而响应队列则是每个 Processor 线程专属的。Processor 负责将读取到的请求写入请求队列中,并从自己的响应队列中取出响应对象发送给请求方。Handler 负责从请求队列中读取请求进行处理,并在处理完成之后将响应对象写入到之前读取该请求的 Processor 的响应队列中。关于 Acceptor、Processor 和 Handler 的实现下文会专门进行分析,这里我们先来看一下 SocketServer 的启动逻辑,位于 SocketServer#startup
方法中,实现如下:
def startup() { synchronized { // 创建控制 IP 最大连接数的 ConnectionQuotas 对象 connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) // 指定 socket send buffer 的大小(对应 socket.send.buffer.bytes 配置) val sendBufferSize = config.socketSendBufferBytes // 指定 socket receive buffer 的大小(对应 socket.receive.buffer.bytes 配置) val recvBufferSize = config.socketReceiveBufferBytes // 获取 broker 节点 ID val brokerId = config.brokerId var processorBeginIndex = 0 // 遍历为每个 EndPoint,创建并绑定对应的 Acceptor 和 Processor config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads // 按照指定的 processor 线程数,为每个 EndPoint 创建对应数量的 Processor 对象, // 编号区间 [processorBeginIndex, processorEndIndex) for (i <- processorBeginIndex until processorEndIndex) processors(i) = this.newProcessor(i, connectionQuotas, listenerName, securityProtocol) // 为当前 EndPoint 创建并绑定一个 Acceptor 对象 val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) // 启动 Acceptor 线程 Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start() // 主线程等待 Acceptor 线程启动完成 acceptor.awaitStartup() processorBeginIndex = processorEndIndex } } info("Started " + acceptors.size + " acceptor threads") }
SocketServer 启动过程中会遍历为当前 broker 节点上的每张网卡创建并绑定对应 Acceptor 对象,然后按照配置的 Processor 线程数(对应 num.network.threads
配置)为每个 Acceptor 创建并绑定对应数量的 Processor 实例,最后启动 Acceptor 线程。
二. Acceptor 组件
Acceptor 主要负责接收来自客户端和其它 broker 节点的请求,并创建对应的 socket 连接交由 Processor 进行处理。Acceptor 类的字段定义如下:
private[kafka] class Acceptor(val endPoint: EndPoint, // 对应的网卡信息 val sendBufferSize: Int, // socket send buffer size val recvBufferSize: Int, // socket receive buffer size brokerId: Int, // broker 节点 id processors: Array[Processor], // 绑定的 Processor 线程集合 connectionQuotas: ConnectionQuotas // 控制 IP 连接数的对象 ) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { /** NIO Selector */ private val nioSelector = NSelector.open() /** ServerSocketChannel 对象,监听对应网卡的指定端口 */ val serverChannel: ServerSocketChannel = this.openServerSocket(endPoint.host, endPoint.port) // ... 省略方法定义 }
SocketServer 在启动过程中会创建并启动 Acceptor 线程,由上面的定义可以看出 Acceptor 继承自 AbstractServerThread 抽象类,而 AbstractServerThread 实现了 Runnable 接口,并提供了对线程的基本管理方法。Acceptor 的具体执行逻辑位于 Acceptor#run
方法中:
def run() { // 注册监听 OP_ACCEPT 事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 标记当前线程启动完成,以便 SocketServer 能够继续为其它网卡创建并绑定对应的 Acceptor 线程 this.startupComplete() try { var currentProcessor = 0 // 当前生效的 processor 编号 while (isRunning) { try { // 等待关注的事件 val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() // 遍历处理接收到的请求 while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() // 如果是 OP_ACCEPT 事件,则调用 accept 方法进行处理 if (key.isAcceptable) this.accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // 基于轮询算法选择下一个 Processor 处理下一次请求,负载均衡 currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } catch { case e: ControlThrowable => throw e case e: Throwable => error("Error occurred", e) } } } finally { debug("Closing server socket and selector.") this.swallowError(serverChannel.close()) this.swallowError(nioSelector.close()) this.shutdownComplete() } } def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] // 创建 SocketChannel 对象 val socketChannel = serverSocketChannel.accept() try { // 增加对应 IP 上的连接数,如果连接数超过阈值,则抛 TooManyConnectionsException 异常 connectionQuotas.inc(socketChannel.socket().getInetAddress) // 配置 SocketChannel 对象,非阻塞模式 socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socketChannel.socket().setSendBufferSize(sendBufferSize) // 将 SocketChannel 交给 Processor 进行处理 processor.accept(socketChannel) } catch { // 连接数过多,关闭当前通道上的连接,并将连接计数减 1 case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) this.close(socketChannel) } }
上述方法的执行逻辑是一个典型的 NIO server 的实现。Acceptor 会循环监听 OP_ACCEPT
事件,当有新的连接请求到达时会创建并配置连接对应的 SocketChannel 对象,并交由 Processor 处理(调用 Processor#accept
方法)。我们知道一个 Acceptor 上绑定了多个 Processor 线程,为了保证各个 Processor 的负载均衡,这里使用了简单的轮询算法,逐个选择 Processor 线程处理请求。
对于新进来的请求,Acceptor 首先会使用 ConnectionQuotas 对象管理请求 IP 上的连接数,并在连接数超过配置的阈值(默认对应 max.connections.per.ip
配置,可以通过 max.connections.per.ip.overrides
配置覆盖默认配置)时触发限流机制,关闭当前连接的通道。
三. Processor 组件
Processor 主要负责读取来自请求方的请求,并向请求方发送响应,但是本身不负责对请求进行处理,而是委托给相应的 Handler 线程进行处理。Processor 中几个重要的字段定义如下:
/** Processor 与 Handler 线程之间传递请求数据的队列 */ val requestChannel: RequestChannel /** 记录分配给当前 Processor 的待处理的 SocketChannel 对象 */ private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() /** 缓存未发送给客户端的响应,由于客户端不会进行确认,所以服务端在发送成功之后会将其移除 */ private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
Acceptor 线程在收到连接请求之后,会将请求封装成 SocketChannel 对象,并调用 Processor#accept
方法将其分配给对应的 Processor 线程进行处理,该对象会被记录到 Processor#newConnections
字段中,并唤醒对应的 Processor 线程。方法 Processor#accept
的实现如下:
def accept(socketChannel: SocketChannel) { // 将 Acceptor 分配的 SocketChannel 对象缓存到同步队列中 newConnections.add(socketChannel) // 唤醒 Processor 线程处理队列 this.wakeup() // 本质上调用 NIO Server 的 wakeup 方法 }
Processor 同样继承了 AbstractServerThread 抽象类,所以也是一个线程类实现。在创建 Acceptor 对象的过程中会遍历启动分配给当前 Acceptor 的 Processor 线程。
synchronized { // 遍历启动分配给当前 Acceptor 的 Processor 线程 processors.foreach { processor => Utils.newThread( s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor, false).start() } }
Processor 的 Processor#run
方法在线程启动之后,会一直循环处理 Acceptor 分配的请求,读取并封装请求数据到队列中,然后等待 Handler 线程处理。对于已经处理完成的请求对应的响应对象,Processor 线程会依据响应类型分而治之。方法 Processor#run
的实现如下:
override def run() { // 标识当前线程启动完成 this.startupComplete() while (isRunning) { try { // 1. 遍历获取分配给当前 Processor 的 SocketChannel 对象,注册 OP_READ 事件 this.configureNewConnections() // 2. 遍历处理当前 Processor 的响应队列,依据响应类型进行处理 this.processNewResponses() // 3. 发送缓存的响应对象给客户端 this.poll() // 4. // 遍历处理 poll 操作放置在 Selector 的 completedReceives 队列中的请求, // 封装请求信息为 Request 对象,并记录到请求队列中等待 Handler 线程处理, // 同时标记当前 Selector 暂时不再接收新的请求 this.processCompletedReceives() // 5. // 遍历处理 poll 操作放置在 Selector 的 completedSends 队列中的请求, // 将其从 inflightResponses 集合中移除,并标记当前 Selector 可以继续读取数据 this.processCompletedSends() // 6. // 遍历处理 poll 操作放置在 Selector 的 disconnected 集合中的断开的连接, // 将连接对应的所有响应从 inflightResponses 中移除,同时更新对应 IP 的连接数 this.processDisconnected() } catch { case e: ControlThrowable => throw e case e: Throwable => error("Processor got uncaught exception.", e) } } debug("Closing selector - processor " + id) // 关闭所有的连接以及选择器 this.swallowError(closeAll()) this.shutdownComplete() }
当 Processor 线程启动完成后会调用 Processor#startupComplete
方法标识当前线程启动完成,然后开始进入循环,依次执行以下操作:
OP_READ
下面对各个步骤逐一进行深入分析,首先来看 步骤 1 ,实现位于 Processor#configureNewConnections
方法中:
private def configureNewConnections() { while (!newConnections.isEmpty) { // 获取待处理 SocketChannel 对象 val channel = newConnections.poll() try { debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}") val localHost = channel.socket().getLocalAddress.getHostAddress val localPort = channel.socket().getLocalPort val remoteHost = channel.socket().getInetAddress.getHostAddress val remotePort = channel.socket().getPort val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString // 注册 OP_READ 事件 selector.register(connectionId, channel) } catch { // 对于不致命的异常,则捕获并关闭对应的通道 case NonFatal(e) => val remoteAddress = channel.getRemoteAddress this.close(channel) error(s"Processor $id closed connection from $remoteAddress", e) } } }
前面我们曾介绍过 Acceptor 会将请求对应的 SocketChannel 对象记录到 Processor#newConnections
字段中,而这一步的主要任务就是遍历处理这些 SocketChannel 对象,分别将 Processor 对应的 Selector 注册到这些通道上(对应 OP_READ
事件),用于读取请求数据。
步骤 2会遍历消费当前 Processor 的响应队列,按照响应的类型分别处理,实现位于 Processor#processNewResponses
方法中:
private def processNewResponses() { // 获取当前 Processor 的响应队列 var curr = requestChannel.receiveResponse(id) while (curr != null) { try { // 依据响应类型对响应进行处理 curr.responseAction match { // 暂时没有响应需要发送,如果对应的通道未被关闭,则继续注册 OP_READ 事件读取请求数据 case RequestChannel.NoOpAction => curr.request.updateRequestMetrics() trace("Socket server received empty response to send, registering for read: " + curr) val channelId = curr.request.connectionId if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) selector.unmute(channelId) // 注册 OP_READ 事件 // 当前响应需要发送给请求方 case RequestChannel.SendAction => // 发送该响应,并将响应对象记录到 inflightResponses 集合中 this.sendResponse(curr) // 需要关闭当前连接 case RequestChannel.CloseConnectionAction => curr.request.updateRequestMetrics() trace("Closing socket connection actively according to the response code.") // 关闭连接 this.close(selector, curr.request.connectionId) } } finally { // 获取下一个待处理的响应 curr = requestChannel.receiveResponse(id) } } }
我们知道 Processor 本身不负责处理请求,它只是封装请求交由 Handler 线程进行处理,同时每一个 Processor 会维护一个响应队列,Handler 线程在处理完请求之后会将对应的响应对象放置到对应 Processor 的响应队列中,而这一步会遍历处理该响应队列,并依据响应类型分而治之:
- 如果当前没有响应需要处理,那么会重新在对应的通道上注册
OP_READ
事件,以继续读取新的请求数据。 - 如果当前的响应需要发送给请求方,则会调用
Processor#sendResponse
方法发送响应,并将响应对象记录到Processor#inflightResponses
字段中,表示该响应对象正在被发送。 - 如果当前的响应类型表示需要关闭对应的连接,则会调用
Processor#close
方法关闭对应的通道,并更新对应 IP 上的连接数。
步骤 3会发送步骤 2 缓存的响应请求,并将读取到的请求、已经发送成功的请求,以及断开的连接分别放置到 Selector 的 completedReceives、completedSends 和 disconnected 集合中,而步骤 4 至 6 的逻辑则分别对应处理这 3 个集合。首先看一下 步骤 4 ,相应实现位于 Processor#processCompletedReceives
方法中:
private def processCompletedReceives() { // 遍历处理接收到的请求 selector.completedReceives.asScala.foreach { receive => try { // 获取请求对应的通道 val openChannel = selector.channel(receive.source) // 创建通道对应的 Session 对象,用于权限控制 val session = { // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'. val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) } // 封装请求信息为 Request 对象 val req = RequestChannel.Request( processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName, securityProtocol = securityProtocol) // 将请求对象放入请求队列中,等待 Handler 线程处理 requestChannel.sendRequest(req) // 取消注册的 OP_READ 事件,处理期间不再接收新的请求(即不读取新的请求数据) selector.mute(receive.source) } catch { case e@(_: InvalidRequestException | _: SchemaException) => // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier error(s"Closing socket for ${receive.source} because of error", e) close(selector, receive.source) } } }
这一步会遍历处理 Selector 的 completedReceives 集合,对于收到的请求对象会读取请求数据,并封装成 Request 对象记录到请求队列 Processor#requestChannel
中,等待 Handler 线程处理,同时取消之前注册到对应通道的 OP_READ
事件,在处理完成之前不再读取新的请求数据。这里调用了 RequestChannel#sendRequest
方法将 Request 对象放置到一个被 Processor 共享的请求队列中,后续 Handler 线程会消费该队列处理对应的请求。
步骤 5会遍历处理 Selector 的 completedSends 集合,其中存放了已经发送成功的响应,对于这些响应可以从 Processor#inflightResponses
中移除,实现如下:
private def processCompletedSends() { // 遍历处理已经完全发送出去的请求 selector.completedSends.asScala.foreach { send => // 因为当前响应已经发送成功,从 inflightResponses 中移除,不需要客户端确认 val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } resp.request.updateRequestMetrics() // 注册 OP_READ 事件,继续读取请求数据 selector.unmute(send.destination) } }
步骤 6会遍历处理 Selector 的 disconnected 集合,对于已经断开的连接,将本地记录的待发送完成的响应对象从 Processor#inflightResponses
中移除,同时更新对应 IP 上的连接数,实现如下:
private def processDisconnected() { // 遍历处理已经断开的连接 selector.disconnected.asScala.foreach { connectionId => val remoteHost = ConnectionId.fromString(connectionId).getOrElse { throw new IllegalStateException(s"connectionId has unexpected format: $connectionId") }.remoteHost // 将连接对应的所有响应从 inflightResponses 中移除 inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics()) // 对应的通道已经被关闭,所以需要减少对应 IP 上的连接数 connectionQuotas.dec(InetAddress.getByName(remoteHost)) } }
四. Handler 组件
Processor 在将对应的 Request 请求对象记录到全局共享的请求队列之后,Handler 线程会消费该队列并处理对应的请求,同时将处理完成的请求对应的响应对象写入到之前读取该请求的 Processor 的响应队列中。Handler 的实现由 KafkaRequestHandler 和 KafkaRequestHandlerPool 两个类构成,其中 KafkaRequestHandlerPool 是对 KafkaRequestHandler 的封装,提供了对 Handler 线程的管理。KafkaRequestHandlerPool 的实现比较简单,我们主要来看一下 KafkaRequestHandler 的实现,KafkaRequestHandler 实现了 Runnable 接口,其 KafkaRequestHandler#run
方法实现如下:
override def run() { while (true) { try { var req: RequestChannel.Request = null while (req == null) { val startSelectTime = time.nanoseconds // 从请求队列中获取 Processor 封装的请求 req = requestChannel.receiveRequest(300) val idleTime = time.nanoseconds - startSelectTime aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } // 如果是 AllDone 请求,则退出当前线程 if (req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId)) return } req.requestDequeueTimeMs = time.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) // 处理请求,将响应写回到对应 Processor 的响应队列中,并唤醒 Processor 线程 apis.handle(req) } catch { case e: Throwable => error("Exception when handling request", e) } } }
上述方法诠释了 Handler 的全部运行逻辑,首先调用 RequestChannel#receiveRequest
方法超时等待从全局请求队列中获取请求对象,如果获取到的请求对象是 RequestChannel.AllDone
类型,则说明当前请求退出相应线程,否则 Handler 线程会调用 KafkaApis#handle
方法对请求进行处理,并将响应结果写入到对应 Processor 的响应队列中。
KafkaApis 类是 kafka 中的一个核心类实现,用于分发各种类型的请求给到相应的组件,针对每一种请求都定义了相应的方法进行处理,上面调用 KafkaApis#handle
方法实现如下:
def handle(request: RequestChannel.Request) { try { trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) // 依据请求类型分发请求 ApiKeys.forId(request.requestId) match { // 处理 ProduceRequest 请求 case ApiKeys.PRODUCE => handleProducerRequest(request) // 处理 FetchRequest 请求 case ApiKeys.FETCH => handleFetchRequest(request) // 处理 ListOffsetRequest 请求 case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request) // 处理 MetadataRequest 请求 case ApiKeys.METADATA => handleTopicMetadataRequest(request) // 处理 LeaderAndIsrRequest 请求 case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) // 处理 StopReplicaRequest 请求 case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) // 处理 UpdateMetadataRequest 请求 case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) // 处理 ControlledShutdownRequest 请求 case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request) // 处理 OffsetCommitRequest 请求 case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) // 处理 OffsetFetchRequest 请求 case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) // 处理 GroupCoordinatorRequest 请求 case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request) // 处理 JoinGroupRequest 请求 case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) // 处理 HeartbeatRequest 请求 case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request) // 处理 LeaveGroupRequest 请求 case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request) // 处理 SyncGroupRequest 请求 case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) // 处理 DescribeGroupsRequest 请求 case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) // 处理 ListGroupsRequest 请求 case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) // 处理 SaslHandshakeRequest 请求 case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) // 处理 ApiVersionsRequest 请求 case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) // 处理 CreateTopicsRequest 请求 case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) // 处理 DeleteTopicsRequest 请求 case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { // ... 省略异常处理 } finally request.apiLocalCompleteTimeMs = time.milliseconds }
枚举类 ApiKeys 为每一种请求类型定义了一个唯一的标识,KafkaApis 会依据具体的请求类型,将请求委托给对应的 handle*
方法进行处理,这些方法基本的执行逻辑可以概括为:
- 解析获取相应类型的请求对象;
- 权限校验;
- 委托对应的组件处理请求;
- 发送响应,或定义响应回调函数,并由具体的组件回调执行。
相应的实现这里先不展开,后续分析具体组件时再针对性分析。
五. 总结
本文我们分析了 kafka 的网络交互模型设计与实现,考虑到客户端与集群之间,以及 broker 节点之间的交互均基于请求进行通信,所以必须保证网络交互这一块的低延迟和高性能。相比于传统的“thread-per-connection”线程模型,Kafka 采用了 reactor 模式以满足实际的需求,并借助于 java NIO 进行实现。整个网络交互模型主要分为 Acceptor、Processor 和 Handler 三大组件,其中 Acceptor 负责接收请求,Processor 负责解析请求并发送响应,而具体的请求处理过程则交由 Handler 负责,其中的设计思想值得我们在开发自己的项目中借鉴。
转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Hadoop YARN:ApplicationMaster与ResourceManager交互源码解析
- 用Asp与XML实现交互的一个实例源码
- iOS 12 人机交互指南:交互(User Interaction)
- 生活NLP云服务“玩秘”站稳人机交互2.0语音交互场景
- asyncio之子进程交互
- 以太坊交互工具
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
新媒体革命——在线时代的媒体、公关与传播
仇勇 / 电子工业出版社 / 2016-2-1 / CNY 50.00
这既是传统媒体的大裂变年代,也是在线媒体开启的新闻业的黄金时代。 信息流动的新法则不仅改变了媒体业,也在重塑公关、传播和商业的面貌。总之,这个世界的连接方式不仅不再相同,而且这一改变不可逆转。在这个全新重启的在线时代里,无论是信息的获取还是商业本身,信任都变得比以往更重要。 从告别传统媒体的那一刻起,我就有着两个小小的“野心”:一是探寻适合在线时代的媒体生产方式;二是让优质内容有权获得......一起来看看 《新媒体革命——在线时代的媒体、公关与传播》 这本书的介绍吧!