Kafka 源码解析:Broker 节点的启动与关闭

栏目: 后端 · 发布时间: 5年前

内容简介:从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。Kafka 提供了Kafka 驱动类的 main 方法实现如下:

从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。

Kafka 提供了 kafka-server-start.sh 脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka 类来启动 kafka 服务,这也是 kafka 整个服务端的驱动类。在 kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup 方法启动服务。

Kafka 驱动类的 main 方法实现如下:

def main(args: Array[String]): Unit = {
    try {
        // 解析命令行参数
        val serverProps = getPropsFromArgs(args)
        // 创建 kafkaServerStartable 对象,期间会初始化监控上报程序
        val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

        // 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法
        Runtime.getRuntime.addShutdownHook(new Thread() {
            override def run(): Unit = {
                kafkaServerStartable.shutdown()
            }
        })

        // 本质上调用的是 KafkaServer#startup 方法
        kafkaServerStartable.startup()
        // 阻塞等待 kafka server 运行线程关闭
        kafkaServerStartable.awaitShutdown()
    } catch {
        case e: Throwable =>
            fatal(e)
            System.exit(1)
    }
    System.exit(0)
}

KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:

class KafkaServer(val config: KafkaConfig, // 配置信息对象
                  time: Time = Time.SYSTEM, // 时间戳工具
                  threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序
                 ) extends Logging with KafkaMetricsGroup {

    /** 标识节点已经启动完成 */
    private val startupComplete = new AtomicBoolean(false)
    /** 标识节点正在执行关闭操作 */
    private val isShuttingDown = new AtomicBoolean(false)
    /** 标识节点正在执行启动操作 */
    private val isStartingUp = new AtomicBoolean(false)
    /** 阻塞主线程等待 KafkaServer 的关闭 */
    private var shutdownLatch = new CountDownLatch(1)
    /** 记录 broker 节点的当前状态 */
    val brokerState: BrokerState = new BrokerState
    /** Api 接口类,用于分发各种类型的请求 */
    var apis: KafkaApis = _
    /** 权限控制相关 */
    var authorizer: Option[Authorizer] = None
    var credentialProvider: CredentialProvider = _
    /** 网络 socket 服务 */
    var socketServer: SocketServer = _
    /** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */
    var requestHandlerPool: KafkaRequestHandlerPool = _
    /** 日志数据管理 */
    var logManager: LogManager = _
    /** 管理当前 broker 节点上的分区副本 */
    var replicaManager: ReplicaManager = _
    /** topic 增删管理 */
    var adminManager: AdminManager = _
    /** 动态配置管理 */
    var dynamicConfigHandlers: Map[String, ConfigHandler] = _
    var dynamicConfigManager: DynamicConfigManager = _
    /** group 协调管理组件 */
    var groupCoordinator: GroupCoordinator = _
    /** 集群控制组件 */
    var kafkaController: KafkaController = _
    /** 定时任务调度器 */
    val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
    /** broker 节点活跃性检查 */
    var kafkaHealthcheck: KafkaHealthcheck = _
    /** broker 缓存整个集群中全部分区的状态信息 */
    var metadataCache: MetadataCache = _
    /** ZK 操作 工具 类 */
    var zkUtils: ZkUtils = _

    // ... 省略方法定义

}

在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown 方法实现,在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch 字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown 方法只是简单的调用了 CountDownLatch#await 方法来阻塞主线程。当 KafkaServer#shutdown 方法执行完成后,会调用 CountDownLatch#countDown 方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。

一. 服务启动过程分析

方法 KafkaServer#shutdown 的实现我们稍后进行分析,下面首先看一下 kafka 服务的启动过程,即 KafkaServer#startup 方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤进行进一步分析:

/brokers/ids/{brokerId}

下面针对上述流程中的 2、3、4 和 6 几个步骤进行进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。

首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:

sealed trait BrokerStates { def state: Byte }

case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

关于每种状态的解释和状态转换图如下:

  • NotRunning :初始状态,标识当前 broker 节点未运行。
  • Starting :标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。

Kafka 源码解析:Broker 节点的启动与关闭

所谓 controlled shutdown,实际上是 kafka 提供的一种友好的关闭 broker 节点的机制,除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。

下面继续来分析一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:

trait Scheduler {
    def startup()
    def shutdown()
    def isStarted: Boolean
    def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
}

其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。

步骤 4调用了 KafkaServer#initZk 方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk 会基于 zookeeper.connect 配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:

/brokers/ids/{id}
/brokers/topics/{topic}/partitions
/brokers/topics/{topic}/partitions/{partition_id}/state
/controller
/controller_epoch
/admin/reassign_partitions
/admin/preferred_replica_election
/admin/delete_topics
/isr_change_notification
/config

最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 kafka 服务之前,可以在配置中通过 broker.id 配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId 方法中,实现如下:

private def getBrokerId: Int = {
    // 获取配置的 brokerId
    var brokerId = config.brokerId
    val brokerIdSet = mutable.HashSet[Int]()

    // 遍历 log.dirs 配置的 log 目录列表
    for (logDir <- config.logDirs) {
        // 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息
        val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
        brokerMetadataOpt.foreach { brokerMetadata =>
            brokerIdSet.add(brokerMetadata.brokerId)
        }
    }

    if (brokerIdSet.size > 1) {
        // 不允许多个 broker 节点共享同一个 log 目录
        // ... 抛出 InconsistentBrokerIdException 异常,略
    } else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) {
        // 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致
        // ... 抛出 InconsistentBrokerIdException 异常,略
    } else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) {
        // 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性
        brokerId = generateBrokerId
    } else if (brokerIdSet.size == 1) {
        // 从 meta.properties 中获取 brokerId
        brokerId = brokerIdSet.last
    }

    brokerId
}

在 broker 节点的每个 log 目录下有一个 meta.properties 文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 kafka 可以通过该文件约束 broker.id 配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。

二. 服务关闭过程分析

Broker 节点在关闭对应的 kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown 中:

def shutdown() {
    try {
        info("shutting down")

        // 如果正在启动,则不允许关闭
        if (isStartingUp.get)
            throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

        if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
            CoreUtils.swallow(controlledShutdown())
            // 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作
            brokerState.newState(BrokerShuttingDown)

            /* 依次关闭相应注册的组件 */

            if (socketServer != null) CoreUtils.swallow(socketServer.shutdown())
            if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown())
            CoreUtils.swallow(kafkaScheduler.shutdown())
            if (apis != null) CoreUtils.swallow(apis.close())
            CoreUtils.swallow(authorizer.foreach(_.close()))
            if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown())
            if (adminManager != null) CoreUtils.swallow(adminManager.shutdown())
            if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown())
            if (logManager != null) CoreUtils.swallow(logManager.shutdown())
            if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown())
            if (zkUtils != null) CoreUtils.swallow(zkUtils.close())
            if (metrics != null) CoreUtils.swallow(metrics.close())

            // 设置 broker 状态为 NotRunning,表示关闭成功
            brokerState.newState(NotRunning)

            // 设置状态标记
            startupComplete.set(false)
            isShuttingDown.set(false)
            CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
            shutdownLatch.countDown()
            info("shut down completed")
        }
    } catch {
        case e: Throwable =>
            fatal("Fatal error during KafkaServer shutdown.", e)
            isShuttingDown.set(false)
            throw e
    }
}

整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行分析。

三. 总结

本文我们主要分析了 kafka 服务启动和关闭的过程,Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性分析。

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算机科学概论(第11版)

计算机科学概论(第11版)

J. Glenn Brookshear / 刘艺、肖成海、马小会、毛倩倩 / 人民邮电出版社 / 2011-10-1 / 69.00元

本书多年来一直深受世界各国高校师生的欢迎,是美国哈佛大学、麻省理工学院、普林斯顿大学、加州大学伯克利分校等许多著名大学的首选教材,对我国的高校教学也产生了广泛影响。 本 书以历史眼光,从发展的角度、当前的水平以及现阶段的研究方向等几个方面,全景式描绘了计算机科学各个子学科的主要领域。在内容编排上,本书很好地兼顾了 学科广度和主题深度,把握了最新的技术趋势。本书用算法、数据抽象等核心思想贯穿各......一起来看看 《计算机科学概论(第11版)》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

SHA 加密
SHA 加密

SHA 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具