Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

栏目: Java · 发布时间: 5年前

内容简介:之前我们已经通过准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

之前我们已经通过 《Spring Cloud Stream消费失败后的处理策略(一):自动重试》 一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {

        private int count = 1;

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload + ", " + count);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问 localhost:8080/sendMessage?message=hello 接口来发送一个消息到MQ中了,此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置: spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true 。在该配置作用之下,消息消费失败之后,并不会将该消息抛弃,而是将消息重新放入队列,所以消息的消费逻辑会被重复执行,直到这条消息消费成功为止。

深入思考

在完成了上面的这个例子之后,可能读者会有下面两个常见问题:

问题一:之前介绍的Spring Cloud Stream默认提供的默认功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列实现的重试有什么区别?

Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试,它们的处理逻辑是由同一条消息触发的。而本文所介绍的重新入队史通过重新将消息放入队列而触发的,所以实际上是收到了多次消息而实现的重试。

问题二:如上面的例子那样,消费一直不成功,这些不成功的消息会被不断堆积起来,如何解决这个问题?

对于这个问题,我们可以联合前文介绍的DLQ队列来完善消息的异常处理。

我们只需要增加如下配置,自动绑定dlq队列:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

然后改造一下消息处理程序,可以根据业务情况,为进入dlq队列增加一个条件,比如下面的例子:

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
    log.info("Received payload : " + payload + ", " + count);
    if (count == 3) {
        count = 1;
        throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
    } else {
        count ++;
        throw new RuntimeException("Message consumer failed!");
    }
}

设定了计数器count,当count为3的时候抛出 AmqpRejectAndDontRequeueException 这个特定的异常。此时,当只有当抛出这个异常的时候,才会将消息放入DLQ队列,从而不会造成严重的堆积问题。

代码示例

本文示例读者可以通过查看下面仓库的中的 stream-exception-handler-4 项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!


以上所述就是小编给大家介绍的《Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

成为乔布斯

成为乔布斯

[美] 布伦特·施兰德、[美] 里克·特策利 / 陶亮 / 中信出版集团 / 2016-10 / 69.00元

本书描绘了一位多姿多彩的人物将与生俱来的激情与成熟的管理方式相结合,打造出史上最有价值、最受消费者追捧的公司,这本书将彻底改变我们看待乔布斯的方式。 本书推翻了关于史蒂夫·乔布斯的传说和陈词滥调,比如他是天才和混蛋的结合体,暴躁易怒、自私自利,怠慢朋友与家人。本书揭示了这位苹果联合创始人和CEO的家庭生活与职业生涯,并回答了一个关键问题:为什么如此轻狂傲慢、以至于被赶出苹果的年轻人能成为史上......一起来看看 《成为乔布斯》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

在线进制转换器
在线进制转换器

各进制数互转换器