原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练

栏目: 数据库 · 发布时间: 6年前

内容简介:前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下:这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。

缓存架构之借助消息中间件RabbitMQ实现 Redis 缓存实时更新实战演练

一、背景介绍

前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下: 原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练 我们要实现这部分功能,需要借助两个系统:

  • 广告管理系统:生产广告的地方
  • 缓存服务系统:消费广告的地方

这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?

通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。

二、核心功能介绍

1、广告管理系统

功能:生产广告,并将生产信息实时同步给RabbitMQ

1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mail</artifactId>
    <version>1.4.7</version>
</dependency>

2)基本配置

@Configuration
public class RabbitConfig {

    public final static String queueName = "ad_queue";

}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)生产者消息确认机制

# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

4)发送消息

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private static Map<String, Integer> map = new ConcurrentHashMap<>();

    private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String routingKey, String content) {
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.setRoutingKey(routingKey);

        //这样我们就能知道,发送失败的是哪条消息了
        this.rabbitTemplate.correlationConvertAndSend(content, new CorrelationData(content));
//        this.rabbitTemplate.convertAndSend(routingKey, content);
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            /**
             * 我们这里仅通过打印日志、发送邮件来预警,并没有实现自动重试机制:
             * 1、将发送失败重新发送到一个队列中:fail-queue,然后可以定时对这些消息进行重发
             * 2、在本地定义一个缓存map对象,定时进行重发
             * 3、为了更安全,可以将所有发送的消息保存到db中,并设置一个状态(是否发送成功),定时扫描检查是否存在未成功发送的信息
             * 这块知识,我们后期讲"分布式事务"的时候,在深入讲解这块内容
             */
            emailLogger.error("send ack fail, cause = {}, correlationData = {}", cause, correlationData.getId());
        } else {
            System.out.println("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        emailLogger.error("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
        String str = new String(message.getBody());
        retrySend(str, 3);
    }

    private void retrySend(String content, int retryTime){
        if(map.containsKey(content)){
            int count = map.get(content);
            count++;
            map.put(content, count);
        } else {
            map.put(content, 1);
        }
        if(map.get(content) <= retryTime) {
            send(RabbitConfig.queueName, content);
        }
    }

}

2、缓存服务系统

功能:实时监听RabbitMQ,根据通知信息,拉取相应的广告,并刷入redis

1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mail</artifactId>
    <version>1.4.7</version>
</dependency>

2)基本配置

@Configuration
public class RabbitConfig {

    public final static String queueName = "ad_queue";

    /**
     * 死信队列:
     */
    public final static String deadQueueName = "ad_dead_queue";
    public final static String deadRoutingKey = "ad_dead_routing_key";
    public final static String deadExchangeName = "ad_dead_exchange";

    /**
     * 死信队列 交换机标识符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列交换机绑定键标识符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    @Bean
    public Queue helloQueue() {
        //将普通队列绑定到私信交换机上
        Map<String, Object> args = new HashMap<>(2);
        args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
        args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
        Queue queue = new Queue(queueName, true, false, false, args);
        return queue;
    }

    /**
     * 死信队列:
     */

    @Bean
    public Queue deadQueue() {
        Queue queue = new Queue(deadQueueName, true);
        return queue;
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(deadExchangeName);
    }

    @Bean
    public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
    }

}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)消费者消息确认机制

# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

4)接收消息

@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class Receiver {

    private Logger logger = LoggerFactory.getLogger(Receiver.class);

    private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");

    @Resource
    UpdateRedisServiceImpl updateRedisService;

    @RabbitHandler
    public void process(String content, Channel channel, Message message) {

        logger.info("handle msg begin = {}", content);

        AdMessage adMessage = JSON.parseObject(content, AdMessage.class);
        Long id = adMessage.getId();

        int retryTimes = 0;
        while (retryTimes < 5) {
            //消费者做幂等处理(当然这只是对单台机器而言没有问题,如果是分布式集群环境,这种是不行的,后续我们会继续优化这块):防止相同类型的广告id更新问题
            synchronized (AdLock.cacheLock) {
                //更新redis数据:
                if(!updateRedisService.updateRedis(id)){
                    retryTimes++;
                }
            }
            break;
        }

        if (retryTimes >= 3) {
            //当有多次更新失败的时候,发送邮件通知:
            emailLogger.error("处理MQ[" + content + "]失败[" + retryTimes + "]次");
        }

        try {
            if (retryTimes >= 5) {
                //当有很多次更新失败的时候,丢弃这条消息或者发送到死信队列中
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            }else {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }

        } catch (Exception e){
            logger.error("消息确认失败", e);
        }

        logger.info("handle msg finished = {}", content);

    }

}

三、实战演练

代码仓库: https://gitee.com/jikeh/JiKeHCN-RELEASE.git

1、广告管理系统:生产者发送消息通知到RabbitMQ

场景分析:新建/更新广告的时候,消息发送是否正常 项目名 :spring-boot-ad

1)正常

管控台,检查RabbitMQ是否正常收到消息

2)异常

  • 消息重试机制

  • 发送预警邮件

2、广告缓存服务系统:消费者接收消息并刷新到redis

场景分析:新建/更新广告的时候,消息接收是否正常 项目名 :spring-boot-rabbitmq-reliability-redis

1)正常

  • 管控台,- 检查RabbitMQ是非正常消费
  • redis是否存在数据

2)异常

  • 消息重试机制

  • 发送预警邮件

  • 发送到死信队列

四、预告

这节课我们讲的异常处理不太完善,下次课我们将使用 延迟队列 来处理异常消息

延迟队列应用场景也是很广的,请持续关注,下次分享

更多内容,请关注:

头条号:极客慧

原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练

个人网站: 极客慧

更多资料分享,请入群讨论:375412858

原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C语言接口与实现

C语言接口与实现

David R. Hanson / 郭旭 / 人民邮电出版社 / 2011-9 / 75.00元

《C语言接口与实现:创建可重用软件的技术》概念清晰、实例详尽,是一本有关设计、实现和有效使用C语言库函数,掌握创建可重用C语言软件模块技术的参考指南。书中提供了大量实例,重在阐述如何用一种与语言无关的方法将接口设计实现独立出来,从而用一种基于接口的设计途径创建可重用的API。 《C语言接口与实现:创建可重用软件的技术》是所有C语言程序员不可多得的好书,也是所有希望掌握可重用软件模块技术的人员......一起来看看 《C语言接口与实现》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具