SpringCloudStream实战

栏目: Java · 发布时间: 6年前

内容简介:Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream构建于Spring Boot之上,用于创建独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。也就是说,Spring Cloud Stream是构建于Spring Boot和Spring Integration之上的框架,帮助创建事件驱动或消息驱动的微服务。主要模型如图:

Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream构建于Spring Boot之上,用于创建独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。也就是说,Spring Cloud Stream是构建于Spring Boot和Spring Integration之上的框架,帮助创建事件驱动或消息驱动的微服务。

主要模型如图:

SpringCloudStream实战

这里我们使用Kafka作为消息底层设施,原因见: 为什么我们从RabbitMQ切换到apache kafka?

引入Kafka的Stream启动器:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>

微服务架构遵循“ 智能端点和哑管 ”原则,端点之间的通信由RabbitMQ或Apache Kafka等消息传递中间件方驱动,服务通过这些端点或通道发布领域事件进行通信。

首先我们定义一个接口,定义输入和输出队列管道:

public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

@Input注释用来表示输入的消息队列,通过该通道接收消息并输入当前应用;@Output注释表示一个输出通道,通过它发布消息出去。@Input和@Output注解可以采取指定的通道名称(比如这里greetings-in greetings-out")作为参数,如果未提供名称,则使用注释的方法名称。

在application.yaml或property中具体配置该消息通道到Kafka:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

其中greetings-in和greetings-out配置到Kafka具体的主题topic名称为greetings,序列化类型是json,kafka默认端口在本地9092。

好了,底层基础设施准备完成,现在需要将这个设施安装到我们的应用中。

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

@EnableBinding将应用配置绑定接口GreetingsStreams中定义的通道INPUT和OUTPUT。

现在我们的应用和消息基础设施已经绑定了,可使用@StreamListener到具体方法以接收具体的流处理事件了。

@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

StreamListeners 是消息监听者处理方法,接收类型的传入消息Greetings,可以看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型Person。

上面方法是一个没有返回结果的void方法,如果有返回结果,必须使用@SendTo注释指定方法返回的数据的输出绑定队列目标output,如以下示例所示;通过

@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
   @SendTo(GreetingsStreams.OUTPUT)
    public String handleGreetings(Greetings greetings) {
        log.info("Received greetings: {}", greetings);
        return "Received greetings: {}" + greetings;
    }
}

Spring cloud stream实现了一个默认的Processor类,类似我们的GreetingsStreams接口,也就是说,可以不用自己做这个接口

public interface Processor extends Source, Sink {
}
public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

如果使用默认的Processor通道名称,注意配置文件里也要配置成相应的通道名。

测试运行

有了接收方,下面我们实现一个发送方,我们通过调用rest接口发送消息,先看看发送方代码:

@Service
@Slf4j
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

我们暴露一个端口来调用这个发送方:

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;
    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
                .message(message)
                .timestamp(System.currentTimeMillis())
                .build();
        greetingsService.sendGreeting(greetings);
    }

}

也就是说:这个发送方REST和发送服务 与我们的GreetingsListener是通过消息系统通讯的,不是直接在发送服务里调用GreetingsListener的方法,这样这两者之间就解耦了。

下面我们用postman调用:

http://localhost:8080/greetings?message=hello

控制台结果输出:

c.e.c.GreetingsService : Sending greetings Greetings(timestamp=1535614400754, message=hello)

c.e.c.GreetingsListener : Received greetings: Greetings(timestamp=1535614400754, message=hello)

一个发送和一个接受完成了一个请求调用,如果GreetingsListener还有返回结果,是放在greetings-out之中的,那么GreetingsListener就变成发送方了,我们也可以参考这套做法再做个监听器。

源码下载

使用Spring Request-Reply实现基于Kafka的同步请求响应

Spring Cloud专题

Kafka专题


以上所述就是小编给大家介绍的《SpringCloudStream实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

20个月赚130亿

20个月赚130亿

陈士骏、张黎明 / 中国华侨出版社 / 2011-11-17 / 35.00元

YouTube联合创始人陈士骏在书中以朴实亲切的口吻讲述了他的人生经历,以及对学业、事业、梦想、财富、生死等的种种感悟。 童年随全家去美国小镇定居,少年时代迷上计算机编程; 离大学毕业还有几个月时放弃学位,怀揣200美元奔赴硅谷,加入创业公司PayPal,公司上市后成为百万富翁; 因为无法接受PayPal被EbayeBay收购后工程师丧失发言权,和好友一起开创视频网站YouTub......一起来看看 《20个月赚130亿》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具