1、什么是 Kafka
Kafka 是一个分布式的、多分区、多副本、多生产者、多消费者的基于发布/订阅模式的消息队列。目前 Kafka 已经定位为一个分布式流式处理平台,它可以实现对消息顺序的持久化,支持消息回溯和高性能读写,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
2、Kafka 架构
Kafka 总体架构图中包含多个概念:
(1)ZooKeeper : Zookeeper 负责保存 broker 集群元数据,并对控制器进行选举等操作。
(2)Producer : 消息生产者,就是向 kafka broker 发消息的客户端。
(3)Broker : 一个独立的 Kafka 服务器被称作 broker ,一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic 。broker 负责接收来自生产者的消息,为消息设置偏移量,并将消息存储在磁盘。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
(4)Consumer : 消息消费者,向 kafka broker 取消息的客户端。
(5)Consumer Group :消费者组,一个消费者组可以包含一个或多个 Consumer。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费者组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消费消息时互不影响。Kafka 就是通过消费者组的方式来实现消息 P2P 模式和广播模式。
(6)Topic : Kafka 中的消息以 Topic 为单位进行划分,可以理解为一个队列。生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
(7)Partition : 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(服务器)上,一个 topic 可以分为多个 partition ,每个 partition 是一个有序的队列。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被
追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
(8)Offset : 分区中每条消息都会分配一个有序的 id ,即偏移量。offset 不跨越分区,也就是说 Kafka 保证的是分区有序性而不是主题有序性。
(9)Replica : 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower 。通常只有 leader 副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 leader 副本对外提供读写服务。
(10)Record : 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key 、value 和 timestamp 。
(11)Leader : 每个分区多个副本的 "主" 副本,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
(12)Follower : 每个分区多个副本中的"从" 副本,实时从 Leader 中同步数据,保持和 leader 数据的同步。Leader 发生故障时,某个 follow 会成为新的 leader。
(13)ISR (In-Sync Replicas) :副本同步队列,表示和 leader 保持同步的副本的集合(包括 leader 本身)。如果 follower 长时间不与 leader 同步数据则将该副本踢出ISR队列。leader 发生故障会从 ISR 中选举新 leader 。
(14)OSR (Out-of-Sync Replicas) :因同步延迟过高而被踢出 ISR 的副本存在 OSR 。
(15)AR (Assigned Replicas) :所有副本集合,即 AR = ISR + OSR 。
3、发布订阅的消息系统那么多,为啥选择 Kafka?(Kafka的特点)
(1)多个生产者
Kafka 可以无缝地支持多个生产者,不管客户端使用一个主题,还是多个主题。Kafka 适合从多个前端系统收集数据,并以统一的格式堆外提供数据。
(2)多个消费者
Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次。
(3)基于磁盘的数据存储(持久性,可靠性)
Kafka 允许消费者非实时地读取消息,原因在于 Kafka 将消息提交到磁盘上,设置了保留规则进行保存,无需担心消息丢失等问题。
(4)伸缩性,可扩展性
可扩展多台 broker 。用户可以先使用单个 broker ,到后面可以扩展到多个 broker 。
(5)高性能(高吞吐,低延迟)
Kafka 可以轻松处理百万千万级消息流,同时还能保证亚秒级的消息延迟。以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。Kafka 顺序写磁盘,因此效率非常高,经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证。
对比如图:
4、Kafka 如何做到高吞吐量/高性能的?
Kafka 实现高吞吐量和性能,主要通过以下几点:
1、页缓存技术
Kafka 是基于 操作系统 的页缓存来实现文件写入的。操作系统本身有一层缓存,叫做 page cache,是在 内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。Kafka 在写入磁盘文件的时候,可以直接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。通过这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘。
2、磁盘顺序写
另一个主要功能是 Kafka 写数据的时候,是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到log文件的末尾,不是在文件的随机位置来修改数据。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
基于上面两点,Kafka 就实现了写入数据的超高性能。
3、零拷贝
大家应该都知道,从 Kafka 里经常要消费数据,那么消费的时候实际上就是要从 Kafka 的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示:
那么这里如果频繁的从磁盘读数据然后发给消费者,会增加两次没必要的拷贝,如下图:
一次是从操作系统的 cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存,如下图所示:
通过 零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。Kafka 从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。Kafka 集群经过良好的调优,数据直接写入 os cache 中,然后读数据的时候也是从os cache 中读。相当于 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。
5、 Kafka 和 Zookeeper 之间的关系
Kafka 使用 Zookeeper 来保存集群的元数据信息和消费者信息(偏移量),没有 Zookeeper,Kafka 是工作不起来。 在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的点,节点路径为/brokers/ids。
每个 Broker 服务器在启动时,都会到 Zookeeper 上进行注册,即创建/brokers/ids/[0-N] 的节点,然后写入 IP,端口等信息,Broker 创建的是临时节点,所以一旦 Broker 上线或者下线,对应 Broker 节点也就被删除了,因此可以通过 zookeeper 上 Broker 节点的变化来动态表征 Broker 服务器的可用性。
6、生产者向 Kafka 发送消息的执行流程
如下图所示:
(1)生产者要往 Kafka 发送消息时,需要创建 ProducerRecoder,代码如下:
ProducerRecord<String,String> record
= new ProducerRecoder<>("CostomerCountry","Precision Products","France");
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace();
}
(2)ProducerRecoder 对象会包含目标 topic,分区内容,以及指定的 key 和 value,在发送 ProducerRecoder 时,生产者会先把键和值对象序列化成字节数组,然后在网络上传输。
(3)生产者在将消息发送到某个 Topic ,需要经过拦截器、序列化器和分区器(Partitioner)。
(4)如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
若没有指定分区,且消息的 key 不为空,则使用 murmur 的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。
若没有指定分区,且消息的 key 也是空,则用轮询的方式选择一个分区。
(5)分区选择好之后,会将消息添加到一个记录批次中,这个批次的所有消息都会被发送到相同的 Topic 和 partition 上。然后会有一个独立的线程负责把这些记录批次发送到相应的 broker 中。
(6)leader 接收到 Msg 后,将消息写入本地 log。如果成功写入 Kafka 中,就返回一个 RecordMetaData 对象,它包含 Topic 和 Partition 信息,以及记录在分区的 offset。
(7)若写入失败,就返回一个错误异常,生产者在收到错误之后尝试重新发送消息,几次之后如果还失败,就返回错误信息。
(8)Followers 从 leader 拉取消息,写入本地 log 后向 leader 发送 ACK。leader 收到所有 ISR 中的 replica 的 ACK 后,增加高水位,并向 producer 发送 ACK。
7、Kafka 如何保证对应类型的消息被写到相同的分区?
通过 消息键 和 分区器 来实现,分区器为键生成一个 offset,然后使用 offset 对主题分区进行取模,为消息选取分区,这样就可以保证包含同一个键的消息会被写到同一个分区上。
如果 ProducerRecord 没有指定分区,且消息的 key 不为空,则使用 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。
如果 ProducerRecord 没有指定分区,且消息的 key 也是空,则用 轮询 的方式选择一个分区。
8、Kafka 文件存储机制
在 Kafka 中,一个 Topic 会被分割成多个 Partition,而 Partition 由多个更小的 Segment 的元素组成。Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 文件夹下面会有多组 segment(逻辑分组,并不是真实存在),每个 segment 对应三个文件:.log文件、.index文件、.timeindex 文件。topic 是逻辑上的概念,而 partition是物理上的概念,每个 partition 对应于多个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
Kafka 会根据 log.segment.bytes 的配置来决定单个 Segment 文件(log)的大小,当写入数据达到这个大小时就会创建新的 Segment。
9、如何根据 offset 找到对应的 Message?
每个索引项占用 8 个字节,分为两个部分:
(1) relativeOffset : 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用4个字节(relativeOffset = offset - baseOffset),当前索引文件的文件名即为 baseOffset 的值。
例如:一个日志片段的 baseOffset 为 32,那么其文件名就是 00000000000000000032.log,offset=35 的消息在索引文件中的 relativeOffset 的值为 35-32=3
(2) position: 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
(1)先找到 offset=3 的 message 所在的 segment 文件(利用二分法查找),先判断.index 文件名称offset(baseOffset)是否小于3;
若小于,则继续二分与下一个.inde 文件名称offset比较;
若大于,则返回上次小于3的.index 文件,这里找到的就是在第一个 segment 文件。
(2)找到的 segment 中的.index 文件,用查找的 offset 减去.index 文件名的 offset(relativeOffset = offset - baseOffset),也就是00000.index 文件,我们要查找的 offset 为3的 message 在该 .index 文件内的索引为3(index采用稀疏存储的方式,它不会为每一条 message 都建立索引,而是每隔4k左右,建立一条索引,避免索引文件占用过多的空间。缺点是没有建立索引的 offset 不能一次定位到 message 的位置,需要做一次顺序扫描,但是扫描的范围很小)。
(3)根据找到的 relative offset 为3的索引,确定 message 存储的物理偏移地址为756。
(4)根据物理偏移地址,去.log 文件找相应的 Message
同理,我如果想找offset=8 对应的 Message 数据呢?
(1)首先根据二分查找法找到segment的对应的00000000000000000006.index索引文件
(2)根据 offset=8 找到对应的索引文件中的位置,该位置保存了一个偏移量326,根据偏移量326在00000000000000000006.log文件中找到对应的消息 Message-8。
Kafka 的 Message 存储采用了分区,磁盘顺序读写,分段和稀疏索引等一些手段来达到高效性,在0.9版本之后,offset 已经直接维护在 Kafka 集群的 __consumer_offsets 这个 topic 中。
10、 Producer 发送的一条 message 中包含哪些信息?
消息由可变长度的报头、可变长度的不透明密钥字节数组和可变长度的不透明值字节数组组成。
RecordBatch 是 Kafka 数据的存储单元,一个 RecordBatch 中包含多个 Record(即我们通常说的一条消息)。RecordBatch 中各个字段的含义如下:
一个 RecordBatch 中可以包含多条消息,即上图中的 Record ,而每条消息又可以包含多个 Header 信息,Header 是 Key-Value 形式的。
11、Kafka 如何实现消息有序
一、生产者端
Kafka 的发送端发送消息,如果是默认参数什么都不设置,则消息如果在网络没有抖动的时候,可以一批批的按消息发送的顺序被发送到 Kafka 服务器端。但是,一旦网络波动了,则消息就可能出现失序。
所以,要严格保证 Kafka 发消息有序,首先要考虑同步发送消息。
同步发送消息有两种方式:
1、设置消息响应参数 acks > 0,最好是 -1。
然后,设置
max.in.flight.requests.per.connection = 1
这样设置完后,在 Kafka 的发送端,将会一条消息发出后,响应必须满足 acks 设置的参数后,才会发送下一条消息。所以,虽然在使用时,还是异步发送的方式,其实底层已经是一条接一条的发送了。
2、当调用 KafkaProducer 的 send 方法后,调用 send 方法返回的 Future 对象的 get 方式阻塞等待结果。等结果返回后,再继续调用 KafkaProducer 的 send 方法发送下一条消息。
同步发送消息之外,还要考虑消息重发问题。
Kafka 发送端可以在发送出现问题时,判断问题是否可以自动恢复,如果是可以自动恢复的问题,可以通过设置 retries > 0,让 Kafka 自动重试。
根据 Kafka 版本的不同,Kafka 1.0 之后的版本,发送端引入了幂等特性。引入幂等特性,我们可以这么设置
enable.idempotence = true
幂等特性这个特性可以给消息添加序列号,每次发送,会把序列号递增 1。
开启了 Kafka 发送端的幂等特性后,我们就可以设置
max.in.flight.requests.per.connection = 5
这样,当 Kafka 发消息的时候,由于消息有了序列号,当发送消息出现错误的时候,在 Kafka 底层会通过获取服务器端的最近几条日志的序列号和发送端需要重新发送的消息序列号做对比,如果是连续的,那么就可以继续发送消息,保证消息顺序。
二、Broker 端
Kafka 的 Topic 只是一个逻辑概念。而组成 Topic 的分区才是真正存消息的地方。
Kafka 只保证同个分区内的消息是有序的。所以,如果要保证业务全局严格有序,就要设置 Topic 为单分区的形式。
不过,往往我们的业务是不需要考虑全局有序的,我们只需要保证业务中不同类别的消息有序即可。对这些业务中不同类别的消息,可以设置成不同的 Key,然后根据 Key 取模。这样,由于同类别消息有同样的 Key,就会被分配到同样的分区中,保证有序。
但是,这里有个问题,就是当我们对分区的数量进行改变的时候,会把以前可能分到同样的分区的消息,分到别的分区上。这就不能保证消息顺序了。
面对这种情况,就需要在动态变更分区的时候,考虑对业务的影响。有可能需要根据业务和当前分区需求,重新划分消息类别。
另外,如果一个 Topic 存在多分区的情况,并且 min.insync.replicas 指定的副本个数挂掉了,那么,就会出现这种情况:发送消息写入不了对应分区,但是消费依然可以消费消息。
此时,往往我们会保证可用性,会考虑切换消息的分区,一旦这样做,消息顺序就可能出现不一致的情况。
所以,一定要保证 min.insync.replicas 参数配置的合适,去最大可能保证消息写入的顺序性。
三、消费者端
在消费者端,根据 Kafka 的模型,一个 Topic 下的每个分区只能从属于监听这个 Topic 的消费者组中的某一个消费者,保证分区内消费有序。
假设 Topic 的分区数量为 P,而消费者组中的消费者数为 C。那么,如果 P < C , 就会出现消费者空闲的情况;如果 P > C,则会出现一个消费者被分配多个分区的情况。
所以,当我们消费者端使用 poll 方法的时候,一定要注意:poll 方法获取到的记录,很可能是多个分区甚至多个 Topic 的。
还需要通过 ConsumerRecords 的 records(TopicPartition partition) 进行进一步的 排序 和筛选,才能真正的保证发送和消费的顺序一致性使用。
另外一个要注意的地方就是消费者的 Rebalance。Rebalance 就是让一个消费者组下所有的消费者实例,就如何消费订阅主题的所有分区达成共识的过程。
Rebalance 机制是 Kafka 最臭名照顾的地方:
- 它每次 Rebalance,都会让全部消费者组的消费暂停。
- 再就是 Rebalance 的 bug 非常多,比如就是 Rebalance 后,要么某个消费者突然崩了,要么是消费者组中某些消费者停了。
- 由于 Rebalance 相当于让消费者组重新分配分区,这就可能造成消费者在 Rebalance 前、后所对应的分区不一致。分区不一致,那自然消费顺序就不可能一致了。
所以,我们都会尽量不让 Rebalance 发生。
有三种情况会触发 Kafka 消费者的 Rebalance 发生:
1、消费者组成员发生变化:这个往往是指,我们认为增减了组内的消费者个数,又或者是某些消费者崩溃了,导致被踢出组。
2、订阅主题数发生变化:Kafka 的消费者组是能用正则去模糊匹配 Topic 的。这就造成一个问题,当我们在 Kafka 中添加主题后,可能会造成消费者组监听的 Topic 数发生变化。
3、订阅主题的分区数发生变化:有些时候,可能我们想动态的线上变更主题的分区数。
所以,当这三种情况触发 Rebalance 后,就会出现问题,消费顺序不一致只是其中很轻微的一种负面影响。
整个 Kafka 不保证有序。如果为了保证 Kafka 全局有序,那么设置一个生产者,一个分区,一个消费者。
12、Kafka 有哪些分区算法?
Kafka 包含三种分区算法:
(1)轮询策略
也称 Round-robin 策略,即顺序分配。比如一个 topic 下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第四条消息时又会重新开始。
轮询策略是 Kafka java 生产者 API 默认提供的分区策略。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是平时最常用的分区策略之一。
(2)随机策略
也称 Randomness 策略。所谓随机就是我们随意地将消息放置在任意一个分区上,如下图:
(3)按 key 分配策略
kafka 允许为每条消息定义消息键,简称为 key 。一旦消息被定义了 key ,那么你就可以保证同一个 key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,如下图所示:
13、Kafka 的默认消息保留策略
broker 默认的消息保留策略分为两种:
日志片段通过 log.segment.bytes 配置(默认是1GB)
日志片段通过 log.segment.ms 配置 (默认7天)
14、Kafka 如何实现单个集群间的消息复制?
Kafka 消息负责机制只能在单个集群中进行复制,不能在多个集群之间进行。
Kafka 提供了一个叫做 MirrorMaker 的核心组件,该组件包含一个生产者和一个消费者,两者之间通过一个队列进行相连,当消费者从一个集群读取消息,生产者把消息发送到另一个集群。
15、Kafka 消息确认(ack 应答)机制
为保证 producer 发送的数据,能可靠的达到指定的 topic , Producer 提供了消息确认机制。生产者往 Broker 的 topic 中发送消息时,可以通过配置来决定有几个副本收到这条消息才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定,这个参数支持以下三种值:
(1)acks = 0 :producer 不会等待任何来自 broker 的响应。
特点:低延迟,高吞吐,数据可能会丢失。
如果当中出现问题,导致 broker 没有收到消息,那么 producer 无从得知,会造成消息丢失。
(2)acks = 1(默认值):只要集群中 partition 的 Leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。
如果在 follower 同步之前,leader 出现故障,将会丢失数据。
此时的吞吐量主要取决于使用的是 同步发送 还是 异步发送 ,吞吐量还受到发送中消息数量的限制,例如 producer 在收到 broker 响应之前可以发送多少个消息。
(3)acks = -1 :只有当所有参与复制的节点全部都收到消息时,生产者才会收到一个来自服务器的成功响应。
这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行。
根据实际的应用场景,选择设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步或异步模式,如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync 。
#同步模式
producer.type=sync
#异步模式
producer.type=async
16、说一下什么是副本?
Kafka 为了保证数据不丢失,从 0.8.0 版本开始引入了分区副本机制。在创建 topic 的时候指定 replication-factor ,默认副本为 3 。
副本是相对 partition 而言的,一个分区中包含一个或多个副本,其中一个为 leader 副本,其余为follower 副本,各个副本位于不同的 broker 节点中。
所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上复制数据。当 Leader 挂掉之后,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
17、Kafka 的 ISR 机制
在分区中,所有副本统称为 AR ,Leader 维护了一个动态的 in-sync replica(ISR),ISR 是指与 leader 副本保持同步状态的副本集合。当然 leader 副本本身也是这个集合中的一员。
当 ISR 中的 follower 完成数据同步之后, leader 就会给 follower 发送 ack ,如果其中一个 follower 长时间未向 leader 同步数据,该 follower 将会被踢出 ISR 集合,该时间阈值由 replica.log.time.max.ms 参数设定。当 leader 发生故障后,就会从 ISR 集合中重新选举出新的 leader。
18、LEO、HW、LSO、LW 分别代表什么?
LEO :是 LogEndOffset 的简称,代表当前日志文件中下一条位置,针对每个副本而言的。
HW :水位或水印一词,也可称为高水位(high watermark)。通常被用在流式处理领域(Flink、Spark),以表征元素或事件在基于时间层面上的进展。在 Kafka 中,HW 是针对分区而言的,水位的概念与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(offset)。取 partition 对应的ISR中最小的 LEO 作为 HW ,consumer 最多只能消费到 HW 所在的上一条信息。
LSO : 是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同。
LW : Low Watermark 低水位,代表AR 集合中最小的 logStartOffset 值。
19、分区管理之分区重分配
当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的 leader 副本的角色会转交到集群的其他 follower 副本中。总而言之,这个节点上的分区副本都已经处于功能失效的状态,Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余的可用 broker 节点上,如果放任不管,则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。
当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。
当集群中新增 broker 节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。
为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。 Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、 broker 节点失效的场景下对分区进行迁移。
kafka-reassign-partitions.sh 脚本的使用分为3个步骤:
(1)首先创建需要一个包含主题清单的 JSON 文件;
(2)其次根据主题清单和 broker 节点清单生成一份重分配方案;
(3)最后恨据这份方案执行具体的重分配动作。
分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低,这一点和优先副本的选举有异曲同工之妙。
还需要注章的是,如果要将某个 broker 下线,那么在执行分区重分配动作之前最好先关闭或重启 broker 。这样这个 broker 就不再是任何分区的 leader 节点了,它的分区就可以被分配给集群中的其他 broker 。这样可以减少 broker 间的流量复制,以此提升重分配的性能,以及减少对集群的影响。
20、如何进行分区 leader 选举?
分区 leader 副本的选举由控制器( Controller )负责具体实施。
1、当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的 leader 副本下线,此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OfflinePartitionLeaderElectionStrategy。这种策略的基本思路是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中。一个分区的 AR 集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变。
注意:这里是根据 AR 的顺序而不是 ISR 的顺序进行选举的。
如果ISR集合中没有可用的副本,那么此时还要再检查一下所配置的 unclean.leader.election.enable 参数(默认值为 false )。如果这个参数配置为 true ,那么表示允许从非 ISR 列表中的选举 leader ,从 AR 列表中找到第一个存活的副本即为 leader 。
2、当分区进行重分配的时候也需要执行 leader 的选举动作,对应的选举策略为 ReassignPartitionLeaderElectionStrategy。这个选举策略的思路比较简单:从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。
3、当发生优先副本的选举时,对应的选举策略为 PreferredReplicaPartitionLeaderElectionStrategy 。直接将优先副本设置为 leader 即可,AR 集合中的第一个副本即为优先副本。
4、还有一种情况会发生 leader 的选举,当某节点被优雅地关闭(也就是执行 ControlledShutdown )时,位于这个节点上的 leader 副本都会下线,所以与此对应的分区需要执行 leader 的选举。与此对应的选举策略为 ControlledShutdownPartitionLeaderElectionStrategy 。即从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。
21、Kafka 事务
Kafka 在 0.11 版本引入事务支持,事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer 事务
为了实现跨分区跨会话事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获取的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获取原来的 PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer 事务
上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
22、Kafka 的消费者组跟分区之间有什么关系?
(1)在 Kafka 中,通过消费者组管理消费者,假设一个主题中包含 4 个分区,在一个消费者组中只要一个消费者。那消费者将收到全部 4 个分区的消息。
(2)如果存在两个消费者,那么四个分区将根据分区分配策略分配个两个消费者。
(3)如果存在四个消费者,将平均分配,每个消费者消费一个分区。
(4)如果存在5个消费者,就会出现消费者数量多于分区数量,那么多余的消费者将会被闲置,不会接收到任何信息。
23、如何保证每个应用程序都可以获取到 Kafka 主题中的所有消息,而不是部分消息?
为每个应用程序创建一个消费者组,然后往组中添加消费者来伸缩读取能力和处理能力,每个群组消费主题中的消息时,互不干扰。
24、如何实现 Kafka 消费者每次只消费指定数量的消息?
写一个队列,把 consumer 作为队列类的一个属性,然后增加一个消费计数的计数器,当到达指定数量时,关闭 consumer 。
25、Kafka 如何实现多线程的消费?
Kafka 允许同组的多个 partition 被一个 consumer 消费,但不允许一个 partition 被同组的多个 consumer 消费。
实现多线程步骤如下:
生产者随机分区提交数据(自定义随机分区)。
消费者修改单线程模式为多线程,在消费方面得注意,得遍历所有分区,否则还是只消费了一个区。
26、 Kafka 消费支持几种消费模式?
kafka 消费消息时支持三种模式:
(1) at most once 模式
最多一次。保证每一条消息 commit 成功之后,再进行消费处理。消息可能会丢失,但不会重复。如果在 ack超时或返回错误时 producer 不重试,则该消息可能最终不会写入 Kafka ,因此不会传递给 consumer 。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。
(2) at least once 模式
至少一次。保证每一条消息处理成功之后,再进行 commit 。消息不会丢失,但可能会重复。如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka 。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka 。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终 consumer ,这种策略可能导致重复的工作和不正确的结果。
(3) exactly once 模式
精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer 。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将 Kafka consumer 的偏移量 rollback ,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现 excactly-once 。
Kafka 默认的模式是 at least once ,但这种模式可能会产生重复消费的问题,所以在业务逻辑必须做幂等设计。
在业务场景保存数据时使用了 INSERT INTO ...ON DUPLICATE KEY UPDATE 语法,不存在时插入,存在时更新,是天然支持幂等性的。
27、Kafka 如何保证数据的不重复和不丢失(Exactly Once语义)?
1、Exactly once 模式
精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。
Kafka 默认的模式是 at least once ,但这种模式可能会产生重复消费的问题,所以在业务逻辑必须做幂等设计。
2、幂等性
Producer在生产发送消息时,难免会重复发送消息。Producer 进行 retry 时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。
幂等性具体实现:每个 Producer 在初始化时都会被分配一个唯一的 PID,这个 PID 对应用是透明的,完全没有暴露给用户。对于一个给定的 PID,sequence number 将会从0开始自增。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number ,broker 也就是通过这个来验证数据是否重复。这里的 PID 是全局唯一的,Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。broker上每个 Topic-Partition 也会维护 pid-seq 的映射,并且每次 Commit 都会更新 lastSeq。这样Record Batch 到来时,broker 会先检查 Record Batch 再保存数据。如果 batch 中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存。
3、At Least Once + 幂等性 = Exactly Once ,可以保证数据不重复,不丢失。
28、Kafka 是如何清理过期数据的?
kafka 将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。
数据清理的方式
1、删除
log.cleanup.policy=delete 启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
#清理超过指定时间清理:
log.retention.hours=16
#超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了 copy-on-write 形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于 Java 的 CopyOnWriteArrayList。
2、压缩
将数据压缩,只保留每个 key 最后一个版本的数据。
首先在 broker 的配置中设置 log.cleaner.enable=true 启用 cleaner,这个默认是关闭的。
在 topic 的配置中设置 log.cleanup.policy=compact 启用压缩策略。
29、Kafka 与 CAP 理论
CAP 理论作为分布式系统的基础理论,它描述的是一个分布式系统最多只能满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三者当中的两个。
1、CAP 的含义
一致性(Consistency)
含义:所有节点访问同一份最新的数据副本(在同一时刻相同)。
在写操作完成后开始的任何读操作都必须返回该值,或者后续写操作的结果。也就是说,在一致性系统中,一旦客户端将值写入任何一台服务器并获得响应,那么之后 client 从其他任何服务器读取的都是刚写入的数据。
可用性(Availability)
含义:系统中非故障节点收到的每个请求都必须有响应。
在可用系统中,如果我们的客户端向服务器发送请求,并且服务器未崩溃,则服务器必须最终响应客户端,不允许服务器忽略客户的请求。
分区容错性(Partition tolerance)
含义:指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务,即部分故障不影响整体使用。
事实上我们在设计分布式系统是都会考虑到 bug、硬件、网络等各种原因造成的故障,所以即使部分节点或者网络出现故障,我们要求整个系统还是要继续使用的 (不继续使用,相当于只有一个分区,那么也就没有后续的一致性和可用性了)
2、注意:
(1)不是在任何时候,C 和 A 都要舍弃一个。在没有出现分区问题时,分布式系统就应该有完美的数据一致性和可用性。
(2)C 和 A 的选择不是一定针对整个系统的,可以分阶段分时刻。如支付子系统中账务流水相关的一定是选择强一致性;像用户名、用户头像、用户等级等相关子系统可以选择 A。
(3)CAP 三种特性不是布尔类型、二元对立、非黑即白的,三者都是范围性的,例如当强调一致性时并不是完全摒弃可用性。
(4)CAP三者如何权衡
CA 系统:关注一致性和可用性,它需要非常严格的全体一致的协议,比如“两阶段提交协议”(2PC)。CA 系统不能容忍网络错误或节点错误,一旦出现这样的问题,整个系统就会拒绝写请求,因为它并不知道对面的那个结点是否挂掉了,还是只是网络问题。唯一安全的做法就是把自己变成只读的。
很遗憾,这种情况几乎不存在。因为分布式系统,网络分区是必然的。如果要舍弃P,那么就是要舍弃分布式系统,CAP 也就无从谈起了。可以说P是分布式系统的前提,所以这种情况是不存在的。
比如一般的关系型数据库,像是 MySQL 或者是Oracle,它们都保证了一致性和可用性,但是并不是分布式系统。从这点上来说CAP并不是等价的,我们并不能通过牺牲 CA 来提升 P 。要想提升分区容错性,只能通过提升基础设施的稳定性来达到。也就是说这并不是一个软件问题。
CP 系统:关注一致性和分区容忍性。它关注的是系统里大多数人的一致性协议,比如:Paxos 算法 (Quorum 类的算法)。这样的系统只需要保证大多数结点数据一致,而少数的结点会在没有同步到最新版本的数据时变成不可用的状态。这样能够提供一部分的可用性。
一个系统保证了一致性和分区容错性,舍弃可用性。也就是说在极端情况下,允许出现系统无法访问的情况出现,这个时候往往会牺牲用户体验,让用户保持等待,一直到系统数据一致了之后,再恢复服务。
对于有些系统而言,一致性是安身立命之本,比如 Hbase、 Redis 这种分布式存储,数据一致性是最基本的要求。不满足一致性的存储显然不会有用户愿意使用。
ZooKeeper 也是一样,任何时候访问 ZK 都可以获得一致性的结果。它的职责就是保证管辖下的服务保持同步和一致,显然不可能放弃一致性。但是在极端情况下,ZK 可能会丢弃一些请求,消费者需要重新请求才能获得结果。
AP 系统:这样的系统关心可用性和分区容忍性。因此,这样的系统不能达成一致性,需要给出数据冲突,给出数据冲突就需要维护数据版本。
这种是大部分的分布式系统的设计,保证高可用和分区容错,但是会牺牲一致性。比如淘宝购物以及12306购票等等,前面说过淘宝可以做到全年可用性5个9的超高级别,但是此时就无法保证数据一致性了。
举个例子,我们在12306买票的时候就经常会遇到。在我们点击购买的时候,系统并没有提示没票。等我们输入了验证码,付款的时候才会告知,已经没有票了。这就是因为我们在点击购买的时候,数据没有达成一致性,在付款校验的时候才检验出余票不足。这种设计会牺牲一些用户体验,但是可以保证高可用,让用户不至于无法访问或者是长时间等待,也算是一种取舍吧。
权衡三者的关键点取决于业务。
放弃了一致性,满足分区容错,那么节点之间就有可能失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会容易导致全局数据不一致性。对于互联网应用来说(如新浪,网易),机器数量庞大,节点分散,网络故障再正常不过了,那么此时就是保障 AP ,放弃 C 的场景,而从实际中理解,像门户网站这种偶尔没有一致性是能接受的,但不能访问问题就非常大了。
对于银行来说,就是必须保证强一致性,也就是说 C 必须存在,那么就只用 CA 和 CP 两种情况,当保障强一致性和可用性(CA),那么一旦出现通信故障,系统将完全不可用。另一方面,如果保障了强一致性和分区容错(CP),那么就具备了部分可用性。实际究竟应该选择什么,是需要通过业务场景进行权衡的(并不是所有情况都是 CP 好于 CA ,只能查看信息但不能更新信息有时候还不如直接拒绝服务)。
3、Kafka 中的 CAP 机制
Kafka 满足的是 CAP 定律当中的 CA ,其中 Partition tolerance 通过的是一定的机制尽量的保证分区容错性。其中 C 表示的是数据一致性。A 表示数据可用性。
Kafka 首先将数据写入到不同的分区里面去,每个分区又可能有好多个副本,数据首先写入到 leader 分区里面去,读写的操作都是与 leader 分区进行通信,保证了数据的一致性原则,也就是满足了 Consistency 原则。然后 Kafka 通过分区副本机制,来保证了 Kafka 当中数据的可用性。但是也存在另外一个问题,就是副本分区当中的数据与 leader 当中的数据存在差别的问题如何解决,这个就是 Partition tolerance 的问题。
Kafka 为了解决 Partition tolerance 的问题,使用了 ISR 的同步策略,来尽最大可能减少 Partition tolerance 的问题。
每个 leader 会维护一个 ISR( a set of in-sync replicas ,基本同步)列表。ISR 列表主要的作用就是决定哪些副本分区是可用的,也就是说可以将 leader分区里面的数据同步到副本分区里面去,决定一个副本分区是否可用的条件有2个:
- replica.lag.time.max.ms=10000 副本分区与主分区心跳时间延迟,超过这个时间就踢出 ISR
- replica.lag.max.messages=4000 表示当前某个副本落后leader的消息数量超过了这个参数的值,那么leader就会把follower从ISR中删除(0.10.0版本该参数被移除)
produce 请求被认为完成时的确认值:request.required.acks=0。
30、为什么 Kafka 不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从而实现的是一种主写主读的生产消费模型。 Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
(1)数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历:网络 → 主节点内存 → 网络 → 从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络 → 主节点内存 → 主节点磁盘 → 网络 → 从节点内存 → 从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。 Kafka 的主写主读的优点就很多了:
(1)可以简化代码的实现逻辑,减少出错的可能;
(2)将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
(3)没有延时的影响;
(4)在副本稳定的情况下,不会出现数据不一致的情况。
为此,Kafka 又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于Kafka优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宜之计。
31、Kafka 的数据 offset 读取流程
(1)连接 ZK 集群,从 ZK 中拿到对应 topic 的 partition 信息和 partition 的 Leader 的相关信息
(2)连接到对应 Leader 对应的 broker
(3)consumer 将⾃自⼰己保存的 offset 发送给 Leader
(4)Leader 根据 offset 等信息定位到 segment(索引⽂文件和⽇日志⽂文件)
(5)根据索引⽂文件中的内容,定位到⽇日志⽂文件中该偏移量量对应的开始位置读取相应⻓长度的数据并返回给 consumer
32、Kafka 消息数据积压,Kafka 消费能力不足怎么处理?
消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
因此我们需要先定位消费慢的原因,如果是 bug 则处理 bug 。如果是因为本身消费能力较弱,下游的数据处理不及时,我们可以优化下消费逻辑,提高每批次拉取的数量。若批次拉取数据过少,即:拉取数据/处理时间 < 生产速度,导致处理的数据小于生产的数据,也会造成数据积压。比如之前是一条一条消息消费处理的,这次我们批量处理。比如数据库的插入,一条一条插和批量插效率是不一样的。
假如逻辑我们已经都优化了,但还是慢。Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并同时提升消费组的消费者数量,消费者数 = 分区数(两者缺一不可)。
33、Kafka 的 offset 维护
Kafka 0.9版本之前,consumer 默认将 offset 保存在 Zookeeper 中。
从0.9版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的名为:__consumer_offsets 的 topic 中。
实际开发场景中在 Spark 和 Flink 中,可以自己手动提交 Kafka 的 offset ,或者是 Flink 两阶段提
交自动提交 offset。
为您推荐与 kafka 相关的帖子: