原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练

栏目: 数据库 · 发布时间: 7年前

内容简介:前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下:这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。

缓存架构之借助消息中间件RabbitMQ实现 Redis 缓存实时更新实战演练

一、背景介绍

前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下: 原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练 我们要实现这部分功能,需要借助两个系统:

  • 广告管理系统:生产广告的地方
  • 缓存服务系统:消费广告的地方

这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?

通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。

二、核心功能介绍

1、广告管理系统

功能:生产广告,并将生产信息实时同步给RabbitMQ

1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mail</artifactId>
    <version>1.4.7</version>
</dependency>

2)基本配置

@Configuration
public class RabbitConfig {

    public final static String queueName = "ad_queue";

}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)生产者消息确认机制

# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

4)发送消息

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private static Map<String, Integer> map = new ConcurrentHashMap<>();

    private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String routingKey, String content) {
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.setRoutingKey(routingKey);

        //这样我们就能知道,发送失败的是哪条消息了
        this.rabbitTemplate.correlationConvertAndSend(content, new CorrelationData(content));
//        this.rabbitTemplate.convertAndSend(routingKey, content);
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            /**
             * 我们这里仅通过打印日志、发送邮件来预警,并没有实现自动重试机制:
             * 1、将发送失败重新发送到一个队列中:fail-queue,然后可以定时对这些消息进行重发
             * 2、在本地定义一个缓存map对象,定时进行重发
             * 3、为了更安全,可以将所有发送的消息保存到db中,并设置一个状态(是否发送成功),定时扫描检查是否存在未成功发送的信息
             * 这块知识,我们后期讲"分布式事务"的时候,在深入讲解这块内容
             */
            emailLogger.error("send ack fail, cause = {}, correlationData = {}", cause, correlationData.getId());
        } else {
            System.out.println("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        emailLogger.error("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
        String str = new String(message.getBody());
        retrySend(str, 3);
    }

    private void retrySend(String content, int retryTime){
        if(map.containsKey(content)){
            int count = map.get(content);
            count++;
            map.put(content, count);
        } else {
            map.put(content, 1);
        }
        if(map.get(content) <= retryTime) {
            send(RabbitConfig.queueName, content);
        }
    }

}

2、缓存服务系统

功能:实时监听RabbitMQ,根据通知信息,拉取相应的广告,并刷入redis

1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mail</artifactId>
    <version>1.4.7</version>
</dependency>

2)基本配置

@Configuration
public class RabbitConfig {

    public final static String queueName = "ad_queue";

    /**
     * 死信队列:
     */
    public final static String deadQueueName = "ad_dead_queue";
    public final static String deadRoutingKey = "ad_dead_routing_key";
    public final static String deadExchangeName = "ad_dead_exchange";

    /**
     * 死信队列 交换机标识符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列交换机绑定键标识符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    @Bean
    public Queue helloQueue() {
        //将普通队列绑定到私信交换机上
        Map<String, Object> args = new HashMap<>(2);
        args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
        args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
        Queue queue = new Queue(queueName, true, false, false, args);
        return queue;
    }

    /**
     * 死信队列:
     */

    @Bean
    public Queue deadQueue() {
        Queue queue = new Queue(deadQueueName, true);
        return queue;
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(deadExchangeName);
    }

    @Bean
    public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
    }

}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)消费者消息确认机制

# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

4)接收消息

@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class Receiver {

    private Logger logger = LoggerFactory.getLogger(Receiver.class);

    private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");

    @Resource
    UpdateRedisServiceImpl updateRedisService;

    @RabbitHandler
    public void process(String content, Channel channel, Message message) {

        logger.info("handle msg begin = {}", content);

        AdMessage adMessage = JSON.parseObject(content, AdMessage.class);
        Long id = adMessage.getId();

        int retryTimes = 0;
        while (retryTimes < 5) {
            //消费者做幂等处理(当然这只是对单台机器而言没有问题,如果是分布式集群环境,这种是不行的,后续我们会继续优化这块):防止相同类型的广告id更新问题
            synchronized (AdLock.cacheLock) {
                //更新redis数据:
                if(!updateRedisService.updateRedis(id)){
                    retryTimes++;
                }
            }
            break;
        }

        if (retryTimes >= 3) {
            //当有多次更新失败的时候,发送邮件通知:
            emailLogger.error("处理MQ[" + content + "]失败[" + retryTimes + "]次");
        }

        try {
            if (retryTimes >= 5) {
                //当有很多次更新失败的时候,丢弃这条消息或者发送到死信队列中
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            }else {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }

        } catch (Exception e){
            logger.error("消息确认失败", e);
        }

        logger.info("handle msg finished = {}", content);

    }

}

三、实战演练

代码仓库: https://gitee.com/jikeh/JiKeHCN-RELEASE.git

1、广告管理系统:生产者发送消息通知到RabbitMQ

场景分析:新建/更新广告的时候,消息发送是否正常 项目名 :spring-boot-ad

1)正常

管控台,检查RabbitMQ是否正常收到消息

2)异常

  • 消息重试机制

  • 发送预警邮件

2、广告缓存服务系统:消费者接收消息并刷新到redis

场景分析:新建/更新广告的时候,消息接收是否正常 项目名 :spring-boot-rabbitmq-reliability-redis

1)正常

  • 管控台,- 检查RabbitMQ是非正常消费
  • redis是否存在数据

2)异常

  • 消息重试机制

  • 发送预警邮件

  • 发送到死信队列

四、预告

这节课我们讲的异常处理不太完善,下次课我们将使用 延迟队列 来处理异常消息

延迟队列应用场景也是很广的,请持续关注,下次分享

更多内容,请关注:

头条号:极客慧

原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练

个人网站: 极客慧

更多资料分享,请入群讨论:375412858

原 荐 缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

On LISP

On LISP

Paul Graham / Prentice Hall / 09 September, 1993 / $52.00

On Lisp is a comprehensive study of advanced Lisp techniques, with bottom-up programming as the unifying theme. It gives the first complete description of macros and macro applications. The book also ......一起来看看 《On LISP》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试