内容简介:最近在看kafka的服务质量一般可以分为三个级别,下面说明它们不同语义。至多一次,消息可能丢失,但绝不会重复传输。
最近在看kafka的 代码 ,就免不了想看看消息队列的一些要点: 服务质量(QOS) 、 性能 、 扩展性 等等,下面一一探索这些概念,并谈谈在特定的消息队列 如kafka或者mosquito 中是如何具体实现这些概念的。
服务质量
服务语义
服务质量一般可以分为三个级别,下面说明它们不同语义。
At most once
至多一次,消息可能丢失,但绝不会重复传输。
生产者:完全依赖底层TCP/IP的传输可靠性,不做特殊处理,所谓“发送即忘”。kafka中设置 acks=0
。
消费者:先保存消费进度,再处理消息。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔。
At least once
至少一次,消息绝不会丢,但是可能会重复。
生产者:要做消息防丢失的保证。kafka中设置 acks=1 或 all
并设置 retries>0
。
消费者:先处理消息,再保存消费进度。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。
Exactly once
精确一次,每条消息肯定会被传输一次且仅一次。
这个级别光靠消息队列本身并不好保证,有可能要依赖外部组件。
生产者:要做消息防丢失的保证。kafka中设置 acks=1 或 all
并设置 retries>0
。mosquito中通过四步握手与DUP、MessageID等标识来实现单次语义。
消费者:要做消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操作中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证 原子性
。kafka中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证自己或者其他消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合 ConsumerRebalanceListener
、 seek
和一个 外部系统
(如支持事务的数据库)共同来实现单次语义。此外,kafka还提供了GUID以便用户自行实现去重。 kafka 0.11
版本通过3个大的改动支持 EOS
:1.幂等的producer;2. 支持事务;3. 支持EOS的流式处理(保证读-处理-写全链路的EOS)。
这三个级别 可靠性 依次增加,但是 延迟 和 带宽占用 也会增加,所以实际情况中,要依据业务类型做出权衡。
可靠性
上面的三个语义不仅需要生产者和消费者的配合实现,还要broker本身的可靠性来进行保证。 可靠性就是只要broker向producer发出确认,就一定要保证这个消息可以被consumer获取。
kafka中一个 topic
有多个 partition
,每个 partition
又有多个 replica
,所有 replica
中有一个 leader
, ISR
是一定要同步 leader
后才能返回提交成功的 replica集
, OSR
内的 replica
尽力的去同步 leader
,可能数据版本会落后。在 kafka
工作的过程中,如果某个 replica
同步速度慢于 replica.lag.time.max.ms
指定的阈值,则被踢出 ISR
存入 OSR
,如果后续速度恢复可以回到 ISR
中。可以配置 min.insync.replicas
指定 ISR
中的 replica
最小数量,默认该值为1。 LEO
是分区的最新数据的offset,当数据写入leader后,LEO就立即执行该最新数据,相当于最新数据标识位。 HW
是当写入的数据被同步到所有的 ISR
中的副本后,数据才认为已提交, HW
更新到该位置, HW
之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到,相当于所有副本同步数据标识位。
每个 partition
的所有 replica
需要进行 leader
选举(依赖 ZooKeeper
)。在 leader
宕机后,只能从 ISR
列表中选取新的 leader
,无论 ISR
中哪个副本被选为新的 leader
,它都知道 HW
之前的数据,可以保证在切换了 leader
后,消费者可以继续看到 HW
之前已经提交的数据。当 ISR
中所有 replica
都宕机该 partition
就不可用了,可以设置 unclean.leader.election.enable=true
,该选项使得 kafka
选择任何一个活的replica成为leader然后继续工作,此 replica
可能不在 ISR
中, 就可能导致数据丢失
。所以实际使用中需要进行 可用性
与 可靠性
的权衡。
kafka建议数据可靠存储不依赖于数据强制刷盘(会影响整体性能),而是依赖于 replica
。
顺序消费
顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。
主要可能破坏顺序的场景是生产者投放两条消息AB,然后A失败重投递导致消费者拿到的消息是BA。
kafka中能保证 分区内部消息的有序性
,其做法是设置 max.in.flight.requests.per.connection=1
,也就是说生产者在未得到broker对消息A的确认情况下是不会发送消息B的,这样就能保证broker存储的消息有序,自然消费者请求到的消息也是有序的。
但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥 batch 的威力。
如果想要 整个topic有序
,那就只能一个 topic
一个 partition
了,一个 consumer group
也就只有一个 consumer
了。这样就违背了kafka高吞吐的初衷。
重复消费
重复消费是指一个消息被消费者重复消费了。这个问题也是上面第三个语义需要解决的。
一般的消息系统如 kafka 或者类似的 rocketmq 都不能也不提倡在系统内部解决,而是配合第三方组件,让用户自己去解决。究其原因还是 解决问题的成本 与 解决问题后获得的价值 不匹配,所以干脆不解决,就像操作系统对待死锁一样,采取“鸵鸟政策”。
但是 kafka 0.11
还是处理了这个问题,见 发行说明
,维护者是想让用户无可挑剔嘛 [笑cry]。
性能
衡量一个消息系统的性能有许多方面,最常见的就是下面几个指标。
连接数
是指系统在同一时刻能支持多少个生产者或者消费者的连接总数。连接数和broker采用的网络IO模型直接相关,常见模型有: 单线程 、 连接每线程 、 Reactor 、 Proactor 等。
单线程一时刻只能处理一个连接,连接每线程受制于server的线程数量,Reactor是目前主流的高性能网络IO模型,Proactor由于操作系统对 真异步 的支持不太行所以尚未流行。
kafka的 broker
采用了类似于 Netty
的 Reactor
模型:1(1个 Acceptor
线程)+N(N个 Processor
线程)+M(M个 Work
线程)。
其中 Acceptor
负责监听新的连接请求,同时注册 OPACCEPT
事件,将新的连接按照 RoundRobin
的方式交给某个 Processor
线程处理。
每个 Processor
都有一个 NIO selector
,向 Acceptor
分配的 SocketChannel
注册 OPREAD、OPWRITE
事件,对socket进行读写。N由 num.networker.threads
决定。
Worker
负责具体的业务逻辑如:从 requestQueue
中读取请求、数据存储到磁盘、把响应放进 responseQueue
中等等。M的大小由 num.io.threads
决定。
Reactor模型一般基于IO多路复用(如 select
, epoll
),是非阻塞的,所以少量的线程能处理大量的连接。
如果大量的连接都是 idle
的,那么Reactor使用 epoll
的效率是杠杠的,如果大量的连接都是活跃的,此时如果没有 Proactor
的支持就最好把 epoll
换成 select
或者 poll
。
具体做法是 -Djava.nio.channels.spi.SelectorProvider
把 sun.nio.ch
包下面的 EPollSelectorProvider
换成 PollSelectorProvider
。
QPS
是指系统每秒能处理的请求数量。QPS通常可以体现 吞吐量 (该术语很广,可以用TPS/QPS、PV、UV、业务数/小时等单位体现)的大小。
kafka中由于可以采用 batch 的方式(还可以 压缩 ),所以每秒钟可以处理的请求很多(因为减少了 解析量 、 网络往复次数 、 磁盘IO次数 等)。另一方面,kafka每一个topic都有多个partition,所以同一个topic下可以 并行 (注意不是 并发 哟)服务多个生产者和消费者,这也提高了吞吐量。
平均响应时间
平均响应时间是指每个请求获得响应需要的等待时间。
kafka中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出现在哪些地方呢?
网络?有可能,但是这个因素总体而言不是kafka能控制的,kafka可以对消息进行编码压缩并批量提交,减少带宽占用;
磁盘?很有可能,所以kafka从分利用OS的 pagecache
,并且对磁盘采用 顺序写
,这样能大大提升磁盘的写入速度。同时kafka还使用了 零拷贝技术
,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,内核buffer到用户buffer的拷贝过程省略了,加快了处理速度。此外还有 文件分段
技术,每个 partition
都分为多个 segment
,避免了大文件操作的同时提高了并行度。
CPU?不大可能,因为消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中一般不是瓶颈。
并发数
是指系统同时能处理的请求数量数。一般而言, QPS = 并发数/平均响应时间
或者说 并发数 = QPS*平均响应时间
。
这个参数一般只能估计或者计算,没法直接测。顾名思义,机器性能越好当然并发数越高咯。此外注意用上多线程技术并且提高代码的 并行度 、优化IO模型、减少减少内存分配和释放等手段都是可以提高并发数的。
扩展性
消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。
kafka
中扩展性的基石就是 topic
采用的 partition
机制。第一, Kafka
允许 Partition
在 cluster
中的 Broker
之间移动,以此来解决数据倾斜问题。第二,支持自定义的 Partition
算法,比如你可以将同一个 Key
的所有消息都路由到同一个 Partition
上去(来获得顺序)。第三, partition
的所有 replica
通过 ZooKeeper
来进行集群管理,可以动态增减副本。第四,partition也支持动态增减。
对于 producer
,不存在扩展问题,只要 broker
还够你连接就行。
对于 consumer
,一个 consumer group
中的 consumer
可以增减,但是最好不要超过一个 topic
的 partition
数量,因为多余的 consumer
并不能提升处理速度,一个 partition
在同一时刻只能被一个 consumer group
中的一个 consumer
消费
代码上的可扩展性就属于 设计模式 的领域了,这里不谈。
参考
查看原文,来自mageekchiu。总结不到位的地方请不吝赐教。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
- springboot整合各种消息队列(二):rabbitmq消息队列
- springboot整合各种消息队列(一):redis消息队列
- 消息队列系列二(IOT中消息队列的应用)
- 消息队列(七)RocketMQ消息发送
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
商业模式新生代
亚历山大•奥斯特瓦德 (Alexander Osterwalder)、伊夫•皮尼厄 (Yves Pigneur) / 王帅、毛心宇、严威 / 机械工业出版社 / 2011-8-15 / 88.00元
中文官网:http://www.bizmodel.org 内容简介:当你愉快的看完第一章:商业模式画布,赫然发现这些构成要素全 都交织成一幅清晰的图像在脑海中呈现,它们如何互相影响、如何交互作用全都历历在目。利用商业模式画布分析瑞士银行、Google、Lego、Wii 、Apple等跨国企业,归纳出三种不同的产业 模式,也涵括新近的热门现象免费效应及长尾理论等。在这些有趣的例子中,我们不仅更......一起来看看 《商业模式新生代》 这本书的介绍吧!
Markdown 在线编辑器
Markdown 在线编辑器
html转js在线工具
html转js在线工具