内容简介:从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。Kafka 提供了Kafka 驱动类的 main 方法实现如下:
从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。
Kafka 提供了 kafka-server-start.sh
脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka
类来启动 kafka 服务,这也是 kafka 整个服务端的驱动类。在 kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup
方法启动服务。
Kafka 驱动类的 main 方法实现如下:
def main(args: Array[String]): Unit = { try { // 解析命令行参数 val serverProps = getPropsFromArgs(args) // 创建 kafkaServerStartable 对象,期间会初始化监控上报程序 val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) // 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法 Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { kafkaServerStartable.shutdown() } }) // 本质上调用的是 KafkaServer#startup 方法 kafkaServerStartable.startup() // 阻塞等待 kafka server 运行线程关闭 kafkaServerStartable.awaitShutdown() } catch { case e: Throwable => fatal(e) System.exit(1) } System.exit(0) }
KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:
class KafkaServer(val config: KafkaConfig, // 配置信息对象 time: Time = Time.SYSTEM, // 时间戳工具 threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序 ) extends Logging with KafkaMetricsGroup { /** 标识节点已经启动完成 */ private val startupComplete = new AtomicBoolean(false) /** 标识节点正在执行关闭操作 */ private val isShuttingDown = new AtomicBoolean(false) /** 标识节点正在执行启动操作 */ private val isStartingUp = new AtomicBoolean(false) /** 阻塞主线程等待 KafkaServer 的关闭 */ private var shutdownLatch = new CountDownLatch(1) /** 记录 broker 节点的当前状态 */ val brokerState: BrokerState = new BrokerState /** Api 接口类,用于分发各种类型的请求 */ var apis: KafkaApis = _ /** 权限控制相关 */ var authorizer: Option[Authorizer] = None var credentialProvider: CredentialProvider = _ /** 网络 socket 服务 */ var socketServer: SocketServer = _ /** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */ var requestHandlerPool: KafkaRequestHandlerPool = _ /** 日志数据管理 */ var logManager: LogManager = _ /** 管理当前 broker 节点上的分区副本 */ var replicaManager: ReplicaManager = _ /** topic 增删管理 */ var adminManager: AdminManager = _ /** 动态配置管理 */ var dynamicConfigHandlers: Map[String, ConfigHandler] = _ var dynamicConfigManager: DynamicConfigManager = _ /** group 协调管理组件 */ var groupCoordinator: GroupCoordinator = _ /** 集群控制组件 */ var kafkaController: KafkaController = _ /** 定时任务调度器 */ val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) /** broker 节点活跃性检查 */ var kafkaHealthcheck: KafkaHealthcheck = _ /** broker 缓存整个集群中全部分区的状态信息 */ var metadataCache: MetadataCache = _ /** ZK 操作 工具 类 */ var zkUtils: ZkUtils = _ // ... 省略方法定义 }
在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown
方法实现,在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch
字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown
方法只是简单的调用了 CountDownLatch#await
方法来阻塞主线程。当 KafkaServer#shutdown
方法执行完成后,会调用 CountDownLatch#countDown
方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。
一. 服务启动过程分析
方法 KafkaServer#shutdown
的实现我们稍后进行分析,下面首先看一下 kafka 服务的启动过程,即 KafkaServer#startup
方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤进行进一步分析:
/brokers/ids/{brokerId}
下面针对上述流程中的 2、3、4 和 6 几个步骤进行进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。
首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:
sealed trait BrokerStates { def state: Byte } case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
关于每种状态的解释和状态转换图如下:
- NotRunning :初始状态,标识当前 broker 节点未运行。
- Starting :标识当前 broker 节点正在启动中。
- RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
- RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
- PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。
所谓 controlled shutdown,实际上是 kafka 提供的一种友好的关闭 broker 节点的机制,除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。
下面继续来分析一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:
trait Scheduler { def startup() def shutdown() def isStarted: Boolean def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) }
其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。
步骤 4调用了 KafkaServer#initZk
方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk
会基于 zookeeper.connect
配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:
/brokers/ids/{id} /brokers/topics/{topic}/partitions /brokers/topics/{topic}/partitions/{partition_id}/state /controller /controller_epoch /admin/reassign_partitions /admin/preferred_replica_election /admin/delete_topics /isr_change_notification /config
最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 kafka 服务之前,可以在配置中通过 broker.id
配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId
方法中,实现如下:
private def getBrokerId: Int = { // 获取配置的 brokerId var brokerId = config.brokerId val brokerIdSet = mutable.HashSet[Int]() // 遍历 log.dirs 配置的 log 目录列表 for (logDir <- config.logDirs) { // 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息 val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() brokerMetadataOpt.foreach { brokerMetadata => brokerIdSet.add(brokerMetadata.brokerId) } } if (brokerIdSet.size > 1) { // 不允许多个 broker 节点共享同一个 log 目录 // ... 抛出 InconsistentBrokerIdException 异常,略 } else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) { // 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致 // ... 抛出 InconsistentBrokerIdException 异常,略 } else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) { // 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性 brokerId = generateBrokerId } else if (brokerIdSet.size == 1) { // 从 meta.properties 中获取 brokerId brokerId = brokerIdSet.last } brokerId }
在 broker 节点的每个 log 目录下有一个 meta.properties
文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 kafka 可以通过该文件约束 broker.id
配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。
二. 服务关闭过程分析
Broker 节点在关闭对应的 kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown
中:
def shutdown() { try { info("shutting down") // 如果正在启动,则不允许关闭 if (isStartingUp.get) throw new IllegalStateException("Kafka server is still starting up, cannot shut down!") if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { CoreUtils.swallow(controlledShutdown()) // 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作 brokerState.newState(BrokerShuttingDown) /* 依次关闭相应注册的组件 */ if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) CoreUtils.swallow(kafkaScheduler.shutdown()) if (apis != null) CoreUtils.swallow(apis.close()) CoreUtils.swallow(authorizer.foreach(_.close())) if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown()) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown()) if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown()) if (logManager != null) CoreUtils.swallow(logManager.shutdown()) if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) if (zkUtils != null) CoreUtils.swallow(zkUtils.close()) if (metrics != null) CoreUtils.swallow(metrics.close()) // 设置 broker 状态为 NotRunning,表示关闭成功 brokerState.newState(NotRunning) // 设置状态标记 startupComplete.set(false) isShuttingDown.set(false) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString)) shutdownLatch.countDown() info("shut down completed") } } catch { case e: Throwable => fatal("Fatal error during KafkaServer shutdown.", e) isShuttingDown.set(false) throw e } }
整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行分析。
三. 总结
本文我们主要分析了 kafka 服务启动和关闭的过程,Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性分析。
转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Elasticsearch源码分析 | 单节点的启动和关闭
- Redis源码解析:集群手动故障转移、从节点迁移详解
- 以太坊源码分析—p2p节点发现与协议运行
- Mybatis Mapper.xml 配置文件中 resultMap 节点的源码解析 原 荐
- 【Filecoin源码仓库全解析】第一章:搭建Filecoin测试节点 | 嘉乐的SOHO
- 兄弟连区块链教程eth源码分析node包建立多重协议eth节点
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
HTTP Essentials
Stephen A. Thomas、Stephen Thomas / Wiley / 2001-03-08 / USD 34.99
The first complete reference guide to the essential Web protocol As applications and services converge and Web technologies not only assume HTTP but require developers to manipulate it, it is be......一起来看看 《HTTP Essentials》 这本书的介绍吧!