Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

栏目: Java · 发布时间: 5年前

内容简介:有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在下面通过编写一个简单的例子来具体体会一下这个属性的用法:

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
    if("1.0".equals(version)) {
        // Version 1.0
    }
    if("2.0".equals(version)) {
        // Version 2.0
    }
}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在 @StreamListener 注解中提供了一个不错的属性 condition ,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }

    }

    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

内容很简单,既包含了消息的生产,也包含了消息消费。在 /sendMessage 接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类 TestListener 中,对 TestTopic.INPUT 通道定义了两个 @StreamListener ,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的 version 值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问 localhost:8080/sendMessage?message=hello 接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

代码示例

本文示例读者可以通过查看下面仓库的中的 stream-content-route 项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Coming of Age in Second Life

Coming of Age in Second Life

Tom Boellstorff / Princeton University Press / 2008-04-21 / USD 29.95

The gap between the virtual and the physical, and its effect on the ideas of personhood and relationships, is the most interesting aspect of Boellstorff's analysis... Boellstorff's portrayal of a virt......一起来看看 《Coming of Age in Second Life》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具