Spring Boot(十四)RabbitMQ延迟队列

栏目: 后端 · 发布时间: 7年前

内容简介:延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。

实现延迟队列的方式有两种:

  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能;

注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。

二、安装延迟插件

1.1 下载插件

打开官网下载: http://www.rabbitmq.com/community-plugins.html

选择相应的对应的版本“3.7.x”点击下载。

注意:下载的是.zip的安装包,下载完之后需要手动解压。

1.2 安装插件

拷贝插件到Docker:

docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins

RabbitMQ在 Docker 的安装,请参照本系列的上一篇文章。

1.3 启动插件

进入docker内部:

docker exec -it rabbit /bin/bash

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查询安装的所有插件:

rabbitmq-plugins list

安装正常,效果如下图:

Spring Boot(十四)RabbitMQ延迟队列

重启RabbitMQ,使插件生效

docker restart rabbit

三、代码实现

3.1 配置队列

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // 配置默认的交换机
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
    // 绑定队列到交换器
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

3.2 发送消息

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送时间:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

3.3 消费消息

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收时间:" + sdf.format(new Date()));
        System.out.println("消息内容:" + msg);
    }
}

3.4 测试队列

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }
}

执行结果如下:

发送时间:2018-09-11 20:47:51
接收时间:2018-09-11 20:47:54
消息内容:Hi Admin.

完整代码访问我的GitHub: https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

四、总结

到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。


以上所述就是小编给大家介绍的《Spring Boot(十四)RabbitMQ延迟队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Agile Web Application Development with Yii 1.1 and PHP5

Agile Web Application Development with Yii 1.1 and PHP5

Jeffrey Winesett / Packt Publishing / 2010-08-27

In order to understand the framework in the context of a real-world application, we need to build something that will more closely resemble the types of applications web developers actually have to bu......一起来看看 《Agile Web Application Development with Yii 1.1 and PHP5》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

SHA 加密
SHA 加密

SHA 加密工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具