内容简介:从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。Kafka 提供了Kafka 驱动类的 main 方法实现如下:
从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。
Kafka 提供了 kafka-server-start.sh 脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka 类来启动 kafka 服务,这也是 kafka 整个服务端的驱动类。在 kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup 方法启动服务。
Kafka 驱动类的 main 方法实现如下:
def main(args: Array[String]): Unit = {
try {
// 解析命令行参数
val serverProps = getPropsFromArgs(args)
// 创建 kafkaServerStartable 对象,期间会初始化监控上报程序
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
kafkaServerStartable.shutdown()
}
})
// 本质上调用的是 KafkaServer#startup 方法
kafkaServerStartable.startup()
// 阻塞等待 kafka server 运行线程关闭
kafkaServerStartable.awaitShutdown()
} catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:
class KafkaServer(val config: KafkaConfig, // 配置信息对象
time: Time = Time.SYSTEM, // 时间戳工具
threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序
) extends Logging with KafkaMetricsGroup {
/** 标识节点已经启动完成 */
private val startupComplete = new AtomicBoolean(false)
/** 标识节点正在执行关闭操作 */
private val isShuttingDown = new AtomicBoolean(false)
/** 标识节点正在执行启动操作 */
private val isStartingUp = new AtomicBoolean(false)
/** 阻塞主线程等待 KafkaServer 的关闭 */
private var shutdownLatch = new CountDownLatch(1)
/** 记录 broker 节点的当前状态 */
val brokerState: BrokerState = new BrokerState
/** Api 接口类,用于分发各种类型的请求 */
var apis: KafkaApis = _
/** 权限控制相关 */
var authorizer: Option[Authorizer] = None
var credentialProvider: CredentialProvider = _
/** 网络 socket 服务 */
var socketServer: SocketServer = _
/** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */
var requestHandlerPool: KafkaRequestHandlerPool = _
/** 日志数据管理 */
var logManager: LogManager = _
/** 管理当前 broker 节点上的分区副本 */
var replicaManager: ReplicaManager = _
/** topic 增删管理 */
var adminManager: AdminManager = _
/** 动态配置管理 */
var dynamicConfigHandlers: Map[String, ConfigHandler] = _
var dynamicConfigManager: DynamicConfigManager = _
/** group 协调管理组件 */
var groupCoordinator: GroupCoordinator = _
/** 集群控制组件 */
var kafkaController: KafkaController = _
/** 定时任务调度器 */
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
/** broker 节点活跃性检查 */
var kafkaHealthcheck: KafkaHealthcheck = _
/** broker 缓存整个集群中全部分区的状态信息 */
var metadataCache: MetadataCache = _
/** ZK 操作 工具 类 */
var zkUtils: ZkUtils = _
// ... 省略方法定义
}
在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown 方法实现,在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch 字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown 方法只是简单的调用了 CountDownLatch#await 方法来阻塞主线程。当 KafkaServer#shutdown 方法执行完成后,会调用 CountDownLatch#countDown 方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。
一. 服务启动过程分析
方法 KafkaServer#shutdown 的实现我们稍后进行分析,下面首先看一下 kafka 服务的启动过程,即 KafkaServer#startup 方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤进行进一步分析:
/brokers/ids/{brokerId}
下面针对上述流程中的 2、3、4 和 6 几个步骤进行进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。
首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
关于每种状态的解释和状态转换图如下:
- NotRunning :初始状态,标识当前 broker 节点未运行。
- Starting :标识当前 broker 节点正在启动中。
- RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
- RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
- PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。
所谓 controlled shutdown,实际上是 kafka 提供的一种友好的关闭 broker 节点的机制,除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。
下面继续来分析一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:
trait Scheduler {
def startup()
def shutdown()
def isStarted: Boolean
def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
}
其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。
步骤 4调用了 KafkaServer#initZk 方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk 会基于 zookeeper.connect 配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:
/brokers/ids/{id}
/brokers/topics/{topic}/partitions
/brokers/topics/{topic}/partitions/{partition_id}/state
/controller
/controller_epoch
/admin/reassign_partitions
/admin/preferred_replica_election
/admin/delete_topics
/isr_change_notification
/config
最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 kafka 服务之前,可以在配置中通过 broker.id 配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId 方法中,实现如下:
private def getBrokerId: Int = {
// 获取配置的 brokerId
var brokerId = config.brokerId
val brokerIdSet = mutable.HashSet[Int]()
// 遍历 log.dirs 配置的 log 目录列表
for (logDir <- config.logDirs) {
// 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
brokerMetadataOpt.foreach { brokerMetadata =>
brokerIdSet.add(brokerMetadata.brokerId)
}
}
if (brokerIdSet.size > 1) {
// 不允许多个 broker 节点共享同一个 log 目录
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) {
// 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) {
// 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性
brokerId = generateBrokerId
} else if (brokerIdSet.size == 1) {
// 从 meta.properties 中获取 brokerId
brokerId = brokerIdSet.last
}
brokerId
}
在 broker 节点的每个 log 目录下有一个 meta.properties 文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 kafka 可以通过该文件约束 broker.id 配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。
二. 服务关闭过程分析
Broker 节点在关闭对应的 kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown 中:
def shutdown() {
try {
info("shutting down")
// 如果正在启动,则不允许关闭
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown())
// 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作
brokerState.newState(BrokerShuttingDown)
/* 依次关闭相应注册的组件 */
if (socketServer != null) CoreUtils.swallow(socketServer.shutdown())
if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
if (apis != null) CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null) CoreUtils.swallow(adminManager.shutdown())
if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown())
if (logManager != null) CoreUtils.swallow(logManager.shutdown())
if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null) CoreUtils.swallow(zkUtils.close())
if (metrics != null) CoreUtils.swallow(metrics.close())
// 设置 broker 状态为 NotRunning,表示关闭成功
brokerState.newState(NotRunning)
// 设置状态标记
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
shutdownLatch.countDown()
info("shut down completed")
}
} catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}
整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行分析。
三. 总结
本文我们主要分析了 kafka 服务启动和关闭的过程,Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性分析。
转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Elasticsearch源码分析 | 单节点的启动和关闭
- Redis源码解析:集群手动故障转移、从节点迁移详解
- 以太坊源码分析—p2p节点发现与协议运行
- Mybatis Mapper.xml 配置文件中 resultMap 节点的源码解析 原 荐
- 【Filecoin源码仓库全解析】第一章:搭建Filecoin测试节点 | 嘉乐的SOHO
- 兄弟连区块链教程eth源码分析node包建立多重协议eth节点
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。