消息队列二三事

栏目: 后端 · 发布时间: 6年前

内容简介:最近在看kafka的服务质量一般可以分为三个级别,下面说明它们不同语义。至多一次,消息可能丢失,但绝不会重复传输。

最近在看kafka的 代码 ,就免不了想看看消息队列的一些要点: 服务质量(QOS)性能扩展性 等等,下面一一探索这些概念,并谈谈在特定的消息队列 如kafka或者mosquito 中是如何具体实现这些概念的。

服务质量

服务语义

服务质量一般可以分为三个级别,下面说明它们不同语义。

At most once

至多一次,消息可能丢失,但绝不会重复传输。

生产者:完全依赖底层TCP/IP的传输可靠性,不做特殊处理,所谓“发送即忘”。kafka中设置 acks=0

消费者:先保存消费进度,再处理消息。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔。

At least once

至少一次,消息绝不会丢,但是可能会重复。

生产者:要做消息防丢失的保证。kafka中设置 acks=1 或 all 并设置 retries>0

消费者:先处理消息,再保存消费进度。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。

Exactly once

精确一次,每条消息肯定会被传输一次且仅一次。

这个级别光靠消息队列本身并不好保证,有可能要依赖外部组件。

生产者:要做消息防丢失的保证。kafka中设置 acks=1 或 all 并设置 retries>0 。mosquito中通过四步握手与DUP、MessageID等标识来实现单次语义。

消费者:要做消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操作中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证 原子性 。kafka中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证自己或者其他消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合 ConsumerRebalanceListenerseek 和一个 外部系统 (如支持事务的数据库)共同来实现单次语义。此外,kafka还提供了GUID以便用户自行实现去重。 kafka 0.11 版本通过3个大的改动支持 EOS :1.幂等的producer;2. 支持事务;3. 支持EOS的流式处理(保证读-处理-写全链路的EOS)。

这三个级别 可靠性 依次增加,但是 延迟带宽占用 也会增加,所以实际情况中,要依据业务类型做出权衡。

可靠性

上面的三个语义不仅需要生产者和消费者的配合实现,还要broker本身的可靠性来进行保证。 可靠性就是只要broker向producer发出确认,就一定要保证这个消息可以被consumer获取。

kafka中一个 topic 有多个 partition ,每个 partition 又有多个 replica ,所有 replica 中有一个 leaderISR 是一定要同步 leader 后才能返回提交成功的 replica集OSR 内的 replica 尽力的去同步 leader ,可能数据版本会落后。在 kafka 工作的过程中,如果某个 replica 同步速度慢于 replica.lag.time.max.ms 指定的阈值,则被踢出 ISR 存入 OSR ,如果后续速度恢复可以回到 ISR 中。可以配置 min.insync.replicas 指定 ISR 中的 replica 最小数量,默认该值为1。 LEO 是分区的最新数据的offset,当数据写入leader后,LEO就立即执行该最新数据,相当于最新数据标识位。 HW 是当写入的数据被同步到所有的 ISR 中的副本后,数据才认为已提交, HW 更新到该位置, HW 之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到,相当于所有副本同步数据标识位。

每个 partition 的所有 replica 需要进行 leader 选举(依赖 ZooKeeper )。在 leader 宕机后,只能从 ISR 列表中选取新的 leader ,无论 ISR 中哪个副本被选为新的 leader ,它都知道 HW 之前的数据,可以保证在切换了 leader 后,消费者可以继续看到 HW 之前已经提交的数据。当 ISR 中所有 replica 都宕机该 partition 就不可用了,可以设置 unclean.leader.election.enable=true ,该选项使得 kafka 选择任何一个活的replica成为leader然后继续工作,此 replica 可能不在 ISR 中, 就可能导致数据丢失 。所以实际使用中需要进行 可用性可靠性 的权衡。

kafka建议数据可靠存储不依赖于数据强制刷盘(会影响整体性能),而是依赖于 replica

顺序消费

顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。

主要可能破坏顺序的场景是生产者投放两条消息AB,然后A失败重投递导致消费者拿到的消息是BA。

kafka中能保证 分区内部消息的有序性 ,其做法是设置 max.in.flight.requests.per.connection=1 ,也就是说生产者在未得到broker对消息A的确认情况下是不会发送消息B的,这样就能保证broker存储的消息有序,自然消费者请求到的消息也是有序的。

但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥 batch 的威力。

如果想要 整个topic有序 ,那就只能一个 topic 一个 partition 了,一个 consumer group 也就只有一个 consumer 了。这样就违背了kafka高吞吐的初衷。

重复消费

重复消费是指一个消息被消费者重复消费了。这个问题也是上面第三个语义需要解决的。

一般的消息系统如 kafka 或者类似的 rocketmq 都不能也不提倡在系统内部解决,而是配合第三方组件,让用户自己去解决。究其原因还是 解决问题的成本解决问题后获得的价值 不匹配,所以干脆不解决,就像操作系统对待死锁一样,采取“鸵鸟政策”。

但是 kafka 0.11 还是处理了这个问题,见 发行说明 ,维护者是想让用户无可挑剔嘛 [笑cry]。

性能

衡量一个消息系统的性能有许多方面,最常见的就是下面几个指标。

连接数

是指系统在同一时刻能支持多少个生产者或者消费者的连接总数。连接数和broker采用的网络IO模型直接相关,常见模型有: 单线程连接每线程ReactorProactor 等。

单线程一时刻只能处理一个连接,连接每线程受制于server的线程数量,Reactor是目前主流的高性能网络IO模型,Proactor由于操作系统对 真异步 的支持不太行所以尚未流行。

kafka的 broker 采用了类似于 NettyReactor 模型:1(1个 Acceptor 线程)+N(N个 Processor 线程)+M(M个 Work 线程)。

其中 Acceptor 负责监听新的连接请求,同时注册 OPACCEPT 事件,将新的连接按照 RoundRobin 的方式交给某个 Processor 线程处理。

每个 Processor 都有一个 NIO selector ,向 Acceptor 分配的 SocketChannel 注册 OPREAD、OPWRITE 事件,对socket进行读写。N由 num.networker.threads 决定。

Worker 负责具体的业务逻辑如:从 requestQueue 中读取请求、数据存储到磁盘、把响应放进 responseQueue 中等等。M的大小由 num.io.threads 决定。

Reactor模型一般基于IO多路复用(如 selectepoll ),是非阻塞的,所以少量的线程能处理大量的连接。

如果大量的连接都是 idle 的,那么Reactor使用 epoll 的效率是杠杠的,如果大量的连接都是活跃的,此时如果没有 Proactor 的支持就最好把 epoll 换成 select 或者 poll

具体做法是 -Djava.nio.channels.spi.SelectorProvidersun.nio.ch 包下面的 EPollSelectorProvider 换成 PollSelectorProvider

QPS

是指系统每秒能处理的请求数量。QPS通常可以体现 吞吐量 (该术语很广,可以用TPS/QPS、PV、UV、业务数/小时等单位体现)的大小。

kafka中由于可以采用 batch 的方式(还可以 压缩 ),所以每秒钟可以处理的请求很多(因为减少了 解析量网络往复次数磁盘IO次数 等)。另一方面,kafka每一个topic都有多个partition,所以同一个topic下可以 并行 (注意不是 并发 哟)服务多个生产者和消费者,这也提高了吞吐量。

平均响应时间

平均响应时间是指每个请求获得响应需要的等待时间。

kafka中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出现在哪些地方呢?

网络?有可能,但是这个因素总体而言不是kafka能控制的,kafka可以对消息进行编码压缩并批量提交,减少带宽占用;

磁盘?很有可能,所以kafka从分利用OS的 pagecache ,并且对磁盘采用 顺序写 ,这样能大大提升磁盘的写入速度。同时kafka还使用了 零拷贝技术 ,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,内核buffer到用户buffer的拷贝过程省略了,加快了处理速度。此外还有 文件分段 技术,每个 partition 都分为多个 segment ,避免了大文件操作的同时提高了并行度。

CPU?不大可能,因为消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中一般不是瓶颈。

并发数

是指系统同时能处理的请求数量数。一般而言, QPS = 并发数/平均响应时间 或者说 并发数 = QPS*平均响应时间

这个参数一般只能估计或者计算,没法直接测。顾名思义,机器性能越好当然并发数越高咯。此外注意用上多线程技术并且提高代码的 并行度 、优化IO模型、减少减少内存分配和释放等手段都是可以提高并发数的。

扩展性

消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。

kafka 中扩展性的基石就是 topic 采用的 partition 机制。第一, Kafka 允许 Partitioncluster 中的 Broker 之间移动,以此来解决数据倾斜问题。第二,支持自定义的 Partition 算法,比如你可以将同一个 Key 的所有消息都路由到同一个 Partition 上去(来获得顺序)。第三, partition 的所有 replica 通过 ZooKeeper 来进行集群管理,可以动态增减副本。第四,partition也支持动态增减。

对于 producer ,不存在扩展问题,只要 broker 还够你连接就行。

对于 consumer ,一个 consumer group 中的 consumer 可以增减,但是最好不要超过一个 topicpartition 数量,因为多余的 consumer 并不能提升处理速度,一个 partition 在同一时刻只能被一个 consumer group 中的一个 consumer 消费

代码上的可扩展性就属于 设计模式 的领域了,这里不谈。

参考

查看原文,来自mageekchiu。总结不到位的地方请不吝赐教。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

商业模式新生代

商业模式新生代

亚历山大•奥斯特瓦德 (Alexander Osterwalder)、伊夫•皮尼厄 (Yves Pigneur) / 王帅、毛心宇、严威 / 机械工业出版社 / 2011-8-15 / 88.00元

中文官网:http://www.bizmodel.org 内容简介:当你愉快的看完第一章:商业模式画布,赫然发现这些构成要素全 都交织成一幅清晰的图像在脑海中呈现,它们如何互相影响、如何交互作用全都历历在目。利用商业模式画布分析瑞士银行、Google、Lego、Wii 、Apple等跨国企业,归纳出三种不同的产业 模式,也涵括新近的热门现象免费效应及长尾理论等。在这些有趣的例子中,我们不仅更......一起来看看 《商业模式新生代》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具