死信队列
DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。其实 DLX 就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ 会自动发送)。
什么是死信呢?什么样的消息会变成死信呢?
被设置了TTL的消息在过期后会成为 Dead Letter。其实在 RabbitMQ 中,一共有三种消息的“死亡”形式:
- 消息被拒绝(basic.reject或basic.nack)并且requeue = false.
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
死信处理过程
- DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中的消息做相应的处理。
应用场景分析:
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
定义业务(普通)队列的时候指定参数:
- x-dead-letter-exchange: 用来设置死信后发送的交换机
- x-dead-letter-routing-key:用来设置死信的 routingKey
死信队列设置
大致流程
web 控制台操作
1.声明一个死信交换机,
其实和普通的交换机都一样,同样作用于将消息推送至相应的队列,只不过它充当的角色是用来转发那些死信消息的交换机。
这里声明的为 Topic 交换机,名字为 — rabbit_dlx_exchange
2.声明死信交换机的处理队列,并绑定。
我们这里声明两个队列 ( queue_dlx_A,queue_dlx_B),并用两个不同的路由键 ( routing.a.#,routing.b.# ),绑定在死信交换机上,方便演示。
绑定状态
3.声明普通的工作交换机,一个正常工作的交换机,
这里我们声明 Topic 类型交换机,名字为 — rabbit_work_exchange
4.声明工作交换机对应的工作队列,并绑定在工作交换机上,设置路由键为 #
声明队列 work_queue
这里需要重点说明一下,设置 队列 work_queue 的队列参数
- x-dead-letter-exchange: 用来设置死信后发送的交换机,这里指明的为 rabbit_dlx_exchange 交换机
- x-dead-letter-routing-key:用来设置死信的 routingKey,如果没有设置 routingKey,则使用原来携带的路由键。这里使用 死信路由键 为 routing.a.key,是为了方便测试路由到死信交换机后,具体推送到哪个队列上。
5.在工作交换机 rabbit_work_exchange 中发送一条消息,消息传递到队列 work_queue 中
6,接下来在队列 work_queue 中拒绝该条消息
因为 web 控制台界面拒绝消息的方式只有两种,一个是 Nack message requeue true ,但是 requeue = true,消息会直接返回队列中,所以不行,这里选择的是 Reject requeue false。
7.结果
可以发现 work_queue 中的消息经过死信交换机被转发到了 queue_dlx_A 中,
代码方式
声明交换机,队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class Config {
// 声明死信交换机 rabbit_dlx_exchange
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange("rabbit_dlx_exchange");
}
@Bean
public Queue dlxQueueA() {
return new Queue("queue_dlx_A");
}
@Bean
public Queue dlxQueueB() {
return new Queue("queue_dlx_B");
}
@Bean
public Binding bindDlxAQueue() {
String routingKey = "routing.a.#";
return BindingBuilder.bind(dlxQueueA()).to(dlxExchange()).with(routingKey);
}
@Bean
public Binding bindDlxBQueue() {
String routingKey = "routing.b.#";
return BindingBuilder.bind(dlxQueueB()).to(dlxExchange()).with(routingKey);
}
// 声明正常工作的交换机 rabbit_work_exchange
@Bean
public TopicExchange workExchange() {
return new TopicExchange("rabbit_work_exchange");
}
@Bean
public Queue workQueue() {
String queueName = "work_queue";
// 要指定的死信交换机
String deadExchangeName = "rabbit_dlx_exchange";
// 路由键 这里模拟交给死信交换机下的 A 队列中
String deadRoutingKey = "routing.a.key";
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", deadExchangeName);
args.put("x-dead-letter-routing-key", deadRoutingKey);
return new Queue(queueName, true, false, false, args);
}
@Bean
public Binding bindWorkQueue() {
String routingKey = "#";
return BindingBuilder.bind(workQueue()).to(workExchange()).with(routingKey);
}
}
在正常工作队列 work_queue 的配置中注入了 Map 参数,用来配置
x-dead-letter-exchange 标识一个交换机
x-dead-letter-routing-key 来标识一个绑定键。
消费者
@Component
public class Consumer {
@RabbitListener(queues = "work_queue")
public void receiver(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println("工作队列 work_queue 消费信息:" + msg);
System.out.println("将消息拒绝放回队列中");
channel.basicReject(deliveryTag, false);
// 两种方式都可以,但 requeue 必须为 false
// channel.basicNack(deliveryTag, false, false);
}
@RabbitListener(queues = "queue_dlx_A")
public void receiver2(String msg, Channel channel) throws IOException {
System.out.println("死信交换机下 queue_dlx_A 队列接收到消息:" + msg);
channel.basicAck(1, false);
}
@RabbitListener(queues = "queue_dlx_B")
public void receiver3(String msg, Channel channel) throws IOException {
System.out.println("死信交换机下 queue_dlx_B 队列接收到消息:" + msg);
channel.basicAck(1, false);
}
}
生产者
@Test
public void producer() {
String exchange = "rabbit_work_exchange";
String routingKey = "a";
String msg = "模拟一条死信消息";
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
输出
从队列 work_queue 接收到消息:模拟一条死信消息
消费者拒绝消息
死信交换机下 queue_dlx_A 队列接收到消息:模拟一条死信消息
死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当明白了这些之后,这些 Exchange 和 Queue 想怎样配合就能怎么配合。
比如从死信队列拉取消息,然后发送邮件、短信等做异步处理逻辑。
或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。
为您推荐与 rabbitmq 相关的帖子:
- RabbitMQ 学习笔记 -- 01 简介
- RabbitMQ 学习笔记 -- 02 一个 HelloWorld
- RabbitMQ 学习笔记 -- 03 多消费者
- RabbitMQ 学习笔记 -- 04 扇形交换机
- RabbitMQ 学习笔记 -- 05 路由模式
- RabbitMQ 学习笔记 -- 06 Topic 交换机
- RabbitMQ 学习笔记 -- 07 初探@RabbitListener
- RabbitMQ 学习笔记 -- 08 RabbitTemplate 及消息序列化
- RabbitMQ 学习笔记 -- 09 RabbitMQ 的持久化
- RabbitMQ 学习笔记 -- 10 RabbitMQ 消费者确认和发布者确认
- RabbitMQ 消息队列模型使用介绍
- RabbitMQ 3.11.0 已发布
- RabbitMQ 3.11.6 发布
- RabbitMQ 3.11.8 已发布,AMQP 开源实现
- RabbitMQ 3.11.13 已发布,AMQP 开源实现
- RabbitMQ 3.11.14 已发布,AMQP 开源实现