内容简介:在MQ里,顺序消息的意思是消费消息的顺序和消息发送时(单机发送)的顺序保持一致。比如ProducerA按照顺序发送msga, msgb, msgc三条消息,那么consumer消费的时候也应该按照msga, msgb, msgc来消费。对于顺序消息,在我们实际使用中发现,大部分业务系统并不需要或者并不依赖MQ提供的顺序机制,这些业务本身往往就能处理无序的消息,比如很多系统中都有状态机,是否消费消息必须根据状态机当前的状态。
背景
在MQ里,顺序消息的意思是消费消息的顺序和消息发送时(单机发送)的顺序保持一致。比如ProducerA按照顺序发送msga, msgb, msgc三条消息,那么consumer消费的时候也应该按照msga, msgb, msgc来消费。
对于顺序消息,在我们实际使用中发现,大部分业务系统并不需要或者并不依赖MQ提供的顺序机制,这些业务本身往往就能处理无序的消息,比如很多系统中都有状态机,是否消费消息必须根据状态机当前的状态。
但是在一些场景中顺序消息也有其必要性:比如日志收集和依赖binlog同步驱动业务等。 就这两个场景而言,同样是顺序消息但对顺序的需求却不一定一样:比如日志收集中我们一般认为对顺序的要求比较弱,即绝大多数时是有序即可,遇到一些极端情况,比如Server宕机,容量调整的时候我们可以暂时容忍一些无序。但是对于一个依赖MySQL binlog同步来驱动的业务,短暂的无序都将会导致整个业务的错乱。
分析现有的一些MQ后发现,它们并不能在所有情况下提供可靠的顺序支持。现在市面上的MQ基本上都是以partition - based模型来提供顺序支持。我们以Kafka为例:topic分为一个或多个partition,partition可以理解为一个顺序文件,producer发送消息的时候,按照一定的策略选择partition,比如partition = hash(key) % partition num来选择该消息发送给哪个partition,那么具有相同key的消息就会落到相同的partition上,而consumer消费的时候一个consumer独占地绑定在一个partition上。这样一来,消息就是顺序消费的了:
但是这种模型存在一些问题:
-
partition的个数就是消费的并行度,那么如果现在consumer处理不过来需要增加consumer则需要对应地增加partition。而根据上面的描述partition的个数一旦改变,则顺序将无法保证(partition = hash(key) % partition num 公式里partition num发生了改变,则选择的partition也会发生变化)。
所以我们一般在业务上线之前,就要做出合理的容量规划,预先创建出足够的partition,但有的时候容量规划是困难的,实践中往往是预先分配大量的partition,比如几百甚至几千,然而大量的partition对性能以及运维都带来麻烦。
-
扩容partition后,如果高峰期已过,想进行缩容则基本上不可行(比如Kafka就不允许减少partition),除了缩容带来顺序变化外,还有一点是怎么保证被缩容的partition上的消息已经完全消费完成了呢?
-
partition的移动问题,partition如果分配在某台broker上之后再移动就很麻烦,一旦这台broker容量不足,需要进行负载均衡就很困难了,这可能需要在不同的机器上传输大量的数据。
-
对可用性的挑战,顺序发送的时候某个key的消息必须总是发送给指定的partition的,如果一旦某台server挂掉,或者正常的停机维护,那么位于这台server的partition就不能收消息了,但是也不能发送给其他partition,否则顺序就会错乱。
虽然我们可以通过多副本机制(Replication)来确保即使该partition所在机器出现故障时候仍然有其他副本提供服务,但是一般选举出一个新的副本通常需要花费几秒到几分钟不等(比如早期的Kafka版本Leader迁移是串行执行的, 在分区特别多的时候,选举出新的leader可能需要分钟级时间),在此期间发送到该partition的所有消息都无法发送。
-
堆积问题,如果预分配时候的partition过少,这个时候堆积了大量的消息,那么即使扩容也没有办法了:
所以我们认为现有的一些所谓顺序消息机制并不是简单可依赖的。你以为MQ给你提供了顺序保障,但实际上在一些时候并不是这样,那么这个时候使用方为了应对这种异常情况就需要做出各种应对措施,增加了使用的复杂度。而我们希望提供一种简单可依赖的顺序消息,也就是使用方可以放心的将顺序保证交给MQ。
方案设计
首先我们来分析无法保证顺序的根源是什么。我们选择partition所使用的公式是 partition = hash(key) % partition num。正是因为partition num发生了变化导致公式的结果发生了变化,进而打破了顺序保证。
其实对于这个公式我们可能并不陌生,除了在MQ中使用,我们在数据库分库分表中往往也有这种套路。
在数据库分库分表中我们会通过一个分区键计算其分区,然后得到表名或库名(如下伪代码所示,user_id是分表键,总共分为100张表):
//计算userId的哈希值对100取模,得到该userId应该落在哪个表
int id = hash(userId) % 100;
String tableName = "users_" + id;
String sql = "select name from " + tableName + "where user_id = ?";
而且在分库分表中前期因为业务量不大,我们往往不会分很多库(或者我们也分了多个库,但是这些库都落在相同的机器上),但是为了后期添加分库方便(扩容)我们会预先分出很多表。比如我们前期分成100张表,但是这100张表都在相同的库里,待到业务增长之后,单库无法支撑,我们会将1000张表划分到不同的DB里。
比如我们将表0 - 50落在DB1, 50 - 100落到DB2,这样我们的处理能力就翻倍了,但是因为程序里还是按照100进行分表的,所以对应用没有感知。
这种机制相当于引入了一个中间层,程序面对的是的分表,最后这个表是落在什么DB上通过中间层进行映射过去就可以了。
那么其实我们是可以借鉴这种思路应用在MQ的扩容缩容中的。为此我们引入了logic partition的概念。也就是Producer发送消息的时候,我们并不决定它发送到哪个具体的Server上的具体的partition里(后文将其称之为物理partition, physical partition)。我们只是先得到logic partition,使用这个计算公式: logic partition = hash(key) % logic partition num。而logic partition num我们会固定住,永不改变。比如我们将logic partition num固定为1000。但是这里跟分库分表中的分1000张表不同,logic partition仅仅是逻辑上的,不存在任何存储实体,所以即使分配的再大也没有性能上的开销。计算得到logic partition后,我们根据logic partition的映射再来决定该消息应该落到具体哪个physical partition上。我们会根据logic partition的范围进行映射,比如logic partition 0 - 500 映射到 physcial partition 1上,500 - 1000 映射到physcial partition 2上。
接下来我们来看看这种措施如何应对本文开头所提出的一序列问题呢:
-
扩容 在这里扩容其实就是对physical partition的分裂过程。比如开始时我们创建了两个分区: physical partition 1, physical partition 2,因为消费不过来,我们要将physcial partiton 1扩容,那么我们将会得到 logic partiton 0 - 250 映射到physical partition 3,logic partition 250 - 500 映射到physical partition 4(注:范围的分裂不一定是平均的,比如我们也可以按照[0 - 200)和[200 - 500)进行划分 )。
-
缩容 缩容其实就是对physical partition的合并过程,我们将physical partiton 3和physical partition 4合并得到physical partition 5。那么现在logic partiton 0 - 500就映射到physical partition 5。
-
负载均衡 负载均衡其实就是logic partition到physical partiton的重新映射过程。也就是原来0 - 500 映射到 physical partition 5,现在我们将其映射到physical partition 6,而physical partition 6可以分配在一台空闲的Server上。不仅如此,重新映射也可以解决可用性问题:一台server停机维护时将落在上面的logic partition进行重新映射,分配到另外一台Server上即可,这样我们就可以打造Always writtable ordered message queue。
总结
本文借鉴分库分表中的预先分表的方法,提出logic partition的抽象层解决物理partition扩容缩容时无法保证顺序的问题。但是实际实现时候我们会发现MQ的这种logic partition分法要比数据库中分表复杂得多。因为MQ是的消费是持续性的,也就是我可以读取历史数据。数据库中分库分表一旦调整之后,那么它呈现的就是最终视图,而MQ里昨天我们可能还只有一个physical partition,今天我们划分为两个,那么我们消费昨天的数据和今天的数据的时候如何进行无缝的切换呢?敬请期待本文的下半部分,我们是如何实现这种机制的。
QMQ是去哪儿网开源的分布式消息中间件,在去哪儿网内部应用十分广泛,提供了很独特的存储模型,延时消息,事务消息等。点击原文链接就会跳到github地址(https://github.com/qunarcorp/qmq),欢迎给我们提交PR, Star,谢谢。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【消息队列】RabbitMQ如何保证消息顺序
- 消息中间件面试题:如何保证消息的顺序性
- 深入理解 RocketMQ 普通消息和顺序消息使用,原理,优化
- 解决事物提交与消息发送顺序问题
- 顺序消息管道 message pipe v1.0.3 发布,新增消息元数据定义
- 顺序消息管道 message pipe v1.0.3 发布,新增消息元数据定义
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。