- 发送端确认 生产者能知道自己的消息是否成功发送到交换机或者队列
- 消费端确认 消费者成功消费消息后,发送确认标识使消息从MQ中删除
如何判断消息发送成功或失败 ?
- 确认消息不能路由到任何队列时,确认发送失败
- 消息可以路由到队列时,当需要发送的队列都发送成功后,进行消息确认成功.对于持久化的队列,意味着已经写入磁盘,对于镜像队列,意味着所有镜像都接受成功.
消费端如何告知rabbitmq消息消费成功或失败?
- 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
- 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
- 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
发送端确认实例
- 添加配置
# 消息发送到交换器确认 spring.rabbitmq.publisher-confirms=true # 消息发送到队列确认 spring.rabbitmq.publisher-returns=true 复制代码
- 创建两个监听类分别实现RabbitTemplate的ConfirmCallback和ReturnCallback接口 实现ConfirmCallback接口,当消息发送到交换机的回调 实现ReturnCallback接口,当消息路由不到指定队列时回调
消息发送到交换机监听类 @Slf4j @Component public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("Success... 消息成功发送到交换机! correlationData:{}", correlationData); } else { log.info("Fail... 消息发送到交换机失败! correlationData:{}", correlationData); } } } /** * 消息未路由到队列监听类 * @author by peng * @date in 2019-06-01 21:32 */ @Slf4j @Component public class SendReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("Fail... message:{},从交换机exchange:{},以路由键routingKey:{}," + "未找到匹配队列,replyCode:{},replyText:{}", message, exchange, routingKey, replyCode, replyText); } } 复制代码
- 重新注入RabbitTemplate,并设置两个监听类
@Configuration public class RabbitConfig { @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback( new SendConfirmCallback()); rabbitTemplate.setReturnCallback( new SendReturnCallback()); return rabbitTemplate; } } 复制代码
- 定义生产者和消费者
生产者 @Component public class Sender { @Autowired private RabbitTemplate rabbitTemplate; public void sendConfirmSuccess() { String content = "Message sent to exist exchange!"; this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content); System.out.println("########### SendConfirmSuccess : " + content); } public void sendConfirmError() { String content = "Message sent to not exist exchange!"; this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content); System.out.println("########### SendConfirmError : " + content); } public void sendReturn() { String content = "Message sent to exist exchange! But no queue to routing to"; this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content); System.out.println("########### SendWReturn : " + content); } } // 消费者 @Component @RabbitListener(queues = "existQueue") public class Receiver { @RabbitHandler public void process(String message) { System.out.println("########### Receiver Msg:" + message); } } 复制代码
- 测试类
@RunWith(value=SpringJUnit4ClassRunner.class) @SpringBootTest public class ConfirmTest { @Autowired private Sender sender; @Test public void sendConfirmSuccess() { sender.sendConfirmSuccess(); 结果:成功发送并消费了消息,并输出监听日志 ########### SendConfirmSuccess : Message sent to exist exchange! Success... 消息成功发送到交换机! correlationData:null ########### Receiver Msg:Message sent to exist exchange! } @Test public void sendConfirmError() { sender.sendConfirmError(); 结果:消息发送失败,并输入监听日志 ########### SendConfirmError : Message sent to not exist exchange! Fail... 消息发送到交换机失败! correlationData:null Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40) } @Test public void sendReturn() { sender.sendReturn(); 结果:消息发送到交换机,但路由不到队列(不存在队列匹配路由键) ########### SendWReturn : Message sent to exist exchange! But no queue to routing to Success... 消息成功发送到交换机! correlationData:null Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),从交换机exchange:directConfirmExchange,以路由键routingKey:not-exist,未找到匹配队列,replyCode:312,replyText:NO_ROUTE } } 复制代码
消费端确认
- 添加配置
# 消费者消息确认--手动 ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual 复制代码
- 消费者代码
@Component @RabbitListener(queues = "existQueue") public class AckReceiver { @RabbitHandler public void process(String content, Channel channel, Message message) { try { System.out.println("########### message:" + message); // 业务处理成功后调用,消息会被确认消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 业务处理失败后调用 //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true); //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } catch (IOException e) { e.printStackTrace(); } System.out.println("########### Receiver Msg:" + content); } } 复制代码
以上所述就是小编给大家介绍的《SpringBoot 中使用RabbitMQ(二)消息确认》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RabbitMQ消息确认机制+延时队列
- RabbitMQ的消息确认机制那些事
- Fuchsia最新消息,确认支持Android应用
- 原 荐 RabbitMQ之消息确认机制(事务+Confirm)
- Springboot + Rabbitmq 用了消息确认机制,感觉掉坑里了
- 2.7 交易确认
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。