Spring Web-Flux中的背压机制

栏目: Java · 发布时间: 6年前

内容简介:响应式(反应式)编程的好处是背压Backpressure,可以平衡请求或响应率,这点与异步机制区别所在,也就是说,当响应堵塞时,会同时堵塞请求,因此reactive响应式=异步+同步(背压)。本文解释了Spring Web-Flux中的背压机制,假设我们编写一个Spring Web-Flux的控制器代码如下:这段代码背后的背压工作机制是什么?为了理解Backpressure在WebFlux框架中如何工作,我们必须回顾一下当前默认使用的传输层。

响应式(反应式)编程的好处是背压Backpressure,可以平衡请求或响应率,这点与异步机制区别所在,也就是说,当响应堵塞时,会同时堵塞请求,因此reactive响应式=异步+同步(背压)。本文解释了Spring Web-Flux中的背压机制,假设我们编写一个Spring Web-Flux的控制器代码如下:


@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just(
"I am First Mono")
}
}

这段代码背后的背压工作机制是什么?

为了理解Backpressure在WebFlux框架中如何工作,我们必须回顾一下当前默认使用的传输层。

我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器通信通常也是一样)是通过TCP连接完成的。WebFlux还是使用该传输进行客户端和服务器之间的通信。然后,为了获得背压控制,我们必须从Reactive Streams规范的角度来概述背压的含义。

规范的基本语义定义了如何通过背压来调节流元素的传输。

因此,从该声明中,我们可以得出结论,在Reactive Streams中,背压是一种通过传输(通知)接收者可以消费多少元素来调节生产的机制(消费决定生产); 在这里,我们有一个棘手的问题。TCP具有字节抽象而不是逻辑元素抽象。我们通常所说的背压控制是控制向网络发送/接收的逻辑元件的数量。即使TCP有自己的流控制,这个流控制仍然是字节而不是逻辑元素。

在WebFlux模块的当前实现中,背压由传输流控制来调节,但它不会暴露接收方的实际需求。为了最终看到交互流程,请参见下图:

Spring Web-Flux中的背压机制

为简单起见,上图显示了两个微服务之间的通信,其中左侧发送生产数据流,右侧消费该流。以下编号列表提供了该图表的简要说明:

1. 这是WebFlux框架,它正确地将逻辑元素转换为字节并返回并将它们传输到TCP /从TCP(网络)接收。

2. 这是数据长时间运行处理的开始,一旦作业完成,该数据就会请求下一个数据。

3. 在这里,虽然没有来自业务逻辑的需求,但WebFlux将来自网络的字节排队,而没有他们的确认(业务逻辑没有要求)。

4. 由于TCP流控制的性质,服务A仍然可以向网络发送数据。

正如我们可能从上图中注意到的那样,接收方的请求与发送方的请求不同(逻辑元素中的请求)。这意味着两者的请求是独立的,因此真正背压仅仅仅适用于WebFlux < - >业务逻辑(服务)交互,服务A < - >服务B交互不是完整的背压。

所有这些意味着背压控制在WebFlux中并不像我们预期的那样公平。

但我仍然想知道如何控制背压

如果我们仍然希望对WebFlux中的背压进行公平的控制,我们可以在Project Reactor支持下实现limitRate()。以下示例显示了我们如何使用:


@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {

return tweetService.process(tweetsFlux.limitRate(10))
.then();
}

正如我们从示例中看到的那样,limitRate()运算符允许一次定义要预取的数据数。这意味着即使最终订户请求Long.MAX_VALUE元素,limitRate运行者也会将该请求拆分为块,并且不允许一次消费更多。我们可以用数据元素的发送过程来做同样的事情:


@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {

return tweetService.retreiveAll()
.limitRate(10);
}

上面的示例显示,即使WebFlux一次请求超过10个元素,也会limitRate()限制对预取大小的需求,并防止一次消费超过指定数量的元素。

另一种选择是实现自己的Subscriber或扩展BaseSubscriber来自Project Reactor。例如,以下是我们如何做到这一点的简单例子:


class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

int consumed;
final int limit = 5;

@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}

@Override
protected void hookOnNext(T value) {
// do business logic there

consumed++;

if (consumed == limit) {
consumed = 0;

request(limit);
}
}
}

使用RSocket协议的公平背压

为了通过网络边界实现逻辑元素背压,我们需要一个适当的协议。幸运的是,有一种称为RScoket协议。RSocket是一种应用程序级协议,允许通过网络边界传输实际需求。该协议有一个RSocket-Java实现,允许设置RSocket服务器。在服务器到服务器通信的情况下,相同的RSocket-Java库也提供客户端实现。

对于浏览器 - 服务器通信,有一个RSocket-JS实现,能通过WebSocket连接浏览器和服务器之间的流通信。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Alone Together

Alone Together

Sherry Turkle / Basic Books / 2011-1-11 / USD 28.95

Consider Facebookit’s human contact, only easier to engage with and easier to avoid. Developing technology promises closeness. Sometimes it delivers, but much of our modern life leaves us less connect......一起来看看 《Alone Together》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具