Spring Boot RabbitMQ - 优先级队列

栏目: Java · 发布时间: 5年前

内容简介:priority queue 定义参考官方文档:

Spring Boot RabbitMQ - 优先级队列

Docker With RabbitMQ

官方 Docker 镜像仓库地址

本地运行 RabbitMQ

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

访问可视化面板

Spring Boot With RabbitMQ

Spring Boot 集成 RabbitMQ

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

基本参数配置

# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672

Queue / Exchange / Routing 配置

/**
 * RabbitMQ 配置
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * 定义优先级队列
     */
    @Bean
    Queue queue() {
        Map<String, Object> args= new HashMap<>();
        args.put("x-max-priority", 100);
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * 定义交换器
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

}

priority queue 定义参考官方文档: https://www.rabbitmq.com/priority.html

Spring Boot RabbitMQ - 优先级队列

Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。

RabbitMQ Publisher

Spring Boot 相关配置

# 是否开启消息发送到交换器(Exchange)后触发回调
spring.rabbitmq.publisher-confirms=false
# 是否开启消息发送到队列(Queue)后触发回调
spring.rabbitmq.publisher-returns=false
# 消息发送失败重试相关配置
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1

发送消息

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * 发送设置有优先级的消息
     *
     * @param priority 优先级
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }

}

RabbitMQ Consumer

Spring Boot 相关配置

# 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# 最小线程数量
spring.rabbitmq.listener.simple.concurrency=10
# 最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
# 每个消费者可能未完成的最大未确认消息数量
spring.rabbitmq.listener.simple.prefetch=1

消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。

监听消息

@Slf4j
@Component
public class MessageListener {

    /**
     * 处理消息
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }

}

番外补充

1、自定义消息发送确认的回调

  • 配置如下:
# 开启消息发送到交换器(Exchange)后触发回调
spring.rabbitmq.publisher-confirms=true
# 开启消息发送到队列(Queue)后触发回调
spring.rabbitmq.publisher-returns=true
  • 自定义 RabbitTemplate.ConfirmCallback 实现类
@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息唯一标识: {}", correlationData);
        log.info("确认状态: {}", ack);
        log.info("造成原因: {}", cause);
    }

}
  • 自定义 RabbitTemplate.ConfirmCallback 实现类
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息主体: {}", message);
        log.info("回复编码: {}", replyCode);
        log.info("回复内容: {}", replyText);
        log.info("交换器: {}", exchange);
        log.info("路由键: {}", routingKey);
    }

}
  • 配置 rabbitTemplate
@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() {
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }
    
}

2、RabbitMQ Exchange 类型


以上所述就是小编给大家介绍的《Spring Boot RabbitMQ - 优先级队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法设计与分析导论

算法设计与分析导论

R.C.T.Lee (李家同)、S.S.Tseng、R.C.Chang、Y.T.Tsai / 王卫东 / 机械工业 / 2008-1 / 49.00元

本书在介绍算法时,重点介绍用干设计算法的策略.非常与众不同。书中介绍了剪枝搜索、分摊分析、随机算法、在线算法以及多项式近似方案等相对较新的思想和众多基于分摊分析新开发的算法,每个算法都与实例一起加以介绍,而且每个例子都利用图进行详细解释。此外,本书还提供了超过400幅图来帮助初学者理解。本书适合作为高等院校算法设计与分析课程的高年级本科生和低年级研究生的教材,也可供相美科技人员和专业人七参考使用。一起来看看 《算法设计与分析导论》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具