Spring WebFlux和Reactive编程

栏目: Java · 发布时间: 5年前

内容简介:在看到Jurgen Hoeller引入Maven WebFlux项目生成

在看到Jurgen Hoeller引入 新的Spring 5功能后, 我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:

Maven WebFlux项目生成

  • 转到 Spring启动应用程序生成器
  • 在Spring Boot版本中选择“2.0.0”以上版本
  • 在依赖项中搜索“ Reactive Web ”
  • 保存生成的maven项目

演示(反应端点)

在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以 Reactive方式 获取通用存储Item 。首先让我们开始:

@RestController
<b>public</b> <b>class</b> ItemsReactiveController {

    @Autowired
    <b>private</b> IItemsService iItemsService;

    <b>public</b> Flux<Item> findById(String id) {
        <b>try</b> {
            System.out.println(<font>"Getting the data from DB..."</font><font>);
            Thread.sleep(500);
        } <b>catch</b> (InterruptedException e) {
            e.printStackTrace();
        }
        <b>return</b> Flux.just(iItemsService.findById(id));
    }

    @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    <b>public</b> <b>void</b> getItemById(@PathVariable String id) {
        System.out.println(</font><font>"Controller start...."</font><font>);
        findById(id).subscribe(<b>new</b> Subscriber<Item>() {

            @Override
            <b>public</b> <b>void</b> onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }
            @Override
            <b>public</b> <b>void</b> onNext(Item t) {
                System.out.println(</font><font>"Retrieved: "</font><font>+t.getName());
            }
            @Override
            <b>public</b> <b>void</b> onError(Throwable t) {
            }
            @Override
            <b>public</b> <b>void</b> onComplete() {
            }
        });
        System.out.println(</font><font>"End of method."</font><font>);
    }
}
</font>

代码详细

方法findById返回Flux <Item>类型。 Flux 是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的 Mono <Item>类型。

Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的 订阅者 ,这是 Project reactor 的底层实现。

第一次测试:

项目源码: https://bitbucket.org/tomask79/spring-reactive-rest.git

让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器: http://localhost:8081/store/{id }

系统输出:

Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
Retrieved: <Name of the Item>
End of method.

如您所见,代码输出每个步骤:

  • 调用控制器(Controller start ....)
  • 获取item调用服务(Retrieved: <Name of the Item>)
  • 达到了方法的结束。(End of method)

默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:

  • Mongo
  • Redis
  • Cassandra
  • Postgres

当然,激活检查新项目 Spring Data“Kay” ,才能在上面提到的Spring Data项目中启用反应范例。

要实际启用异步发布我们的项目,我们需要将控制器更改为:

<b>package</b> com.example.controller;

<b>import</b> com.example.domain.Item;
<b>import</b> com.example.service.IItemsService;
<b>import</b> org.reactivestreams.Subscriber;
<b>import</b> org.reactivestreams.Subscription;
<b>import</b> org.springframework.beans.factory.annotation.Autowired;
<b>import</b> org.springframework.http.MediaType;
<b>import</b> org.springframework.web.bind.annotation.GetMapping;
<b>import</b> org.springframework.web.bind.annotation.PathVariable;
<b>import</b> org.springframework.web.bind.annotation.RequestParam;
<b>import</b> org.springframework.web.bind.annotation.RestController;
<b>import</b> reactor.core.publisher.Flux;
<b>import</b> reactor.core.scheduler.Scheduler;
<b>import</b> reactor.core.scheduler.Schedulers;

<font><i>/**
 * Created by Tomas.Kloucek on 17.3.2017.
 */</i></font><font>
@RestController
<b>public</b> <b>class</b> ItemsReactiveController {

    @Autowired
    <b>private</b> IItemsService iItemsService;

    <b>public</b> Flux<Item> findById(String id) {
        <b>try</b> {
            System.out.println(</font><font>"Getting the data from DB..."</font><font>);
            Thread.sleep(500);
        } <b>catch</b> (InterruptedException e) {
            e.printStackTrace();
        }
        <b>return</b> Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel());
    }

    @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    <b>public</b> <b>void</b> getItemById(@PathVariable String id) {
        System.out.println(</font><font>"Controller start...."</font><font>);
        findById(id).subscribe(v -> {
           System.out.println(</font><font>"Consumed: "</font><font>+v.getId());
        });
        System.out.println(</font><font>"End of method."</font><font>);
    }
}
</font>
  • 改变了订阅只获得结果。如果你知道RxJava你应该熟悉。
  • 添加了publishOn方法,并调度了用于异步发布Item的线程。

现在,如果我们从浏览器再次点击端点,输出将是:

Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
End of method.
Consumed: <ID of your Item entity>

正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。

如何创建客户端以调用Reactive Endpoint

让我们用代码创建另一个Spring Boot WebReactive应用程序:

@SpringBootApplication
<b>public</b> <b>class</b> DemoApplication {

    @Bean
    <b>public</b> WebClient webClient() {
        <b>return</b> WebClient.create(<font>"http://<reactiveAppHost>:<reactiveAppPort>"</font><font>);
    }

    @Bean
    CommandLineRunner launch(WebClient webClient) {
        <b>return</b> args -> {
            webClient.get().Yuri(</font><font>"/store/{id}"</font><font>)
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .exchange()
                    .flatMap(cr -> cr.bodyToFlux(Item.<b>class</b>))
                    .subscribe(v -> {
                        System.out.println(</font><font>"Received from MS: "</font><font>+v.getName());
                    });
        };
    }

    <b>public</b> <b>static</b> <b>void</b> main(String args[]) {
        <b>new</b> SpringApplicationBuilder(DemoApplication.<b>class</b>).properties
                (Collections.singletonMap(</font><font>"server.port"</font><font>, </font><font>"8082"</font><font>))
                .run(args);
    }
}
</font>

代码详细:

要调用Reactive端点,您需要首先获取 WebClient 实例。在演示案例中放入create方法 http://localhost:8081 .。由WebClient.exchange()方法调用执行的自调用方法,

但要实际在管道上放置订阅以获取结果,您需要调用 ClientRequest .bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:

Started DemoApplication in 4.631 seconds (JVM running <b>for</b> 4.954)
Received from MS: <Item text>

这部分客户端代码:

git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git

用于异步调用REST端点的API是否必要?

我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:

  • 带有Angular的函数 框架 ,类似Router
  • 支持Project Jigsaw
  • 注册beanLamdas。 

​​​​​​​本站文章点击标题看原文!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Out of their Minds

Out of their Minds

Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00

This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具