使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

栏目: Java · 发布时间: 5年前

内容简介:让我们展示如何使用

让我们展示如何使用 Spring Cloud Stream 来设计 事件驱动的微服务 。首先,Spring Cloud Stream首先有什么好处?因为 Spring AMPQ 提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此 repo ,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?

Spring Cloud Stream概念

  • Spring Cloud Stream通过 Binder 概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
  • Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的 Spring Integration 代码也将是相同的!

示例中的Spring Cloud Stream概念(RabbitMQ)

让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建 SimpleMessageListenerContainer 并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:

spring.cloud.stream.bindings.citiesChannel.destination=streamInput
spring.cloud.stream.bindings.citiesChannel.group=cities
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=<b>true</b>
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities

spring.cloud.stream.bindings.personsChannel.destination=streamInput
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=<b>true</b>
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

配置详细信息

在类路径上使用RabbitMQ Binder,每个目标都映射到 TopicExchange 。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。

spring.cloud.stream.bindings.citiesChannel.destination = streamInput 
spring.cloud.stream.bindings.personsChannel.destination = streamInput

现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。

因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道

# This will create queue <font>"streamInput.cities"</font><font> connected to message channel citiesChannel where input messages will land.
spring.cloud.stream.bindings.citiesChannel.group=cities 

# Durable subscription, of course.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=<b>true</b> 

# AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).
# Only messages with routingKey = 'cities' will land here.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities 

spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=<b>true</b>
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
</font>

连接属性到Spring Integration

好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。

<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:

<b>package</b> com.example.spring.cloud.configuration;

<b>import</b> org.springframework.cloud.stream.annotation.Input;
<b>import</b> org.springframework.messaging.SubscribableChannel;

<font><i>/**
 * Created by tomask79 on 30.03.17.
 */</i></font><font>
<b>public</b> <b>interface</b> SinkRabbitAPI {

    String INPUT_CITIES = </font><font>"citiesChannel"</font><font>;

    String INPUT_PERSONS = </font><font>"personsChannel"</font><font>;

    @Input(SinkRabbitAPI.INPUT_CITIES)
    SubscribableChannel citiesChannel();

    @Input(SinkRabbitAPI.INPUT_PERSONS)
    SubscribableChannel personsChannel();
}
</font>

Spring Boot启动时加载这个属性

<b>package</b> com.example.spring.cloud;

<b>import</b> com.example.spring.cloud.configuration.SinkRabbitAPI;
<b>import</b> com.example.spring.cloud.configuration.SourceRabbitAPI;
<b>import</b> org.springframework.boot.SpringApplication;
<b>import</b> org.springframework.boot.autoconfigure.SpringBootApplication;
<b>import</b> org.springframework.cloud.stream.annotation.EnableBinding;
<b>import</b> org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableBinding({SinkRabbitAPI.<b>class</b>})
<b>public</b> <b>class</b> StreamingApplication {

    <b>public</b> <b>static</b> <b>void</b> main(String[] args) {
        SpringApplication.run(StreamingApplication.<b>class</b>, args);
    }
}

在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:

<b>import</b> com.example.spring.cloud.configuration.SinkRabbitAPI;
<b>import</b> com.example.spring.cloud.configuration.SourceRabbitAPI;
<b>import</b> org.springframework.cloud.stream.annotation.StreamListener;
<b>import</b> org.springframework.integration.support.MessageBuilder;
<b>import</b> org.springframework.messaging.MessageChannel;
<b>import</b> org.springframework.messaging.handler.annotation.SendTo;
<b>import</b> org.springframework.stereotype.Service;

<b>import</b> javax.annotation.Resource;

<font><i>/**
 * Created by tomask79 on 30.03.17.
 */</i></font><font>
@Service
<b>public</b> <b>class</b> ProcessingAMPQEndpoint {

    @StreamListener(SinkRabbitAPI.INPUT_CITIES)
    <b>public</b> <b>void</b> processCity(<b>final</b> String city) {
        System.out.println(</font><font>"Trying to process input city: "</font><font>+city);
    }

    @StreamListener(SinkRabbitAPI.INPUT_PERSONS)
    <b>public</b> <b>void</b> processPersons(<b>final</b> String person) {
        System.out.println(</font><font>"Trying to process input person: "</font><font>+person);
    }
}
</font>

RabbitMQ绑定器和代理配置

Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:

spring:
  cloud:
    stream:
      bindings:
        ...
      binders:
          rabbitbinder:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: rabbitmq
                  port: 5672
                  username: XXX
                  password: XXX

测试消息消费

Started StreamingApplication in 6.513 seconds (JVM running <b>for</b> 6.92) 
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd

使用Spring Cloud Stream重新传递消息

您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:

spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6

这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:

 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)
    <b>public</b> <b>void</b> processPersons(<b>final</b> String person) {
        System.out.println(<font>"Trying to process input person: "</font><font>+person);
        <b>throw</b> <b>new</b> RuntimeException();
    }
</font>

如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:

Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
 Retry Policy Exhausted
        at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover
(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-1.7.0.RELEASE.jar! /:na]
        at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc       

建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为 Java 中的AMPQ基础架构编写样板代码。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Hacking

Hacking

Jon Erickson / No Starch Press / 2008-2-4 / USD 49.95

While other books merely show how to run existing exploits, Hacking: The Art of Exploitation broke ground as the first book to explain how hacking and software exploits work and how readers could deve......一起来看看 《Hacking》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具