内容简介:延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。
延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。
实现延迟队列的方式有两种:
- 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
- 使用rabbitmq-delayed-message-exchange插件实现延迟功能;
注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。
由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。
二、安装延迟插件
1.1 下载插件
打开官网下载: http://www.rabbitmq.com/community-plugins.html
选择相应的对应的版本“3.7.x”点击下载。
注意:下载的是.zip的安装包,下载完之后需要手动解压。
1.2 安装插件
拷贝插件到Docker:
docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins
RabbitMQ在 Docker 的安装,请参照本系列的上一篇文章。
1.3 启动插件
进入docker内部:
docker exec -it rabbit /bin/bash
开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查询安装的所有插件:
rabbitmq-plugins list
安装正常,效果如下图:
重启RabbitMQ,使插件生效
docker restart rabbit
三、代码实现
3.1 配置队列
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定队列到交换器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
3.2 发送消息
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
3.3 消费消息
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
3.4 测试队列
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
执行结果如下:
发送时间:2018-09-11 20:47:51 接收时间:2018-09-11 20:47:54 消息内容:Hi Admin.
完整代码访问我的GitHub: https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
四、总结
到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。
以上所述就是小编给大家介绍的《Spring Boot(十四)RabbitMQ延迟队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RabbitMQ实现延迟队列
- 基于 Redis 实现的延迟消息队列
- 你真的知道怎么实现一个延迟队列吗?
- 【RabbitMQ】一文带你搞定RabbitMQ延迟队列
- 灵感来袭,基于Redis的分布式延迟队列
- Node.js结合RabbitMQ延迟队列实现定时任务
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Agile Web Application Development with Yii 1.1 and PHP5
Jeffrey Winesett / Packt Publishing / 2010-08-27
In order to understand the framework in the context of a real-world application, we need to build something that will more closely resemble the types of applications web developers actually have to bu......一起来看看 《Agile Web Application Development with Yii 1.1 and PHP5》 这本书的介绍吧!