内容简介:在上一篇文章中主要简单的介绍了一下rabbitmq 的基本概念,包括exchange的主要类型以及每种类型分别表示什么含义。本篇文章主要结合自己的理解,解读一下rabbitmq 是如何保证消息不丢失的?如图所示: producer 发送消息到rabbitmq broker,然后有2个消费者consumer1,consumer2进行信息消费,针对这个简单的场景,我们心里难免会有一个疑问:作为producer,我怎么知道我的消息已经成功的发送到了broker 呢? 再一个,我怎么知道我发送的消息已经成功的被co
在上一篇文章中主要简单的介绍了一下rabbitmq 的基本概念,包括exchange的主要类型以及每种类型分别表示什么含义。本篇文章主要结合自己的理解,解读一下rabbitmq 是如何保证消息不丢失的?
二.rabbitmq 是如何保证消息发送时不被丢失的?
如图所示: producer 发送消息到rabbitmq broker,然后有2个消费者consumer1,consumer2进行信息消费,针对这个简单的场景,我们心里难免会有一个疑问:作为producer,我怎么知道我的消息已经成功的发送到了broker 呢? 再一个,我怎么知道我发送的消息已经成功的被consumer消费了呢?还有,如果消息发送到broker后,broker机器挂了怎么办,消息会丢失吗?下面就这些疑问,结合自己的理解一一进行解答.
1) 生产者消息确认机制:
生产者消息确认机制主要就是解决消息成功发送到rabbitmq broker 的问题,rabbitmq 提供了2种手段用来解决这个问题:
- 通过事务机制实现
- 通过发送方确认机制实现
事务机制:
rabbitmq 客户端channel API针对事务机制这块提供了3个方法:channel.txSelect,channel.txCommit,channel.txRollback .
/** * Enables TX mode on this channel. * @see com.rabbitmq.client.AMQP.Tx.Select * @see com.rabbitmq.client.AMQP.Tx.SelectOk * @return a transaction-selection method to indicate the transaction was successfully initiated * @throws java.io.IOException if an error is encountered */ Tx.SelectOk txSelect() throws IOException; /** * Commits a TX transaction on this channel. * @see com.rabbitmq.client.AMQP.Tx.Commit * @see com.rabbitmq.client.AMQP.Tx.CommitOk * @return a transaction-commit method to indicate the transaction was successfully committed * @throws java.io.IOException if an error is encountered */ Tx.CommitOk txCommit() throws IOException; /** * Rolls back a TX transaction on this channel. * @see com.rabbitmq.client.AMQP.Tx.Rollback * @see com.rabbitmq.client.AMQP.Tx.RollbackOk * @return a transaction-rollback method to indicate the transaction was successfully rolled back * @throws java.io.IOException if an error is encountered */ Tx.RollbackOk txRollback() throws IOException; 复制代码
txSelect方法主要是用于将信道(channel)设置成事务模式,txCommit 主要用于提交事务,txRollback 主要用于将事务进行回滚。在开启事务之后,我们便可以将消息发送给rabbitmq了,如果在执行tx.commit执行成功时,表示消息已经成功的发送到rabbitmq服务器了,反之则会抛异常。
需要说明的是,这里的消息已经成功的发送到rabbitmq服务器,指的是消息已经成功发送到rabbitmq 服务器的exchange 了,如果exchange 没有匹配消息绑定的队列,消息还是会丢失。 说明:rabbitmq 的事务与关系数据库如 mysql 的事务机制是不一样的,关系数据库事务关注的是ACID,rabbitmq关心的是消息是否成功发送。 复制代码
发送方确认机制
生产者将信道设置成confirm(确认)模式,一旦设置成confirm 模式,当消息投递到broker之后,rabbitmq 的broker 会给消息发送端发一条BASIC.ACK 的确认消息,发送端通过监听这个确认消息,可以知道信息是否已经成功的发送出去. rabbitmq 客户端Channel API 里也提供了相应的API channel.confirmSelect 用来开启客户端确认模式:
/** * Enables publisher acknowledgements on this channel. * @see com.rabbitmq.client.AMQP.Confirm.Select * @throws java.io.IOException if an error is encountered */ Confirm.SelectOk confirmSelect() throws IOException; 复制代码
2种模式的比较:事务机制发送消息的过程是同步的,发送消息之后在rabbitmq 回应之前会阻塞,直到收到回应之后才能发送下一条消息,这样会降低系统的吞吐量。发送者确认机制是异步的,生产者在发送消息等待信道返回确认消息的时候继续发送下一条信息。所以相比而言,使用消息确认机制发送消息吞吐量会更高一些。
2)消费端确认机制
消费端确认机制主要是为了确保消息投递到消费者之后能够被成功的消费。 在rabbitmq 的Channel API 中也提供了相应的参数给业务侧进行控制,如下:
/** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; 复制代码
在baiscConusme里有一个参数:autoAck ,该参数为false 的时候表示消费端需要进行手动确认(比如调用channel.basicAck进行主动确认),如果消费者在消费完一条消息之后向broker 发送确认消息,然后由于网络原因或者其他原因导致broker 没有确认这条消息时,broker 不会删除这条消息,当连接重新建立之后,消费者还是会收到这条消息。
3)持久化机制
rabbitmq 的持久化机制主要是确保生产者发送的消息能成功的落盘,确保broker重启之后未被消费的信息不会被丢失。
rabbitmq 的持久化机制,主要从以下几个方面来保障:
- exchange 的持久化
- queue 的持久化
- message 的持久化
需要说明的是,消息是存储在queue里的,所以只有在queue设置为持久化的时候,message的持久化才有意义,否则如果queue是非持久化的,即便message是持久的,在broker重启之后信息还是会丢失。
rabbitmq 的Channel API 也提供了相应的参数来设置:
/** * Actively declare a non-autodelete exchange with no extra arguments * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @throws java.io.IOException if an error is encountered * @return a declaration-confirm method to indicate the exchange was successfully declared */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; 复制代码
exchangeDeclare 接口中的durable 参数用来设置exchange是否持久化,为true表示是持久化的,反之为false
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; 复制代码
queueDeclare 接口的durable 参数通exchange类似
/** * Publish a message. * * Publishing to a non-existent exchange will result in a channel-level * protocol exception, which closes the channel. * * Invocations of <code>Channel#basicPublish</code> will eventually block if a * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. * * @see com.rabbitmq.client.AMQP.Basic.Publish * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>. * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; 复制代码
消息的持久化通过消息的属性BasicProperties中的deliveryMode参数来设置,deliveryMode为2表示是持久化信息。
小结
通过消息发送端确认机制,消费端确认机制以及持久化,rabbitmq 保证了消息的可靠性。但是又有一个疑问出现了,如果仅仅部署一台broker, 即便是消息持久化了,如果broker 出故障了,没法恢复了,那消息不还是会丢失吗? 为了避免单点故障,提升rabbitmq的可用性,rabbitmq 支持集群部署,以及提供了镜像队列等机制来确保信息的可靠性的。 关于rabbitmq的集群,以及镜像队列等相关方面的知识,在下期的学习之后再进行分享。
以上所述就是小编给大家介绍的《rabbitmq 学习与实践分享(2)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- (全栈学习实践)四、docker搭建redis主从实践
- 深度学习最佳实践 原 荐
- 评书:《美团机器学习实践》
- 项目实践-Fragment学习(一)
- golang 容器的学习与实践
- Service Worker学习与实践
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。