RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

栏目: 后端 · 发布时间: 5年前

1. 前情回顾

RabbitMQ使用教程(一)RabbitMQ环境安装配置及Hello World示例

RabbitMQ使用教程(二)RabbitMQ用户管理,角色管理及权限设置

RabbitMQ使用教程(三)如何保证消息99.99%被发送成功?

RabbitMQ使用教程(四)如何通过持久化保证消息99.99%不丢失?

截止目前,我们能够保证消息成功地被生产者发送到RabbitMQ服务器,也能保证RabbitMQ服务器发生异常(重启,宕机等)后消息不会丢失,也许你认为现在消息应该很安全了吧?其实还不够安全,不信你接着往下看。

2. 本篇概要

其实,还有1种场景需要考虑: 当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了? ,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。

那么如何解决这种问题呢?

为了保证消息被消费者成功的消费,RabbitMQ提供了 消息确认机制(message acknowledgement) ,本文主要讲解RabbitMQ中,如何使用消息确认机制来保证消息被消费者成功的消费,避免因为消费者突然宕机而引起的消息丢失。

3. 开启显式Ack模式

在第1篇博客 RabbitMQ使用教程(一)RabbitMQ环境安装配置及Hello World示例 中,我们开启一个消费者的代码是这样的:

// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

这里的重点是 channel.basicConsume(QUEUE_NAME, true, consumer); 方法的第2个参数,让我们先看下basicConsume()的源码:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

这里的autoAck参数指的是是否自动确认,如果设置为ture,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者接收到消息是否处理成功;如果设置为false,RabbitMQ会等待消费者显式的回复确认信号后才会从内存(或者磁盘)中删除。

建议将autoAck设置为false,这样消费者就有足够的时间处理消息,不用担心处理消息过程中消费者宕机造成消息丢失。

此时,队列里的消息就分成了2个部分:

  1. 等待投递给消费者的消息(下图中的Ready部分)
  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息(下图中的Unacked部分)

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

为了便于理解,我们举个具体的例子,生产者的话的我们延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 发送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

然后新建一个消费者AckConsumer类:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建队列消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我们先将autoAck参数设置为ture,即自动确认,并在消费消息时故意写个异常,然后先运行生产者客户端将消息写入队列中,然后运行消费者客户端,发现消息未消费成功但是却消失了:

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

然后我们将autoAck设置为false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次运行生产者客户端将消息写入队列中,然后运行消费者客户端,此时虽然消费者客户端仍然代码异常,但是消息仍然在队列中:

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

然后我们删除掉消费者客户端中的异常代码,重新启动消费者客户端,发现消息消费成功了,但是消息一直未Ack:

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

手动停掉消费者客户端,发现消息又到了Ready状态,准备重新投递:

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

之所以消费掉消息,却一直还是Unacked状态,是因为我们没在代码中添加显式的Ack代码:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的编号,它是一个64位的长整形值。

此时运行消费者客户端,发现消息消费成功,并且在队列中被移除:

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

4. 源码

源码地址: https://github.com/zwwhnly/springboot-action.git ,欢迎下载。

5. 参考

《RabbitMQ实战指南》


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java核心技术·卷 I(原书第10版)

Java核心技术·卷 I(原书第10版)

[美] 凯.S.霍斯特曼(Cay S. Horstmann) / 周立新 等 / 机械工业出版社 / 2016-9 / CNY 119.00

Java领域最有影响力和价值的著作之一,由拥有20多年教学与研究经验的资深Java技术专家撰写(获Jolt大奖),与《Java编程思想》齐名,10余年全球畅销不衰,广受好评。第10版根据Java SE 8全面更新,同时修正了第9版中的不足,系统全面讲解了Java语言的核 心概念、语法、重要特性和开发方法,包含大量案例,实践性强。 一直以来,《Java核心技术》都被认为是面向高级程序员的经典教......一起来看看 《Java核心技术·卷 I(原书第10版)》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码