内容简介:让我们展示如何使用
让我们展示如何使用 Spring Cloud Stream 来设计 事件驱动的微服务 。首先,Spring Cloud Stream首先有什么好处?因为 Spring AMPQ 提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此 repo ,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?
Spring Cloud Stream概念
- Spring Cloud Stream通过 Binder 概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
- Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的 Spring Integration 代码也将是相同的!
示例中的Spring Cloud Stream概念(RabbitMQ)
让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建 SimpleMessageListenerContainer 并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:
spring.cloud.stream.bindings.citiesChannel.destination=streamInput spring.cloud.stream.bindings.citiesChannel.group=cities spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=<b>true</b> spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.destination=streamInput spring.cloud.stream.bindings.personsChannel.group=persons spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=<b>true</b> spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
配置详细信息
在类路径上使用RabbitMQ Binder,每个目标都映射到 TopicExchange 。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。
spring.cloud.stream.bindings.citiesChannel.destination = streamInput spring.cloud.stream.bindings.personsChannel.destination = streamInput
现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。
因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道
# This will create queue <font>"streamInput.cities"</font><font> connected to message channel citiesChannel where input messages will land. spring.cloud.stream.bindings.citiesChannel.group=cities # Durable subscription, of course. spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=<b>true</b> # AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings). # Only messages with routingKey = 'cities' will land here. spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.group=persons spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=<b>true</b> spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons </font>
连接属性到Spring Integration
好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。
<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:
<b>package</b> com.example.spring.cloud.configuration; <b>import</b> org.springframework.cloud.stream.annotation.Input; <b>import</b> org.springframework.messaging.SubscribableChannel; <font><i>/** * Created by tomask79 on 30.03.17. */</i></font><font> <b>public</b> <b>interface</b> SinkRabbitAPI { String INPUT_CITIES = </font><font>"citiesChannel"</font><font>; String INPUT_PERSONS = </font><font>"personsChannel"</font><font>; @Input(SinkRabbitAPI.INPUT_CITIES) SubscribableChannel citiesChannel(); @Input(SinkRabbitAPI.INPUT_PERSONS) SubscribableChannel personsChannel(); } </font>
Spring Boot启动时加载这个属性
<b>package</b> com.example.spring.cloud; <b>import</b> com.example.spring.cloud.configuration.SinkRabbitAPI; <b>import</b> com.example.spring.cloud.configuration.SourceRabbitAPI; <b>import</b> org.springframework.boot.SpringApplication; <b>import</b> org.springframework.boot.autoconfigure.SpringBootApplication; <b>import</b> org.springframework.cloud.stream.annotation.EnableBinding; <b>import</b> org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableBinding({SinkRabbitAPI.<b>class</b>}) <b>public</b> <b>class</b> StreamingApplication { <b>public</b> <b>static</b> <b>void</b> main(String[] args) { SpringApplication.run(StreamingApplication.<b>class</b>, args); } }
在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:
<b>import</b> com.example.spring.cloud.configuration.SinkRabbitAPI; <b>import</b> com.example.spring.cloud.configuration.SourceRabbitAPI; <b>import</b> org.springframework.cloud.stream.annotation.StreamListener; <b>import</b> org.springframework.integration.support.MessageBuilder; <b>import</b> org.springframework.messaging.MessageChannel; <b>import</b> org.springframework.messaging.handler.annotation.SendTo; <b>import</b> org.springframework.stereotype.Service; <b>import</b> javax.annotation.Resource; <font><i>/** * Created by tomask79 on 30.03.17. */</i></font><font> @Service <b>public</b> <b>class</b> ProcessingAMPQEndpoint { @StreamListener(SinkRabbitAPI.INPUT_CITIES) <b>public</b> <b>void</b> processCity(<b>final</b> String city) { System.out.println(</font><font>"Trying to process input city: "</font><font>+city); } @StreamListener(SinkRabbitAPI.INPUT_PERSONS) <b>public</b> <b>void</b> processPersons(<b>final</b> String person) { System.out.println(</font><font>"Trying to process input person: "</font><font>+person); } } </font>
RabbitMQ绑定器和代理配置
Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:
spring: cloud: stream: bindings: ... binders: rabbitbinder: type: rabbit environment: spring: rabbitmq: host: rabbitmq port: 5672 username: XXX password: XXX
测试消息消费
- 安装并运行RabbitMQ代理
- git clone https://tomask79@bitbucket.org/tomask79/spring-cloud-stream-rabbitmq.git
- mvn clean install
- java -jar target / streaming-0.0.1-SNAPSHOT.jar
- 现在使用路由键'cities'或'persons'在streamInput Exchange上发布消息...输出应该是:
Started StreamingApplication in 6.513 seconds (JVM running <b>for</b> 6.92) Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd
使用Spring Cloud Stream重新传递消息
您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:
spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6
这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:
@StreamListener(SinkRabbitAPI.INPUT_PERSONS) <b>public</b> <b>void</b> processPersons(<b>final</b> String person) { System.out.println(<font>"Trying to process input person: "</font><font>+person); <b>throw</b> <b>new</b> RuntimeException(); } </font>
如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:
Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Retry Policy Exhausted at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover (RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-1.7.0.RELEASE.jar! /:na] at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc
建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为 Java 中的AMPQ基础架构编写样板代码。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 事件驱动架构引领产业技术升级: 事件驱动联盟(中国)成立
- Redis 事件驱动分析
- 聊聊事件驱动模型
- 领域驱动设计 (DDD) 实践之路(二):事件驱动与 CQRS
- 领域驱动设计 (DDD) 实践之路(二):事件驱动与 CQRS
- Redis 源码学习之事件驱动
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。