Kafka 源码系列之如何删除 topic

栏目: 服务器 · Apache · 发布时间: 6年前

内容简介:本文依然是以kafka0.8.2.2为例讲解1

本文依然是以kafka0.8.2.2为例讲解

一,如何删除一个 topic

删除一个 topic 有两个关键点:

1 ,配置删除参数

delete.topic.enable 这个 Broker 参数配置为 True

2 ,执行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置删除参数为 true 的话, topic 其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除 topic Zookeeper 的信息和日志,其实这个操作并不会清除 kafkaBroker 内存的 topic 数据。所以,此时最佳的策略是配置删除参数为 true 然后,重启 kafka

二,重要的类介绍

1 PartitionStateMachine

该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态

NonExistentPartition

NewPartition

OnlinePartition

OfflinePartition

2 ReplicaManager

负责管理当前机器的所有副本,处理读写、删除等具体动作。

读写:写获取 partition 对象,再获取 Replica 对象,再获取 Log 对象,采用其管理的 Segment 对象将数据写入、读出。

3 ReplicaStateMachine

副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种

NewReplica Crontroller 在分区重分配的时候可以创建一个新的副本。只能接受变为 follower 的请求。前状态可以是 NonExistentReplica

OnlineReplica :新启动的分区,能接受变为 leader 或者 follower 请求。前状态可以是 NewReplica, OnlineReplica or OfflineReplica

OfflineReplica :死亡的副本处于这种状态。前状态可以是 NewReplica, OnlineReplica

ReplicaDeletionStarted :分本删除开始的时候处于这种状态,前状态是 OfflineReplica

ReplicaDeletionSuccessful :副本删除成功。前状态是 ReplicaDeletionStarted

ReplicaDeletionIneligible :删除失败的时候处于这种状态。前状态是 ReplicaDeletionStarted

NonExistentReplica :副本成功删除之后处于这种状态,前状态是 ReplicaDeletionSuccessful

4 TopicDeletionManager

该类管理着 topic 删除的状态机

1) TopicCommand 通过创建 /admin/delete_topics/<topic> ,来发布 topic 删除命令。

2) Controller 监听 /admin/delete_topic 子节点变动,开始分别删除 topic

3) Controller 有个后台线程负责删除 Topic

三,源码彻底解析 topic 的删除过程

此处会分四个部分:

A) , 客户端执行删除命令作用

B) , 不配置 delete.topic.enable 整个流水的源码

C) , 配置了 delete.topic.enable 整个流水的源码

D) , 手动删除 zk topic 信息和磁盘数据

1 ,客户端执行删除命令

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

进入 kafka-topics.sh 我们会看到

exec $ (dirname $0) /kafka-run-class.sh kafka.admin.TopicCommand $@

进入 TopicCommand 里面, main 方法里面

else if (opts. options .has(opts. deleteOpt ))
   deleteTopic (zkClient , opts)

实际内容是

val topics = getTopics (zkClient , opts)
if (topics.length == 0 ) {
   println ( "Topic %s does not exist" .format(opts. options .valueOf(opts. topicOpt )))
}
topics.foreach { topic =>
   try {
    ZkUtils. createPersistentPath (zkClient , ZkUtils. getDeleteTopicPath (topic))

"/admin/delete_topics" 目录下创建了一个 topicName 的节点。

2 ,假如不配置 delete.topic.enable 整个流水是

总共有两处 listener 会响应:

A ),TopicChangeListener

B ),DeleteTopicsListener

使用 topic 的删除命令删除一个 topic 的话,指挥触发 DeleteTopicListener

var topicsToBeDeleted = {
   import JavaConversions._
  (children: Buffer[ String ]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => ! controllerContext . allTopics .contains(t))

topicsToBeDeleted  --= nonExistentTopics
if ( topicsToBeDeleted .size > 0 ) {
  info( "Starting topic deletion for topics " + topicsToBeDeleted .mkString( "," ))
   // mark topic ineligible for deletion if other state changes are in progress
   topicsToBeDeleted .foreach { topic =>
     val preferredReplicaElectionInProgress =
       controllerContext . partitionsUndergoingPreferredReplicaElection .map(_.topic).contains(topic)
     val partitionReassignmentInProgress =
       controllerContext . partitionsBeingReassigned .keySet.map(_.topic).contains(topic)
     if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))
  }
   // add topic to deletion list  
   controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )
}

由于都会判断 delete.topic.enable 是否为 true ,假如不为 true 就不会执行,为 true 就进入执行

controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))

controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )

3 delete.topic.enable 配置为 true

此处与步骤 2 的区别,就是那两个处理函数。

controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))

controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )

markTopicIneligibleForDeletion 函数的处理为

if ( isDeleteTopicEnabled ) {
   val newTopicsToHaltDeletion = topicsToBeDeleted & topics
   topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
   if (newTopicsToHaltDeletion.size > 0 )
    info( "Halted deletion of topics %s" .format(newTopicsToHaltDeletion.mkString( "," )))
}

主要是停止删除 topic ,假如存储以下三种情况

* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion 主要作用是更新删除 topic 的集合,并激活 TopicDeleteThread

def enqueueTopicsForDeletion (topics: Set[ String ]) {
   if ( isDeleteTopicEnabled ) {
     topicsToBeDeleted ++= topics
     partitionsToBeDeleted ++= topics.flatMap( controllerContext .partitionsForTopic)
    resumeTopicDeletionThread()
  }
}

在删除线程 DeleteTopicsThread doWork 方法中

topicsQueuedForDeletion .foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
   if (controller. replicaStateMachine .areAllReplicasForTopicDeleted(topic)) {
     // clear up all state for this topic from controller cache and zookeeper
     completeDeleteTopic(topic)
    info( "Deletion of topic %s successfully completed" .format(topic))
  }

进入 completeDeleteTopic 方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine .deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller. replicaStateMachine .replicasInState(topic , ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine .handleStateChanges(replicasForDeletedTopic , NonExistentReplica)
val partitionsForDeletedTopic = controllerContext .partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine .handleStateChanges(partitionsForDeletedTopic , OfflinePartition)
partitionStateMachine .handleStateChanges(partitionsForDeletedTopic , NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted .retain(_.topic != topic)
controllerContext .zkClient.deleteRecursive(ZkUtils. getTopicPath (topic))
controllerContext .zkClient.deleteRecursive(ZkUtils. getTopicConfigPath (topic))
controllerContext .zkClient.delete(ZkUtils. getDeleteTopicPath (topic))
controllerContext .removeTopic(topic)

主要作用是解除掉监控分区变动的 listener ,删除 Zookeeper 具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。

其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。

首次清除的话,在删除线程 DeleteTopicsThread doWork 方法中

{
   // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
   if (controller. replicaStateMachine .isAnyReplicaInState(topic , ReplicaDeletionIneligible)) {
     // mark topic for deletion retry
     markTopicForDeletionRetry(topic)
  }

进入 markTopicForDeletionRetry

val   failedReplicas = controller.replicaStateMachine.replicasInState(topic , ReplicaDeletionIneligible)
info( "Retrying delete topic for topic %s since replicas %s were not successfully deleted"
   .format(topic , failedReplicas.mkString( "," )))
controller. replicaStateMachine .handleStateChanges(failedReplicas , OfflineReplica)

ReplicaStateMachine handleStateChanges 方法中,调用了 handleStateChange ,处理 OfflineReplica

// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch .addStopReplicaRequestForBrokers( List (replicaId) , topic , partition , deletePartition = false )

接着在 handleStateChanges

brokerRequestBatch .sendRequestsToBrokers(controller.epoch , controllerContext . correlationId .getAndIncrement)

给副本数据存储节点发送 StopReplicaKey 副本指令,并开始删除数据

stopReplicaRequestMap  foreach { case (broker , replicaInfoList) =>
   val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true ).map(i => i.replica).toSet
   val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false ).map(i => i.replica).toSet
  debug( "The stop replica request (delete = true) sent to broker %d is %s"
     .format(broker , stopReplicaWithDelete.mkString( "," )))
  debug( "The stop replica request (delete = false) sent to broker %d is %s"
     .format(broker , stopReplicaWithoutDelete.mkString( "," )))
  replicaInfoList.foreach { r =>
     val stopReplicaRequest = new StopReplicaRequest(r.deletePartition ,
       Set( TopicAndPartition (r.replica.topic , r.replica.partition)) , controllerId , controllerEpoch , correlationId)
    controller.sendRequest(broker , stopReplicaRequest , r.callback)
  }
}
stopReplicaRequestMap .clear()

Broker KafkaApis Handle 方法在接受到指令后

case RequestKeys. StopReplicaKey => handleStopReplicaRequest(request)

val (response , error) = replicaManager.stopReplicas(stopReplicaRequest)

接着是在 stopReplicas 方法中

{
   controllerEpoch = stopReplicaRequest.controllerEpoch
   // First stop fetchers for all partitions, then stop the corresponding replicas
   replicaFetcherManager .removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition (r.topic , r.partition)))
   for (topicAndPartition <- stopReplicaRequest.partitions){
     val errorCode = stopReplica(topicAndPartition.topic , topicAndPartition.partition , stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition , errorCode)
  }
  (responseMap , ErrorMapping. NoError )
}

进一步进入 stopReplica 方法,正式进入日志删除

getPartition(topic , partitionId) match {
   case Some (partition) =>
     if (deletePartition) {
       val removedPartition = allPartitions .remove((topic , partitionId))
       if (removedPartition != null )
        removedPartition.delete() // this will delete the local log
     }

以上就是 kafka 的整个日志删除流水。

4 ,手动删除 zk topic 信息和磁盘数据

TopicChangeListener 会监听处理,但是处理很简单,只是更新了

val deletedTopics = controllerContext . allTopics -- currentChildren
controllerContext . allTopics = currentChildren

val addedPartitionReplicaAssignment = ZkUtils. getReplicaAssignmentForTopics ( zkClient , newTopics.toSeq)
controllerContext . partitionReplicaAssignment = controllerContext . partitionReplicaAssignment .filter(p =>

四,总结

K afka topic 的删除过程,实际上就是基于 Zookeeper 做了一个订阅发布系统。 Zookeeper 的客户端创建一个节点 /admin/delete_topics/<topic> ,由 kafka Controller 监听到事件之后正式触发 topic 的删除:解除 Partition 变更监听的 listener ,清除内存数据结构,删除副本数据,删除 topic 的相关 Zookeeper 节点。

delete.topic.enable 配置该参数为 false 的情况下执行了 topic 的删除命令,实际上未做任何动作。我们此时要彻底删除 topic 建议修改该参数为 true ,重启 kafka ,这样 topic 信息会被彻底删除,已经测试。

一般流行的做法是手动删除 Zookeeper topic 相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。

假如对kafka,hbase,spark源码感兴趣,欢迎大家关注浪尖公众号

Kafka 源码系列之如何删除 topic


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Purely Functional Data Structures

Purely Functional Data Structures

Chris Okasaki / Cambridge University Press / 1999-6-13 / USD 49.99

Most books on data structures assume an imperative language such as C or C++. However, data structures for these languages do not always translate well to functional languages such as Standard ML, Ha......一起来看看 《Purely Functional Data Structures》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具