内容简介:接上文在上文简单提到了如何将消息进行可靠发送,因为
接上文 分布式事务:基于可靠消息服务 介绍了整体中间件的设计思路,有些内容没有展开。故此,本文详细讲解下如何将消息可靠发送到Rabbitmq。
在上文简单提到了如何将消息进行可靠发送,因为 shine-mq
是无缝集成 spring-boot-starter
的,所以rabbitmq的操作也是基于spring的 rabbitTemplate
来完成的。
rabbitTemplate
提供了 setConfirmCallback
方法,可以在消息发送到RabbitMQ交换器后,进行ack的回调。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { ... //消息能投入正确的消息队列,并持久化,返回的ack为true if (ack) { log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData); ... } ... }); 复制代码
在此之前还需要设置 CachingConnectionFactory
//设置生成者确认机制 rabbitConnectionFactory.setPublisherConfirms(true); 复制代码
如果还需要设置 setReturnCallback
(消息发送到RabbitMQ交换器,但无相应Exchange时的回调),那就还需要设置 rabbitTemplate
//使用return-callback时必须设置mandatory为true rabbitTemplate.setMandatory(true); 复制代码
熟悉Rabbitmq的同学可能知道,Rabbitmq有两种机制来实现消息的可靠发布。
- 通过事务机制,这个上篇文章分析过,在这个模式下,rabbitmq的效率很低,不适合。
-
Confirm模式,这个模式下会有三种方式,分别是:
rabbitTemplate
所以我们知道了 rabbitTemplate
提供的确认机制是一种异步机制,并不能同步的发现问题,也就是说在极端的网络条件下是会出现消息丢失的。
所以 shine-mq
通过增加一个 Coordinator
(协调者)来实现。 Coordinator
会保存2个状态,一个是prepare( 携带回查id
),这个状态在前文说过是用来保证上游服务的任务状态的。
而另一个状态ready,就是来保证消息的可靠投递。
首先 shine-mq
是使用 @DistributedTrans
来开启。在这个注解的切面里,先持久化ready状态。
@Around(value = "@annotation(trans)") public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable { ... try { EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId, coordinatorName, msgId); //将消息持久化 coordinator.setReady(msgId, checkBackId.toString(), message); rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null); rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null); if (flag) { rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE, MqConstant.DEAD_LETTER_ROUTEKEY, null, null); flag = false; } rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED); } catch (Exception e) { log.error("Message failed to be sent : ", e); throw e; } } 复制代码
然后在回调中删除该状态:
//消息发送到RabbitMQ交换器后接收ack回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (correlationData != null) { log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause); String msgId = correlationData.getId(); CorrelationDataExt ext = (CorrelationDataExt) correlationData; Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator()); //可以自定义实现的回调 coordinator.confirmCallback(correlationData, ack); //消息能投入正确的消息队列,并持久化,返回的ack为true if (ack) { log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData); //删除ready状态 coordinator.delStatus(msgId); } else { ... } } }); 复制代码
因为存储ready是在上游服务任务执行之后的,所以只要有超时的ready记录未被清理掉, daemon(守护线程)
只管捞起来进行重发就行,因为Mq的可靠性投递就已经要求下游服务是需要保证幂等性了。
最后还有个极端的情况,就是ready消息存储的时候因为网络抖动该消息丢失了,这时候也没有关系,因为有prepare状态会进行回查,该状态只有在ready存储后才会触发删除。
如果对你有帮助,那就帮忙点个星星把 ^.^
github地址: github.com/7le/shine-m…
Github 不要吝啬你的star ^.^更多精彩 戳我
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 分布式事务:基于可靠消息服务
- 基于可靠消息方案的分布式事务:Lottor使用
- Apache Hadoop(3)---可靠的分布式存储HDFS
- 如何做可靠的分布式锁,Redlock真的可行么
- 【MDCC 2016】Erlang之父Joe Armstrong:如何设计高可靠的分布式并行系统
- 【MDCC 2016】Erlang之父Joe Armstrong:如何设计高可靠的分布式并行系统
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Redis设计与实现
黄健宏 / 机械工业出版社 / 2014-6 / 79.00
【官方网站】 本书的官方网站 www.RedisBook.com 提供了书本试读、相关源码下载和勘误回报等服务,欢迎读者浏览和使用。 【编辑推荐】 系统而全面地描述了 Redis 内部运行机制 图示丰富,描述清晰,并给出大量参考信息,是NoSQL数据库开发人员案头必备 包括大部分Redis单机特征,以及所有多机特性 【读者评价】 这本书描述的知识点很丰富,......一起来看看 《Redis设计与实现》 这本书的介绍吧!