一、消息中间件的常见使用场景
- 复杂系统的解耦: 多个系统间通过中间件进行数据交互, 避免牵一发而动全身, 减少耦合, 提升系统稳定性与可扩展性
- 复杂链路的异步调用: 某些业务场景可以通过异步执行减少同步调用的时间, 从而大大提高系统响应时间而不影响核心逻辑
- 瞬时高峰的削峰处理: 流量高峰期, 可以将请求积压在MQ中, 服务器不用一下处理所有请求从而导致系统崩溃, 高峰期后, 消费者可以慢慢消费
二、系统架构引入消息中间件后会有哪些缺点
- 系统可用性降低: 引入MQ,系统多了一个依赖。依赖如果出现问题,就会导致系统可用性降低。一旦引入中间件,就必须考虑这个中间件是如何部署的,如何保证高可用性
- 系统稳定性降低: 引入MQ, 可能由于网络故障、中间件故障、消费者异常等原因导致各种各样乱七八糟的问题产生, 从而使系统稳定性下降
- 分布式一致性问题: 多系统协同处理一个业务, 不能保证所有系统都正常处理, 有可能出现系统数据不一致的情况, 所以此时又需要使用可靠消息最终一致性的分布式事务方案来保障数据一致性。
RabbitMQ 持久化操作
1.消息的持久化
消息被投递到RabbitMQ的内存中, 还没投递到消费者实例之前宕机了, 消息不就丢失了?
可以进行消息持久化, 将 Exchange、queue 和 message 都持久化到硬盘, 这样, RabbitMQ 重启时, 会把持久化的Exchange、queue 和 message从硬盘重新加载出来, 重新投递消息
① Exchange 的持久化,声明交换机时指定持久化参数为 true 即可。声明时是默认持久化的
/**
* Construct a new durable, non-auto-delete Exchange with the provided name.
* @param name the name of the exchange.
*/
public AbstractExchange(String name) {
this(name, true, false);
}
/**
* Construct a new Exchange, given a name, durability flag, auto-delete flag.
* @param name 交换机的名称
* @param durable 是否持久化
* @param autoDelete 是否自动删除 判断服务器在不再使用该交换机时是否自动删除该交换机
*/
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, null);
}
如果交换机不设置持久化,那么在 RabbitMQ 服务器重启之后,相关的交换机元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换机中了。对于一个长期使用的交换机来说,建议将其设置为持久化的。
② queue 的持久化,声明队列时指定持久化参数为 true 即可。声明时是默认持久化的
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
/**
* Construct a new queue, given a name, durability, exclusive and auto-delete flags.
* @param name 队列名称
* @param durable 是否持久化
* @param exclusive 是否为专用的队列,true:一个队列只能有一个消费者
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, null);
}
如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。
③ message 的持久化,使用 convertAndSend 方式发送消息,消息默认是持久化的
public class MessageProperties implements Serializable {
......
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
......
}
也可以使用 MessagePostProcessor 来配置消息属性,生产者投递时, 指定 deliveryMode 为 MessageDeliveryMode.PERSISTENT 即可实现消息的持久化
@Override
public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor);
}
在保证了队列的持久化后,开启 Message 的持久化就能保证在其内部的持久化存储
2.消息发送确认
生产者发送消息, 先发送消息到 Exchange, 然后 Exchange 再路由到 Queue, 这中间就需要确认两个事情
- 确认消息是否成功发送到 Exchange
- 确认消息是否从Exchange成功路由到 Queue
spring 提供了两个回调函数来处理这两种消息发送确认 ConfirmCallback ReturnCallback
3.消息接收确认
RabbitMQ 默认 自动确认(ack) 消息被正确消费, 即消息投递到消费者后就自动确认消息被处理完毕, 并且会将该消息删除, 即使消费者意外宕机, 或者抛出异常, 如果消费者接收到消息, 还没处理完成就 down 掉或者抛出异常, 那么, 这条消息就丢失了
消息确认模式有:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
默认情况下消息消费者是自动 ack(确认) 消息的, 如果要手动 ack(确认), 则需要修改确认模式为 manual
持久问题:
- 将所有的消息都设置为持久化,但是这样会严重影响RabbitMQ的性能(随机)。
写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。 - 在持久化的消息正确存入 RabbitMQ 之后,还需要一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。
RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的 fsync 方法)的处理,可能仅仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ 服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
这里可以引入RabbitMQ的镜像队列机制,相当于配置了副本,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证RabbitMQ消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
扩展问题:unack消息的积压问题
什么叫 unack 消息的积压问题
简单来说就是消费者处理能力有限, 无法一下将 MQ 投递过来的所有消息消费完, 如果MQ推送消息过多, 比如可能有几千上万条消息积压在某个消费者实例内存中, 此时这些积压的消息就处于 unack 状态, 如果一直积压, 就有可能导致消费者服务实例内存溢出、内存消耗过大、甚至内存泄露
所以, RabbitMQ 是必须要考虑一下消费者服务的处理能力的。
如何解决?
RabbitMQ 基于一个(预处理数) prefetch count 来控制这个 unack message 的数量。
你可以通过 channel.basicQos(10) 这个方法来设置当前 channel 的 prefetch count。也可以通过配置文件设置: spring.rabbitmq.listener.simple.prefetch = 10
spring-rabbit 默认设置了 prefetch count 的数量为 250 个
举个例子,比如你要是设置为 10 的话,那么意味着当前这个 channel 里,unack message 的数量不能超过 10个,以此来避免消费者服务实例积压 unack message 过多。
这样的话,就意味着 RabbitMQ 正在投递到 channel 过程中的 unack message,以及消费者服务在处理中的 unack message,以及异步ack之后还没完成ack的unack message,所有这些 message 加起来,一个 channel 也不能超过10个。
如果你要简单粗浅的理解的话,也大致可以理解为这个 prefetch count 就代表了一个消费者服务同时最多可以获取多少个 message 来处理。
prefetch 就是预抓取的意思,就意味着你的消费者服务实例预抓取多少条 message 过来处理,但是最多只能同时处理这么多消息。
如果一个 channel 里的 unack message 超过了 prefetch count 指定的数量,此时 RabbitMQ 就会停止给这个channel 投递消息了,必须要等待已经投递过去的消息被处理完 ack 了,此时才能继续投递下一个消息。
设置多大合理?
RabbitMQ 官方给出的建议是 prefetch count 一般设置在100 - 300之间。也就是一个消费者服务最多接收到100 - 300个 message 来处理,允许处于 unack 状态。
这个状态下可以兼顾吞吐量也很高,同时也不容易造成内存溢出的问题。
为您推荐与 rabbitmq 相关的帖子:
- RabbitMQ 学习笔记 -- 01 简介
- RabbitMQ 学习笔记 -- 02 一个 HelloWorld
- RabbitMQ 学习笔记 -- 03 多消费者
- RabbitMQ 学习笔记 -- 04 扇形交换机
- RabbitMQ 学习笔记 -- 05 路由模式
- RabbitMQ 学习笔记 -- 06 Topic 交换机
- RabbitMQ 学习笔记 -- 07 初探@RabbitListener
- RabbitMQ 学习笔记 -- 08 RabbitTemplate 及消息序列化
- RabbitMQ 学习笔记 -- 10 RabbitMQ 消费者确认和发布者确认
- RabbitMQ 学习笔记 -- 11 RabbitMQ 死信队列
- RabbitMQ 消息队列模型使用介绍
- RabbitMQ 3.11.0 已发布
- RabbitMQ 3.11.6 发布
- RabbitMQ 3.11.8 已发布,AMQP 开源实现
- RabbitMQ 3.11.13 已发布,AMQP 开源实现
- RabbitMQ 3.11.14 已发布,AMQP 开源实现