RabbitMQ实现延迟队列

栏目: Redis · 发布时间: 6年前

内容简介:RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL。以下介绍下相关概念及方法消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.

RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL。

以下介绍下相关概念及方法

Dead Letter Exchanges

消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.

以下任一条件满足,即可认为是死信:

  • 消息被拒绝消费(basic.reject or basic.nack)并且设置了requeue=fasle
  • 消息的TTL到了(消息过期)
  • 达到了队列的长度限制

需要注意的是,Dead letter exchanges (DLXs) 其实就是普通的exchange,可以和正常的exchange一样的声明或者使用。

死信消息路由

队列中可以设置两个属性:

  • x-dead-letter-exchange
  • x-dead-letter-routing-key

当这个队列里面的消息成为死信之后,就会投递到x-dead-letter-exchange指定的exchange中,其中带着的routing key就是中指定的值x-dead-letter-routing-key。

而如果使用默认的exchange(routing key就是希望指定的队列),则只需要把x-dead-letter-exchange设置为空(不能不设置),类似下面

RabbitMQ实现延迟队列

死信消息的路由则会根据x-dead-letter-routing-key所指定的进行路由,如果这个值没有指定,则会按照消息一开始发送的时候指定的routing key进行路由

Dead-lettered messages are routed to their dead letter exchange either:
with the routing key specified for the queue they were on; or, if this was not set, with the same routing keys they were originally published with.

例如,如果一开始你对exchange X发送消息,带着routing key “foo”,进入了队列 Q然后消息变死信后,他会被重新发送到 dead letter exchange ,其中发给dead letter exchange带着的routing key 还是foo。 但如果这个队列Q本身是设置了x-dead-letter-routing-key bar, 那么他发送到 dead letter exchange的时候,带着的routing key 就是bar。

需要注意的是,当死信消息重新路由到新的队列的时候,在死信目标队列确认收到这条死信消息之前,原来队列的消息是不会删除的,也就是说在某些异常场景下例如broker突然shutdown,是有机会存在说一个消息既存在于原队列,又存在于死信目标队列。具体可参考官方说明:

Dead-lettered messages are re-published with publisher confirms turned on internally so, the “dead-letter queues” (DLX routing targets) the messages eventually land on must confirm the messages before they are removed from the original queue. In other words, the “publishing” (the one in which messages expired) queue will not remove messages before the dead-letter queues acknowledge receiving them (see Confirms for details on the guarantees made). Note that, in the event of an unclean broker shutdown, the same message may be duplicated on both the original queue and on the dead-lettering destination queues.

Time-To-Live(TTL)

开头我们说过,实现延迟队列除了用死信消息外,还需要利用消息过期的TTL机制,因为只要消息过期了,就会触发死信。

RabbitMQ有两种方法让设置消息的TTL:

直接在消息上设置

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

为队列设置消息过期TTL

RabbitMQ实现延迟队列

注意,队列还有一个队列TTL,x-expires,这个的意思是队列空置经过一段时间(没有消费者,没有被重新声明,没有人在上面获取消息(basic.get))后,整个队列便会过期删除,不要混淆

如果同时设置了消息的过期和队列消息过期属性,则取两个较小值。

设计延迟队列:

例如,我们需要触发一个推送新闻,30分钟后统计这个新闻的下发情况,我们就需要一个延迟队列,新闻推送后,往延迟队列发送一个消息,这个队列的消息在30分钟后被消费,这时候触发即可统计30分钟的下发情况。我们可以这样设计:

定义一个正常的队列: ARRIVAL_STAT,统计程序监听此队列,进行消费。

定义一个“延迟队列”(RabbitMQ没有这样的队列,这里只是人为的制造一个这样的队列):DELAY_ARRIVAL_STAT,其中设置好对应的x-dead-letter-exchange,x-dead-letter-routing-key。为了简单说明,我使用默认的exchange,那么配置如下:

x-dead-letter-exchange=“”
x-dead-letter-routing-key=“ARRIVAL_STAT”

意思是,消息当这个队列DELAY_ARRIVAL_STAT的消息变死信之后,就会带着routing key “ARRIVAL_STAT”发送默认的空exchange,即队列ARRIVAL_STAT。

并且这个队列不能有消费者消费消息。

这样我们就实现了消息的死信转发。下一步,只需要让消息在这个DELAY_ARRIVAL_STAT在30分钟后过期变死信即可。按照上文所说,有两种方法,我们可以为队列的消息设置30分钟TTL,或者发送消息的时候指定消息的TTL为30分钟即可。

示例如下:

RabbitMQ实现延迟队列

“延迟队列”的堵塞缺陷

由于设置了x-dead-letter-exchange的队列本身也是普通队列,其过期的顺序是按照队列头部顺序的过期的。也就是说,如果你队列头的消息A过期时间是5分钟,后面对这个队列发送消息B的带着过期时间1分钟,那么后面的队列B要等队列A过期了才会触发过期:

Queues that had a per-message TTL applied to them retroactively (when they already had messages) will discard the messages when specific events occur. Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).

所以,对于此类多延迟时间的,可以考虑设置多级延迟队列。例如1分钟,5分钟,10分钟,20分钟这样多级的延迟队列,使得延迟相近的尽量放到同一个队列中减少拥堵的最坏情况。

RabbitMQ实现延迟队列


以上所述就是小编给大家介绍的《RabbitMQ实现延迟队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据结构与问题求解

数据结构与问题求解

韦斯 / 清华大学出版社 / 2011-8 / 89.50元

《数据结构与问题求解(Java语言版)(第4版)》是专为计算机科学专业的两个学期课程而设计的,从介绍什么足数据结构开始,继而对高级数据结构与算法进行分析。《数据结构与问题求解(Java语言版)(第4版)》以独特的方式,清晰地将每种数据结构的接口与其实现分离开来,即将如何使用数据结构与如何对数据结构编程相分离。《数据结构与问题求解(Java语言版)(第4版)》从抽象思维和问题求解的角度出发,为数据结......一起来看看 《数据结构与问题求解》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具