内容简介:priority queue 定义参考官方文档:
Docker With RabbitMQ
官方 Docker 镜像仓库地址
本地运行 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
访问可视化面板
- 地址: http://127.0.0.1:15672/
- 默认账号:guest
- 默认密码:guest
Spring Boot With RabbitMQ
Spring Boot 集成 RabbitMQ
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
基本参数配置
# host & port spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672
Queue / Exchange / Routing 配置
/** * RabbitMQ 配置 */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; /** * 定义优先级队列 */ @Bean Queue queue() { Map<String, Object> args= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } /** * 定义交换器 */ @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
priority queue 定义参考官方文档: https://www.rabbitmq.com/priority.html
Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。
RabbitMQ Publisher
Spring Boot 相关配置
# 是否开启消息发送到交换器(Exchange)后触发回调 spring.rabbitmq.publisher-confirms=false # 是否开启消息发送到队列(Queue)后触发回调 spring.rabbitmq.publisher-returns=false # 消息发送失败重试相关配置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=3000ms spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000ms spring.rabbitmq.template.retry.multiplier=1
发送消息
@Component @AllArgsConstructor public class FileMessageSender { private static final String EXCHANGE = "priority-exchange"; private static final String ROUTING_KEY_PREFIX = "priority.queue."; private final RabbitTemplate rabbitTemplate; /** * 发送设置有优先级的消息 * * @param priority 优先级 */ public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); } }
RabbitMQ Consumer
Spring Boot 相关配置
# 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=AUTO # 最小线程数量 spring.rabbitmq.listener.simple.concurrency=10 # 最大线程数量 spring.rabbitmq.listener.simple.max-concurrency=10 # 每个消费者可能未完成的最大未确认消息数量 spring.rabbitmq.listener.simple.prefetch=1
消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch
设置为较小数值,让优先级越高的消息更快加入到消费者线程。
监听消息
@Slf4j @Component public class MessageListener { /** * 处理消息 */ @RabbitListener(queues = "priority-queue") public void listen(String message) { log.info(message); } }
番外补充
1、自定义消息发送确认的回调
- 配置如下:
# 开启消息发送到交换器(Exchange)后触发回调 spring.rabbitmq.publisher-confirms=true # 开启消息发送到队列(Queue)后触发回调 spring.rabbitmq.publisher-returns=true
- 自定义
RabbitTemplate.ConfirmCallback
实现类
@Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一标识: {}", correlationData); log.info("确认状态: {}", ack); log.info("造成原因: {}", cause); } }
- 自定义
RabbitTemplate.ConfirmCallback
实现类
@Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体: {}", message); log.info("回复编码: {}", replyCode); log.info("回复内容: {}", replyText); log.info("交换器: {}", exchange); log.info("路由键: {}", routingKey); } }
- 配置
rabbitTemplate
@Component @AllArgsConstructor public class RabbitTemplateInitializingBean implements InitializingBean { private final RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() { rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); } }
2、RabbitMQ Exchange 类型
以上所述就是小编给大家介绍的《Spring Boot RabbitMQ - 优先级队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Python中栈、队列和优先级队列的实现
- RabbitMQ之优先级消息队列
- 用Python实现数据结构之优先级队列
- 使用 Swift 实现基于堆的优先级队列
- 个推基于 Apache Pulsar 的优先级队列方案
- 如何确定需求的优先级?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。