内容简介:本文依然是以kafka0.8.2.2为例讲解1
本文依然是以kafka0.8.2.2为例讲解
一,如何删除一个 topic
删除一个 topic 有两个关键点:
1 ,配置删除参数
delete.topic.enable 这个 Broker 参数配置为 True 。
2 ,执行
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
假如不配置删除参数为 true 的话, topic 其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除 topic 在 Zookeeper 的信息和日志,其实这个操作并不会清除 kafkaBroker 内存的 topic 数据。所以,此时最佳的策略是配置删除参数为 true 然后,重启 kafka 。
二,重要的类介绍
1 , PartitionStateMachine
该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态
NonExistentPartition
NewPartition
OnlinePartition
OfflinePartition
2 , ReplicaManager
负责管理当前机器的所有副本,处理读写、删除等具体动作。
读写:写获取 partition 对象,再获取 Replica 对象,再获取 Log 对象,采用其管理的 Segment 对象将数据写入、读出。
3 , ReplicaStateMachine
副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种
NewReplica : Crontroller 在分区重分配的时候可以创建一个新的副本。只能接受变为 follower 的请求。前状态可以是 NonExistentReplica
OnlineReplica :新启动的分区,能接受变为 leader 或者 follower 请求。前状态可以是 NewReplica, OnlineReplica or OfflineReplica
OfflineReplica :死亡的副本处于这种状态。前状态可以是 NewReplica, OnlineReplica
ReplicaDeletionStarted :分本删除开始的时候处于这种状态,前状态是 OfflineReplica
ReplicaDeletionSuccessful :副本删除成功。前状态是 ReplicaDeletionStarted
ReplicaDeletionIneligible :删除失败的时候处于这种状态。前状态是 ReplicaDeletionStarted
NonExistentReplica :副本成功删除之后处于这种状态,前状态是 ReplicaDeletionSuccessful
4 , TopicDeletionManager
该类管理着 topic 删除的状态机
1) , TopicCommand 通过创建 /admin/delete_topics/<topic> ,来发布 topic 删除命令。
2) , Controller 监听 /admin/delete_topic 子节点变动,开始分别删除 topic
3) , Controller 有个后台线程负责删除 Topic
三,源码彻底解析 topic 的删除过程
此处会分四个部分:
A) , 客户端执行删除命令作用
B) , 不配置 delete.topic.enable 整个流水的源码
C) , 配置了 delete.topic.enable 整个流水的源码
D) , 手动删除 zk 上 topic 信息和磁盘数据
1 ,客户端执行删除命令
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
进入 kafka-topics.sh 我们会看到
exec $ (dirname $0) /kafka-run-class.sh kafka.admin.TopicCommand $@
进入 TopicCommand 里面, main 方法里面
else if (opts. options .has(opts. deleteOpt ))
deleteTopic (zkClient , opts)
实际内容是
val topics = getTopics (zkClient , opts)
if (topics.length == 0 ) {
println ( "Topic %s does not exist" .format(opts. options .valueOf(opts. topicOpt )))
}
topics.foreach { topic =>
try {
ZkUtils. createPersistentPath (zkClient , ZkUtils. getDeleteTopicPath (topic))
在 "/admin/delete_topics" 目录下创建了一个 topicName 的节点。
2 ,假如不配置 delete.topic.enable 整个流水是
总共有两处 listener 会响应:
A ),TopicChangeListener
B ),DeleteTopicsListener
使用 topic 的删除命令删除一个 topic 的话,指挥触发 DeleteTopicListener 。
var topicsToBeDeleted = {
import JavaConversions._
(children: Buffer[ String ]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => ! controllerContext . allTopics .contains(t))
topicsToBeDeleted --= nonExistentTopics
if ( topicsToBeDeleted .size > 0 ) {
info( "Starting topic deletion for topics " + topicsToBeDeleted .mkString( "," ))
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted .foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext . partitionsUndergoingPreferredReplicaElection .map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext . partitionsBeingReassigned .keySet.map(_.topic).contains(topic)
if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))
}
// add topic to deletion list
controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )
}
由于都会判断 delete.topic.enable 是否为 true ,假如不为 true 就不会执行,为 true 就进入执行
controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))
controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )
3 , delete.topic.enable 配置为 true
此处与步骤 2 的区别,就是那两个处理函数。
controller. deleteTopicManager .markTopicIneligibleForDeletion(Set(topic))
controller. deleteTopicManager .enqueueTopicsForDeletion( topicsToBeDeleted )
markTopicIneligibleForDeletion 函数的处理为
if ( isDeleteTopicEnabled ) {
val newTopicsToHaltDeletion = topicsToBeDeleted & topics
topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
if (newTopicsToHaltDeletion.size > 0 )
info( "Halted deletion of topics %s" .format(newTopicsToHaltDeletion.mkString( "," )))
}
主要是停止删除 topic ,假如存储以下三种情况
* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion 主要作用是更新删除 topic 的集合,并激活 TopicDeleteThread
def enqueueTopicsForDeletion (topics: Set[ String ]) {
if ( isDeleteTopicEnabled ) {
topicsToBeDeleted ++= topics
partitionsToBeDeleted ++= topics.flatMap( controllerContext .partitionsForTopic)
resumeTopicDeletionThread()
}
}
在删除线程 DeleteTopicsThread 的 doWork 方法中
topicsQueuedForDeletion .foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
if (controller. replicaStateMachine .areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
info( "Deletion of topic %s successfully completed" .format(topic))
}
进入 completeDeleteTopic 方法中
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine .deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller. replicaStateMachine .replicasInState(topic , ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine .handleStateChanges(replicasForDeletedTopic , NonExistentReplica)
val partitionsForDeletedTopic = controllerContext .partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine .handleStateChanges(partitionsForDeletedTopic , OfflinePartition)
partitionStateMachine .handleStateChanges(partitionsForDeletedTopic , NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted .retain(_.topic != topic)
controllerContext .zkClient.deleteRecursive(ZkUtils. getTopicPath (topic))
controllerContext .zkClient.deleteRecursive(ZkUtils. getTopicConfigPath (topic))
controllerContext .zkClient.delete(ZkUtils. getDeleteTopicPath (topic))
controllerContext .removeTopic(topic)
主要作用是解除掉监控分区变动的 listener ,删除 Zookeeper 具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。
其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。
首次清除的话,在删除线程 DeleteTopicsThread 的 doWork 方法中
{
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
// TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
if (controller. replicaStateMachine .isAnyReplicaInState(topic , ReplicaDeletionIneligible)) {
// mark topic for deletion retry
markTopicForDeletionRetry(topic)
}
进入 markTopicForDeletionRetry
val failedReplicas = controller.replicaStateMachine.replicasInState(topic , ReplicaDeletionIneligible)
info( "Retrying delete topic for topic %s since replicas %s were not successfully deleted"
.format(topic , failedReplicas.mkString( "," )))
controller. replicaStateMachine .handleStateChanges(failedReplicas , OfflineReplica)
在 ReplicaStateMachine 的 handleStateChanges 方法中,调用了 handleStateChange ,处理 OfflineReplica
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch .addStopReplicaRequestForBrokers( List (replicaId) , topic , partition , deletePartition = false )
接着在 handleStateChanges 中
brokerRequestBatch .sendRequestsToBrokers(controller.epoch , controllerContext . correlationId .getAndIncrement)
给副本数据存储节点发送 StopReplicaKey 副本指令,并开始删除数据
stopReplicaRequestMap foreach { case (broker , replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true ).map(i => i.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false ).map(i => i.replica).toSet
debug( "The stop replica request (delete = true) sent to broker %d is %s"
.format(broker , stopReplicaWithDelete.mkString( "," )))
debug( "The stop replica request (delete = false) sent to broker %d is %s"
.format(broker , stopReplicaWithoutDelete.mkString( "," )))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(r.deletePartition ,
Set( TopicAndPartition (r.replica.topic , r.replica.partition)) , controllerId , controllerEpoch , correlationId)
controller.sendRequest(broker , stopReplicaRequest , r.callback)
}
}
stopReplicaRequestMap .clear()
Broker 的 KafkaApis 的 Handle 方法在接受到指令后
case RequestKeys. StopReplicaKey => handleStopReplicaRequest(request)
val (response , error) = replicaManager.stopReplicas(stopReplicaRequest)
接着是在 stopReplicas 方法中
{
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager .removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition (r.topic , r.partition)))
for (topicAndPartition <- stopReplicaRequest.partitions){
val errorCode = stopReplica(topicAndPartition.topic , topicAndPartition.partition , stopReplicaRequest.deletePartitions)
responseMap.put(topicAndPartition , errorCode)
}
(responseMap , ErrorMapping. NoError )
}
进一步进入 stopReplica 方法,正式进入日志删除
getPartition(topic , partitionId) match {
case Some (partition) =>
if (deletePartition) {
val removedPartition = allPartitions .remove((topic , partitionId))
if (removedPartition != null )
removedPartition.delete() // this will delete the local log
}
以上就是 kafka 的整个日志删除流水。
4 ,手动删除 zk 上 topic 信息和磁盘数据
TopicChangeListener 会监听处理,但是处理很简单,只是更新了
val deletedTopics = controllerContext . allTopics -- currentChildren
controllerContext . allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils. getReplicaAssignmentForTopics ( zkClient , newTopics.toSeq)
controllerContext . partitionReplicaAssignment = controllerContext . partitionReplicaAssignment .filter(p =>
四,总结
K afka 的 topic 的删除过程,实际上就是基于 Zookeeper 做了一个订阅发布系统。 Zookeeper 的客户端创建一个节点 /admin/delete_topics/<topic> ,由 kafka Controller 监听到事件之后正式触发 topic 的删除:解除 Partition 变更监听的 listener ,清除内存数据结构,删除副本数据,删除 topic 的相关 Zookeeper 节点。
delete.topic.enable 配置该参数为 false 的情况下执行了 topic 的删除命令,实际上未做任何动作。我们此时要彻底删除 topic 建议修改该参数为 true ,重启 kafka ,这样 topic 信息会被彻底删除,已经测试。
一般流行的做法是手动删除 Zookeeper 的 topic 相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。
假如对kafka,hbase,spark源码感兴趣,欢迎大家关注浪尖公众号
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kubernetes源码分析之Pod的删除
- 如何提交takedown,删除泄漏源码的仓库和Fork
- GitHub被攻击 黑客删除数百个源码库索要比特币
- 微信实验十四、ThinkPHP5.0用户批量删除及源码下载
- 微信实验十二、ThinkPHP5.0单页浏览、增加、修改、删除用户及源码下载
- MySQL删除操作其实是假删除
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Purely Functional Data Structures
Chris Okasaki / Cambridge University Press / 1999-6-13 / USD 49.99
Most books on data structures assume an imperative language such as C or C++. However, data structures for these languages do not always translate well to functional languages such as Standard ML, Ha......一起来看看 《Purely Functional Data Structures》 这本书的介绍吧!