内容简介:priority queue 定义参考官方文档:
Docker With RabbitMQ
官方 Docker 镜像仓库地址
本地运行 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
访问可视化面板
- 地址: http://127.0.0.1:15672/
- 默认账号:guest
- 默认密码:guest
Spring Boot With RabbitMQ
Spring Boot 集成 RabbitMQ
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
基本参数配置
# host & port spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672
Queue / Exchange / Routing 配置
/** * RabbitMQ 配置 */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; /** * 定义优先级队列 */ @Bean Queue queue() { Map<String, Object> args= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } /** * 定义交换器 */ @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
priority queue 定义参考官方文档: https://www.rabbitmq.com/priority.html
Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。
RabbitMQ Publisher
Spring Boot 相关配置
# 是否开启消息发送到交换器(Exchange)后触发回调 spring.rabbitmq.publisher-confirms=false # 是否开启消息发送到队列(Queue)后触发回调 spring.rabbitmq.publisher-returns=false # 消息发送失败重试相关配置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=3000ms spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000ms spring.rabbitmq.template.retry.multiplier=1
发送消息
@Component @AllArgsConstructor public class FileMessageSender { private static final String EXCHANGE = "priority-exchange"; private static final String ROUTING_KEY_PREFIX = "priority.queue."; private final RabbitTemplate rabbitTemplate; /** * 发送设置有优先级的消息 * * @param priority 优先级 */ public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); } }
RabbitMQ Consumer
Spring Boot 相关配置
# 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=AUTO # 最小线程数量 spring.rabbitmq.listener.simple.concurrency=10 # 最大线程数量 spring.rabbitmq.listener.simple.max-concurrency=10 # 每个消费者可能未完成的最大未确认消息数量 spring.rabbitmq.listener.simple.prefetch=1
消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch
设置为较小数值,让优先级越高的消息更快加入到消费者线程。
监听消息
@Slf4j @Component public class MessageListener { /** * 处理消息 */ @RabbitListener(queues = "priority-queue") public void listen(String message) { log.info(message); } }
番外补充
1、自定义消息发送确认的回调
- 配置如下:
# 开启消息发送到交换器(Exchange)后触发回调 spring.rabbitmq.publisher-confirms=true # 开启消息发送到队列(Queue)后触发回调 spring.rabbitmq.publisher-returns=true
- 自定义
RabbitTemplate.ConfirmCallback
实现类
@Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一标识: {}", correlationData); log.info("确认状态: {}", ack); log.info("造成原因: {}", cause); } }
- 自定义
RabbitTemplate.ConfirmCallback
实现类
@Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体: {}", message); log.info("回复编码: {}", replyCode); log.info("回复内容: {}", replyText); log.info("交换器: {}", exchange); log.info("路由键: {}", routingKey); } }
- 配置
rabbitTemplate
@Component @AllArgsConstructor public class RabbitTemplateInitializingBean implements InitializingBean { private final RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() { rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); } }
2、RabbitMQ Exchange 类型
以上所述就是小编给大家介绍的《Spring Boot RabbitMQ - 优先级队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Python中栈、队列和优先级队列的实现
- RabbitMQ之优先级消息队列
- 用Python实现数据结构之优先级队列
- 使用 Swift 实现基于堆的优先级队列
- 个推基于 Apache Pulsar 的优先级队列方案
- 如何确定需求的优先级?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java Servlet & JSP Cookbook
Bruce W. Perry / O'Reilly Media / 2003-12-1 / USD 49.99
With literally hundreds of examples and thousands of lines of code, the Java Servlet and JSP Cookbook yields tips and techniques that any Java web developer who uses JavaServer Pages or servlets will ......一起来看看 《Java Servlet & JSP Cookbook》 这本书的介绍吧!