- 发送端确认 生产者能知道自己的消息是否成功发送到交换机或者队列
- 消费端确认 消费者成功消费消息后,发送确认标识使消息从MQ中删除
如何判断消息发送成功或失败 ?
- 确认消息不能路由到任何队列时,确认发送失败
- 消息可以路由到队列时,当需要发送的队列都发送成功后,进行消息确认成功.对于持久化的队列,意味着已经写入磁盘,对于镜像队列,意味着所有镜像都接受成功.
消费端如何告知rabbitmq消息消费成功或失败?
- 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
- 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
- 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
发送端确认实例
- 添加配置
# 消息发送到交换器确认 spring.rabbitmq.publisher-confirms=true # 消息发送到队列确认 spring.rabbitmq.publisher-returns=true 复制代码
- 创建两个监听类分别实现RabbitTemplate的ConfirmCallback和ReturnCallback接口 实现ConfirmCallback接口,当消息发送到交换机的回调 实现ReturnCallback接口,当消息路由不到指定队列时回调
消息发送到交换机监听类
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("Success... 消息成功发送到交换机! correlationData:{}", correlationData);
} else {
log.info("Fail... 消息发送到交换机失败! correlationData:{}", correlationData);
}
}
}
/**
* 消息未路由到队列监听类
* @author by peng
* @date in 2019-06-01 21:32
*/
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("Fail... message:{},从交换机exchange:{},以路由键routingKey:{}," +
"未找到匹配队列,replyCode:{},replyText:{}",
message, exchange, routingKey, replyCode, replyText);
}
}
复制代码
- 重新注入RabbitTemplate,并设置两个监听类
@Configuration
public class RabbitConfig {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
rabbitTemplate.setReturnCallback( new SendReturnCallback());
return rabbitTemplate;
}
}
复制代码
- 定义生产者和消费者
生产者
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendConfirmSuccess() {
String content = "Message sent to exist exchange!";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
System.out.println("########### SendConfirmSuccess : " + content);
}
public void sendConfirmError() {
String content = "Message sent to not exist exchange!";
this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
System.out.println("########### SendConfirmError : " + content);
}
public void sendReturn() {
String content = "Message sent to exist exchange! But no queue to routing to";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
System.out.println("########### SendWReturn : " + content);
}
}
// 消费者
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {
@RabbitHandler
public void process(String message) {
System.out.println("########### Receiver Msg:" + message);
}
}
复制代码
- 测试类
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
@Autowired
private Sender sender;
@Test
public void sendConfirmSuccess() {
sender.sendConfirmSuccess();
结果:成功发送并消费了消息,并输出监听日志
########### SendConfirmSuccess : Message sent to exist exchange!
Success... 消息成功发送到交换机! correlationData:null
########### Receiver Msg:Message sent to exist exchange!
}
@Test
public void sendConfirmError() {
sender.sendConfirmError();
结果:消息发送失败,并输入监听日志
########### SendConfirmError : Message sent to not exist exchange!
Fail... 消息发送到交换机失败! correlationData:null
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40)
}
@Test
public void sendReturn() {
sender.sendReturn();
结果:消息发送到交换机,但路由不到队列(不存在队列匹配路由键)
########### SendWReturn : Message sent to exist exchange! But no queue to routing to
Success... 消息成功发送到交换机! correlationData:null
Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),从交换机exchange:directConfirmExchange,以路由键routingKey:not-exist,未找到匹配队列,replyCode:312,replyText:NO_ROUTE
}
}
复制代码
消费端确认
- 添加配置
# 消费者消息确认--手动 ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual 复制代码
- 消费者代码
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
@RabbitHandler
public void process(String content, Channel channel, Message message) {
try {
System.out.println("########### message:" + message);
// 业务处理成功后调用,消息会被确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 业务处理失败后调用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("########### Receiver Msg:" + content);
}
}
复制代码
以上所述就是小编给大家介绍的《SpringBoot 中使用RabbitMQ(二)消息确认》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RabbitMQ消息确认机制+延时队列
- RabbitMQ的消息确认机制那些事
- Fuchsia最新消息,确认支持Android应用
- 原 荐 RabbitMQ之消息确认机制(事务+Confirm)
- Springboot + Rabbitmq 用了消息确认机制,感觉掉坑里了
- 2.7 交易确认
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Pro JavaScript Techniques
John Resig / Apress / 2006-12-13 / USD 44.99
Pro JavaScript Techniques is the ultimate JavaScript book for the modern web developer. It provides everything you need to know about modern JavaScript, and shows what JavaScript can do for your web s......一起来看看 《Pro JavaScript Techniques》 这本书的介绍吧!