Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

栏目: Java · 发布时间: 5年前

内容简介:有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在下面通过编写一个简单的例子来具体体会一下这个属性的用法:

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
    if("1.0".equals(version)) {
        // Version 1.0
    }
    if("2.0".equals(version)) {
        // Version 2.0
    }
}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在 @StreamListener 注解中提供了一个不错的属性 condition ,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }

    }

    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

内容很简单,既包含了消息的生产,也包含了消息消费。在 /sendMessage 接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类 TestListener 中,对 TestTopic.INPUT 通道定义了两个 @StreamListener ,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的 version 值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问 localhost:8080/sendMessage?message=hello 接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

代码示例

本文示例读者可以通过查看下面仓库的中的 stream-content-route 项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数学建模算法与应用

数学建模算法与应用

司守奎、孙玺菁 / 国防工业出版社 / 2011-8 / 49.00元

《数学建模算法与应用》主要内容简介:作者司守奎、孙玺菁根据多年数学建模竞赛辅导工作的经验编写《数学建模算法与应用》,涵盖了很多同类型书籍较少涉及的新算法和热点技术,主要内容包括时间序列、支持向量机、偏最小二乘面归分析、现代优化算法、数字图像处理、综合评价与决策方法、预测方法以及数学建模经典算法等内容。《数学建模算法与应用》系统全面,各章节相对独立。《数学建模算法与应用》所选案例具有代表性,注重从不......一起来看看 《数学建模算法与应用》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器