内容简介:从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。Kafka 提供了Kafka 驱动类的 main 方法实现如下:
从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。
Kafka 提供了 kafka-server-start.sh
脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka
类来启动 kafka 服务,这也是 kafka 整个服务端的驱动类。在 kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup
方法启动服务。
Kafka 驱动类的 main 方法实现如下:
def main(args: Array[String]): Unit = { try { // 解析命令行参数 val serverProps = getPropsFromArgs(args) // 创建 kafkaServerStartable 对象,期间会初始化监控上报程序 val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) // 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法 Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { kafkaServerStartable.shutdown() } }) // 本质上调用的是 KafkaServer#startup 方法 kafkaServerStartable.startup() // 阻塞等待 kafka server 运行线程关闭 kafkaServerStartable.awaitShutdown() } catch { case e: Throwable => fatal(e) System.exit(1) } System.exit(0) }
KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:
class KafkaServer(val config: KafkaConfig, // 配置信息对象 time: Time = Time.SYSTEM, // 时间戳工具 threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序 ) extends Logging with KafkaMetricsGroup { /** 标识节点已经启动完成 */ private val startupComplete = new AtomicBoolean(false) /** 标识节点正在执行关闭操作 */ private val isShuttingDown = new AtomicBoolean(false) /** 标识节点正在执行启动操作 */ private val isStartingUp = new AtomicBoolean(false) /** 阻塞主线程等待 KafkaServer 的关闭 */ private var shutdownLatch = new CountDownLatch(1) /** 记录 broker 节点的当前状态 */ val brokerState: BrokerState = new BrokerState /** Api 接口类,用于分发各种类型的请求 */ var apis: KafkaApis = _ /** 权限控制相关 */ var authorizer: Option[Authorizer] = None var credentialProvider: CredentialProvider = _ /** 网络 socket 服务 */ var socketServer: SocketServer = _ /** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */ var requestHandlerPool: KafkaRequestHandlerPool = _ /** 日志数据管理 */ var logManager: LogManager = _ /** 管理当前 broker 节点上的分区副本 */ var replicaManager: ReplicaManager = _ /** topic 增删管理 */ var adminManager: AdminManager = _ /** 动态配置管理 */ var dynamicConfigHandlers: Map[String, ConfigHandler] = _ var dynamicConfigManager: DynamicConfigManager = _ /** group 协调管理组件 */ var groupCoordinator: GroupCoordinator = _ /** 集群控制组件 */ var kafkaController: KafkaController = _ /** 定时任务调度器 */ val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) /** broker 节点活跃性检查 */ var kafkaHealthcheck: KafkaHealthcheck = _ /** broker 缓存整个集群中全部分区的状态信息 */ var metadataCache: MetadataCache = _ /** ZK 操作 工具 类 */ var zkUtils: ZkUtils = _ // ... 省略方法定义 }
在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown
方法实现,在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch
字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown
方法只是简单的调用了 CountDownLatch#await
方法来阻塞主线程。当 KafkaServer#shutdown
方法执行完成后,会调用 CountDownLatch#countDown
方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。
一. 服务启动过程分析
方法 KafkaServer#shutdown
的实现我们稍后进行分析,下面首先看一下 kafka 服务的启动过程,即 KafkaServer#startup
方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤进行进一步分析:
/brokers/ids/{brokerId}
下面针对上述流程中的 2、3、4 和 6 几个步骤进行进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。
首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:
sealed trait BrokerStates { def state: Byte } case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
关于每种状态的解释和状态转换图如下:
- NotRunning :初始状态,标识当前 broker 节点未运行。
- Starting :标识当前 broker 节点正在启动中。
- RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
- RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
- PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。
所谓 controlled shutdown,实际上是 kafka 提供的一种友好的关闭 broker 节点的机制,除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。
下面继续来分析一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:
trait Scheduler { def startup() def shutdown() def isStarted: Boolean def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) }
其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。
步骤 4调用了 KafkaServer#initZk
方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk
会基于 zookeeper.connect
配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:
/brokers/ids/{id} /brokers/topics/{topic}/partitions /brokers/topics/{topic}/partitions/{partition_id}/state /controller /controller_epoch /admin/reassign_partitions /admin/preferred_replica_election /admin/delete_topics /isr_change_notification /config
最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 kafka 服务之前,可以在配置中通过 broker.id
配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId
方法中,实现如下:
private def getBrokerId: Int = { // 获取配置的 brokerId var brokerId = config.brokerId val brokerIdSet = mutable.HashSet[Int]() // 遍历 log.dirs 配置的 log 目录列表 for (logDir <- config.logDirs) { // 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息 val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() brokerMetadataOpt.foreach { brokerMetadata => brokerIdSet.add(brokerMetadata.brokerId) } } if (brokerIdSet.size > 1) { // 不允许多个 broker 节点共享同一个 log 目录 // ... 抛出 InconsistentBrokerIdException 异常,略 } else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) { // 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致 // ... 抛出 InconsistentBrokerIdException 异常,略 } else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) { // 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性 brokerId = generateBrokerId } else if (brokerIdSet.size == 1) { // 从 meta.properties 中获取 brokerId brokerId = brokerIdSet.last } brokerId }
在 broker 节点的每个 log 目录下有一个 meta.properties
文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 kafka 可以通过该文件约束 broker.id
配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。
二. 服务关闭过程分析
Broker 节点在关闭对应的 kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown
中:
def shutdown() { try { info("shutting down") // 如果正在启动,则不允许关闭 if (isStartingUp.get) throw new IllegalStateException("Kafka server is still starting up, cannot shut down!") if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { CoreUtils.swallow(controlledShutdown()) // 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作 brokerState.newState(BrokerShuttingDown) /* 依次关闭相应注册的组件 */ if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) CoreUtils.swallow(kafkaScheduler.shutdown()) if (apis != null) CoreUtils.swallow(apis.close()) CoreUtils.swallow(authorizer.foreach(_.close())) if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown()) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown()) if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown()) if (logManager != null) CoreUtils.swallow(logManager.shutdown()) if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) if (zkUtils != null) CoreUtils.swallow(zkUtils.close()) if (metrics != null) CoreUtils.swallow(metrics.close()) // 设置 broker 状态为 NotRunning,表示关闭成功 brokerState.newState(NotRunning) // 设置状态标记 startupComplete.set(false) isShuttingDown.set(false) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString)) shutdownLatch.countDown() info("shut down completed") } } catch { case e: Throwable => fatal("Fatal error during KafkaServer shutdown.", e) isShuttingDown.set(false) throw e } }
整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行分析。
三. 总结
本文我们主要分析了 kafka 服务启动和关闭的过程,Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性分析。
转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Elasticsearch源码分析 | 单节点的启动和关闭
- Redis源码解析:集群手动故障转移、从节点迁移详解
- 以太坊源码分析—p2p节点发现与协议运行
- Mybatis Mapper.xml 配置文件中 resultMap 节点的源码解析 原 荐
- 【Filecoin源码仓库全解析】第一章:搭建Filecoin测试节点 | 嘉乐的SOHO
- 兄弟连区块链教程eth源码分析node包建立多重协议eth节点
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
计算机科学概论(第11版)
J. Glenn Brookshear / 刘艺、肖成海、马小会、毛倩倩 / 人民邮电出版社 / 2011-10-1 / 69.00元
本书多年来一直深受世界各国高校师生的欢迎,是美国哈佛大学、麻省理工学院、普林斯顿大学、加州大学伯克利分校等许多著名大学的首选教材,对我国的高校教学也产生了广泛影响。 本 书以历史眼光,从发展的角度、当前的水平以及现阶段的研究方向等几个方面,全景式描绘了计算机科学各个子学科的主要领域。在内容编排上,本书很好地兼顾了 学科广度和主题深度,把握了最新的技术趋势。本书用算法、数据抽象等核心思想贯穿各......一起来看看 《计算机科学概论(第11版)》 这本书的介绍吧!