分布式事务:消息可靠发送

栏目: 后端 · 发布时间: 5年前

内容简介:接上文在上文简单提到了如何将消息进行可靠发送,因为

接上文 分布式事务:基于可靠消息服务 介绍了整体中间件的设计思路,有些内容没有展开。故此,本文详细讲解下如何将消息可靠发送到Rabbitmq。

在上文简单提到了如何将消息进行可靠发送,因为 shine-mq 是无缝集成 spring-boot-starter 的,所以rabbitmq的操作也是基于spring的 rabbitTemplate 来完成的。

rabbitTemplate 提供了 setConfirmCallback 方法,可以在消息发送到RabbitMQ交换器后,进行ack的回调。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    ...
    //消息能投入正确的消息队列,并持久化,返回的ack为true
    if (ack) {
        log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
        ...
    }
    ...
});
复制代码

在此之前还需要设置 CachingConnectionFactory

//设置生成者确认机制
rabbitConnectionFactory.setPublisherConfirms(true);
复制代码

如果还需要设置 setReturnCallback (消息发送到RabbitMQ交换器,但无相应Exchange时的回调),那就还需要设置 rabbitTemplate

//使用return-callback时必须设置mandatory为true
rabbitTemplate.setMandatory(true);
复制代码

熟悉Rabbitmq的同学可能知道,Rabbitmq有两种机制来实现消息的可靠发布。

  • 通过事务机制,这个上篇文章分析过,在这个模式下,rabbitmq的效率很低,不适合。
  • Confirm模式,这个模式下会有三种方式,分别是:
    rabbitTemplate
    

所以我们知道了 rabbitTemplate 提供的确认机制是一种异步机制,并不能同步的发现问题,也就是说在极端的网络条件下是会出现消息丢失的。

所以 shine-mq 通过增加一个 Coordinator (协调者)来实现。 Coordinator 会保存2个状态,一个是prepare( 携带回查id ),这个状态在前文说过是用来保证上游服务的任务状态的。

而另一个状态ready,就是来保证消息的可靠投递。

首先 shine-mq 是使用 @DistributedTrans 来开启。在这个注解的切面里,先持久化ready状态。

@Around(value = "@annotation(trans)")
public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable {
    ...
    try {
        EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId,
                coordinatorName, msgId);
        //将消息持久化
        coordinator.setReady(msgId, checkBackId.toString(), message);
        rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null);
        rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null);
        if (flag) {
            rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE,
                    MqConstant.DEAD_LETTER_ROUTEKEY, null, null);
            flag = false;
        }
        rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED);
    } catch (Exception e) {
        log.error("Message failed to be sent : ", e);
        throw e;
    }
}
复制代码

然后在回调中删除该状态:

//消息发送到RabbitMQ交换器后接收ack回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (correlationData != null) {
        log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
        String msgId = correlationData.getId();
        CorrelationDataExt ext = (CorrelationDataExt) correlationData;
        Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
        //可以自定义实现的回调
        coordinator.confirmCallback(correlationData, ack);
        //消息能投入正确的消息队列,并持久化,返回的ack为true
        if (ack) {
            log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
            //删除ready状态
            coordinator.delStatus(msgId);
        } else {
            ...
        }
    }
});
复制代码

因为存储ready是在上游服务任务执行之后的,所以只要有超时的ready记录未被清理掉, daemon(守护线程) 只管捞起来进行重发就行,因为Mq的可靠性投递就已经要求下游服务是需要保证幂等性了。

最后还有个极端的情况,就是ready消息存储的时候因为网络抖动该消息丢失了,这时候也没有关系,因为有prepare状态会进行回查,该状态只有在ready存储后才会触发删除。

如果对你有帮助,那就帮忙点个星星把 ^.^

github地址: github.com/7le/shine-m…

Github 不要吝啬你的star ^.^更多精彩 戳我


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

法律论证理论

法律论证理论

罗伯特·阿列克西 / 舒国滢 / 中国法制出版社 / 2002-12-01 / 30.00

阿列克西的著作探讨的主要问题是如法律裁决之类的规范性陈述如何以理性的方式证立。阿列克西将规范性陈述的证立过程看作实践商谈或“实践言说”,而将法律裁决的证立过程视为“法律言说” 。由于支持法律规范的法律商谈是普遍实践言说的特定形式,所以法律论证理论应当立基于这种一般理论。 在阿列克西看来,如果裁决是理性言说的结果,那么这一规范性陈述就是真实的或可接受的。其基本观念在于法律裁决证立的合理性取决于......一起来看看 《法律论证理论》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具