RabbitMQ:消息发送确认 与 消息接收确认(ACK)
接下来我们讲讲持久化中消息的回调,其实就是消息确认(生产者推送信息成功,消费者接收信息成功)
- 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除
- 默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除
开启消息确认机制
- Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
- consumer 进行 Basic.Ack 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
- 在自动 ack 的情况下,RabbitMQ 认为 message 一旦被发送 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
- 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
- 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
设置手动 ack 使得我们开发人员更加灵活的来处理我们的业务逻辑代码,更加方便的处理异常的问题以及数据的处理等
几种消息确认模式:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
默认情况下消息消费者是自动 ack 消息的,以下为 yml 配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
或者在 RabbitListenerContainerFactory 中开启手动 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//开启手动 ack
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
确认消息接收示例
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "rabbit.mq.test", durable = "true"),
exchange = @Exchange(
value = "rabbit_test_exchange",
durable = "true",
type = ExchangeTypes.TOPIC),
key = "rabbit.test.*")})
public void receive(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println("消费信息:" + msg);
// 确认消息
channel.basicAck(deliveryTag, false);
}
常用的消息接收确认 (消费者)
channel.basicAck() — 确认收到的一条或多条信息。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息。
channel.basicNack() — 拒绝一个或多个接收到的消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
可以 nack 该消费者先前接收未 ack 的所有消息。nack 后的消息也会被自己消费到。
channel.basicReject() — 拒绝一条消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于 basicNack 可以拒绝多条消息,而 basicReject 一次只能拒绝一条消息
该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
channel.basicRecover() — 补发操作
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
requeue:如果为true,则消息将重新排队,并且可能交付给其他消费者。如果为false,则消息将为重新交付给同一消费者。
该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
还有部分 Channel 的方法例如 channel.exchangeDeclare() chanel.basicQos() channel.basicPublish() 因为消息确认不用上,就不做过多说明。
消息发送确认 (生产者)
- 当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功。
- 当消息发送出去之后,我们如何知道消息有没有正确到达 exchange 呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了 为解决这个问题,主要有如下两种方案:
- 通过事务机制实现 — 但是使用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,所以不推荐
- 通过生产者消息确认机制 (publisher confirm) 实现
开启确认消息发送配置,
server:
port: 8080
spring:
application:
name: rabbitmq
# 配置 rabbitMQ 服务器
rabbitmq:
host: 192.168.159.129
port: 5672
username: admin
password: admin
# 虚拟host 可以不设置,使用 server 默认 host
virtual-host:
listener:
simple:
# 手动 ack
acknowledge-mode: manual
# 消息确认配置项
# 开启发送至交换机(Exchange)的确认 目前这个版本的这个方式已经不推荐使用了,目前找到相似的为: publisher-confirm-type: CORRELATED
publisher-confirms: true
# 开启消息发送到队列(Queue)失败的回调
publisher-returns: true
或者在 ConnectionFactory 中进行配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://admin:admin@192.168.159.129:5672");
// 开启消息发送至 RabbitMQ 的回调
factory.setPublisherConfirms(true);
// 开启消息发送至队列失败的回调
factory.setPublisherReturns(true);
return factory;
}
配置 RabbitTemplate 中的两个回调内容
// 设置消息发送至 RabbitMQ 的回调
rabbitTemplate.setConfirmCallback()
// 设置消息发送至队列失败的回调
rabbitTemplate.setReturnCallback()
相关回调配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置消息从生产者发送至 rabbitmq broker 成功的回调 (保证信息到达 broker)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// ack=true:消息成功发送到Exchange
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback: " + "确认是否到达交换机:" + ack);
System.out.println("ConfirmCallback: " + "原因:" + cause);
}
});
// 设置信息从交换机发送至 queue 失败的回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
}
});
// 为 true 时,消息通过交换器无法匹配到队列时会返回给生产者,为 false 时,匹配不到会直接丢弃
rabbitTemplate.setMandatory(true);
// 设置发送时的转换
// rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
- 消息发送的一个流程: producer -> rabbitmq broker cluster -> exchange -> queue -> consumer
- message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。
- message 从 exchange -> queue 投递失败则会返回一个 returnCallback 。
- 如果消息没有到 exchange,则 confirm 回调,ack = false
- 如果消息到达 exchange,则 confirm 回调,ack = true
- exchange 到 queue 成功,则不回调 return
- exchange 到 queue 失败,则回调 return (需设置 mandatory = true,否则不回回调,消息就丢了)
基于上述的回调配置,发送者回调示例:
- MQ:
- 交换机: rabbit_test_exchange (topic)
- 队列:rabbit.mq.test
- 路由键:rabbit.test.*
四种情况:
- 消息推送到 MQ,但是在 MQ 里找不到交换机
- 消息推送到 MQ,找到交换机了,当时没有找到队列
- 消息推送到 MQ,交换机和队列都没找到
- 消息成功推送
①.消息推送到 MQ,但是在 MQ 里找不到交换机
测试发送的交换机不存在
@Test
public void producer() {
String exchange = "non-existent-exchange";
String routingKey = "rabbit.test.key";
String msg = "hello";
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
结果:
2020-01-15 15:29:36.160 ERROR 29052 --- [68.159.129:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认是否到达交换机:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
结论: ①这种情况出发是 ConfirmCallback 回调函数
②.消息推送到 MQ,找到交换机了,当时没有找到队列
测试发送不对的路由键
@Test
public void producer() {
String exchange = "rabbit_test_exchange";
String routingKey = "no.key";
String msg = "hello";
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
结果:
ReturnCallback: 消息:(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 交换机:rabbit_test_exchange
ReturnCallback: 路由键:routing.key
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认是否到达交换机:true
ConfirmCallback: 原因:null
可以看到两个回调函数都被调用了,这种情况下,消息时成功推送到服务器至交换机的;
在 ReturnCallback 回调函数的打印参数可以看到, 消息推送到了交换机成功了,当时在路由分发给队列的时候,找不到队列,所以报了错误的 NO_ROUTE。
结论:②这种情况触发的是 ConfirmCallback 和 ReturnCallback 两个回调函数
③.消息推送到 MQ,交换机和队列都没找到
这种情况跟①很像,因为交换机都没有匹配到,还怎么匹配队列
结论:③这种情况触发的是 ConfirmCallback 回调函数
④.消息成功推送
@Test
public void producer() {
String exchange = "rabbit_test_exchange";
String routingKey = "rabbit.test.key";
String msg = "hello";
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
结果:
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认是否到达交换机:true
ConfirmCallback: 原因:null
消费信息:hello
结论: ④这种情况触发的是 ConfirmCallback 回调函数。
以上是生产者推送消息的消息确认 回调函数的使用介绍(可以在回调函数根据需求做对应的扩展或者业务数据处理)
关于消费者确认和发布者确认的一丢丢介绍就是这些了,想更深入了解更多的消费者确认和发布者确认可以查看官网对消息确认的介绍: Consumer Acknowledgements and Publisher Confirms
为您推荐与 rabbitmq 相关的帖子:
- RabbitMQ 学习笔记 -- 01 简介
- RabbitMQ 学习笔记 -- 02 一个 HelloWorld
- RabbitMQ 学习笔记 -- 03 多消费者
- RabbitMQ 学习笔记 -- 04 扇形交换机
- RabbitMQ 学习笔记 -- 05 路由模式
- RabbitMQ 学习笔记 -- 06 Topic 交换机
- RabbitMQ 学习笔记 -- 07 初探@RabbitListener
- RabbitMQ 学习笔记 -- 08 RabbitTemplate 及消息序列化
- RabbitMQ 学习笔记 -- 09 RabbitMQ 的持久化
- RabbitMQ 学习笔记 -- 11 RabbitMQ 死信队列
- RabbitMQ 消息队列模型使用介绍
- RabbitMQ 3.11.0 已发布
- RabbitMQ 3.11.6 发布
- RabbitMQ 3.11.8 已发布,AMQP 开源实现
- RabbitMQ 3.11.13 已发布,AMQP 开源实现
- RabbitMQ 3.11.14 已发布,AMQP 开源实现