内容简介:前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下:这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。
缓存架构之借助消息中间件RabbitMQ实现 Redis 缓存实时更新实战演练
一、背景介绍
前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下: 我们要实现这部分功能,需要借助两个系统:
- 广告管理系统:生产广告的地方
- 缓存服务系统:消费广告的地方
这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?
通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。
二、核心功能介绍
1、广告管理系统
功能:生产广告,并将生产信息实时同步给RabbitMQ
1)添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${spring-boot.version}</version> </dependency> <!-- 发送邮件需要的2个jar --> <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.7.8</version> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> </dependency>
2)基本配置
@Configuration public class RabbitConfig { public final static String queueName = "ad_queue"; }
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
3)生产者消息确认机制
# 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 spring.rabbitmq.publisher-returns=true
4)发送消息
@Component public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ private static Map<String, Integer> map = new ConcurrentHashMap<>(); private final Logger emailLogger = LoggerFactory.getLogger("emailLogger"); @Autowired private RabbitTemplate rabbitTemplate; public void send(String routingKey, String content) { this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.setRoutingKey(routingKey); //这样我们就能知道,发送失败的是哪条消息了 this.rabbitTemplate.correlationConvertAndSend(content, new CorrelationData(content)); // this.rabbitTemplate.convertAndSend(routingKey, content); } /** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { /** * 我们这里仅通过打印日志、发送邮件来预警,并没有实现自动重试机制: * 1、将发送失败重新发送到一个队列中:fail-queue,然后可以定时对这些消息进行重发 * 2、在本地定义一个缓存map对象,定时进行重发 * 3、为了更安全,可以将所有发送的消息保存到db中,并设置一个状态(是否发送成功),定时扫描检查是否存在未成功发送的信息 * 这块知识,我们后期讲"分布式事务"的时候,在深入讲解这块内容 */ emailLogger.error("send ack fail, cause = {}, correlationData = {}", cause, correlationData.getId()); } else { System.out.println("send ack success"); } } /** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { emailLogger.error("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); String str = new String(message.getBody()); retrySend(str, 3); } private void retrySend(String content, int retryTime){ if(map.containsKey(content)){ int count = map.get(content); count++; map.put(content, count); } else { map.put(content, 1); } if(map.get(content) <= retryTime) { send(RabbitConfig.queueName, content); } } }
2、缓存服务系统
功能:实时监听RabbitMQ,根据通知信息,拉取相应的广告,并刷入redis
1)添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${spring-boot.version}</version> </dependency> <!-- 发送邮件需要的2个jar --> <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.7.8</version> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> </dependency>
2)基本配置
@Configuration public class RabbitConfig { public final static String queueName = "ad_queue"; /** * 死信队列: */ public final static String deadQueueName = "ad_dead_queue"; public final static String deadRoutingKey = "ad_dead_routing_key"; public final static String deadExchangeName = "ad_dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; @Bean public Queue helloQueue() { //将普通队列绑定到私信交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(queueName, true, false, false, args); return queue; } /** * 死信队列: */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
3)消费者消息确认机制
# 开启ACK spring.rabbitmq.listener.simple.acknowledge-mode=manual
4)接收消息
@Component @RabbitListener(queues = RabbitConfig.queueName) public class Receiver { private Logger logger = LoggerFactory.getLogger(Receiver.class); private final Logger emailLogger = LoggerFactory.getLogger("emailLogger"); @Resource UpdateRedisServiceImpl updateRedisService; @RabbitHandler public void process(String content, Channel channel, Message message) { logger.info("handle msg begin = {}", content); AdMessage adMessage = JSON.parseObject(content, AdMessage.class); Long id = adMessage.getId(); int retryTimes = 0; while (retryTimes < 5) { //消费者做幂等处理(当然这只是对单台机器而言没有问题,如果是分布式集群环境,这种是不行的,后续我们会继续优化这块):防止相同类型的广告id更新问题 synchronized (AdLock.cacheLock) { //更新redis数据: if(!updateRedisService.updateRedis(id)){ retryTimes++; } } break; } if (retryTimes >= 3) { //当有多次更新失败的时候,发送邮件通知: emailLogger.error("处理MQ[" + content + "]失败[" + retryTimes + "]次"); } try { if (retryTimes >= 5) { //当有很多次更新失败的时候,丢弃这条消息或者发送到死信队列中 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); }else { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } } catch (Exception e){ logger.error("消息确认失败", e); } logger.info("handle msg finished = {}", content); } }
三、实战演练
代码仓库: https://gitee.com/jikeh/JiKeHCN-RELEASE.git
1、广告管理系统:生产者发送消息通知到RabbitMQ
场景分析:新建/更新广告的时候,消息发送是否正常 项目名 :spring-boot-ad
1)正常
管控台,检查RabbitMQ是否正常收到消息
2)异常
-
消息重试机制
-
发送预警邮件
2、广告缓存服务系统:消费者接收消息并刷新到redis
场景分析:新建/更新广告的时候,消息接收是否正常 项目名 :spring-boot-rabbitmq-reliability-redis
1)正常
- 管控台,- 检查RabbitMQ是非正常消费
- redis是否存在数据
2)异常
-
消息重试机制
-
发送预警邮件
-
发送到死信队列
四、预告
这节课我们讲的异常处理不太完善,下次课我们将使用 延迟队列 来处理异常消息
延迟队列应用场景也是很广的,请持续关注,下次分享
更多内容,请关注:
头条号:极客慧
个人网站: 极客慧
更多资料分享,请入群讨论:375412858
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- DuiC 配置中心 1.7.0 发布,支持配置实时更新
- Flink 使用 broadcast 实现维表或配置的实时更新
- 亿级视频内容如何实时更新?(附阿里文娱智能算法PDF下载)
- probie 插件更新,新增实时编程
- vue实现歌手列表字母排序,下拉滚动条侧栏排序实时更新
- 配置中心 duic-java-client 1.3.0 发布,配置实时更新
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。