RxJS进阶——关于流的理解和应用

栏目: JavaScript · 发布时间: 7年前

内容简介:RxJS是微软公司推出的响应式编程的JavaScript库。对于它的学习,最开始我的理解是把它当成是随着学习的深入,发现它采用了订阅者模式,其中也带有纯函数的思想。

RxJS是微软公司推出的响应式编程的JavaScript库。

对于它的学习,最开始我的理解是把它当成是 能优雅地解决异步问题的lodash

随着学习的深入,发现它采用了订阅者模式,其中也带有纯函数的思想。

直到在使用了RxJS 6之后才了解其少有人意识到的另一面——流。

什么是流?这里我们不用专业术语来解释,用生活中大家熟悉的的例子来类比,比如“河流”。

河流有什么特点?

至少有两个特点:

有朝向。

水往低处流,河流虽然可能会蜿蜒盘旋,但是朝向是固定的,比如我国的长江和黄河就都是由西往东流。

在RxJS中数据的流向也是固定的,就是从发送者到订阅者。基本都如下面这种形式:

from(Promise.resolve(1)) // 流的源头
......
.subscribe(x => console.log(x)); // 流的终点

有分支。

大的河流一般有干流和支流,大大小小的支流汇入干流。

RxJS中的数据则可以通过操作符将数据流进行聚合或拆分。

// 流的聚合
mergeMap(from(Promise.resolve(1)), from(Promise.resolve(2)))
......
.subscribe(x => console.log(x))

// 流的拆分
const obs$ = from(Promise.resolve(1)
obs$.subscribe(x => console.log(x))
obs$.subscribe(x => {
  // do sth
})/

RxJS 6 相对于 RxJS 5(这里指5.5以下的版本,因为pipe函数在RxJS 5.5中作为新特性已被引入。) 来说不仅修改了一部分操作符的名称,同时做了一个较大的改动,引入了管道(pipe)。这个改动到底有多大?

首先是写法上的变化。

RxJS 5的这种操作符的调用方式有没有一种似曾相识的感觉?

是的,它看上去很像JQuery那种意大利面条式的链式调用

而RxJS 6和Gulp的写法有些像,想想Gulp是什么?基于流的构建工具!

// RxJS 5 伪代码
myObservable
  .map(data => data * 2)
  .switchMap(...)
  .throttle(...))
  .subscribe(...);

// RxJS 6 伪代码
myObservable
  .pipe(map(data => data * 2), switchMap(...), throttle(...))
  .subscribe(...);

这种写法上的变化就带来了用法上的变化,以前的固定“河流”可以通过“管道”(pipe)来控制形成灵活的“水流”。

下面举个例子来更加形象地阐述加入管道之后流的灵活性。

现在有一个这样的业务场景:

点击按钮之后发送一个请求,让服务端开始执行任务,然后轮询发送请求查询任务执行状态,根据不同状态进行不同操作。有3种状态”controlling”——继续轮询, “stop”——停止轮询,”finish”——停止轮询,并进行后续操作。

不考虑判断条件,伪代码是下面这样子:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  )
.subscribe(x => {
  // 后续操作
})

这段代码有一个问题没有解决,根据状态进行相应操作。

先来看看这3种状态对应的操作。

  • controlling。继续轮询很好处理,不进行任何操作即可。
  • stop。停止轮询的操作符有3个: take ,需要固定次数,这个次数没法预先确定。 takeUntil ,需要创建一个额外的subject来进行停止,应该可以实现,不过代码量比较大。 takeWhile ,只需简单的逻辑判断即可,比较合适。
  • finish。问题来了, 如过我们在管道操作符中判断状态的并停止流的话,那么订阅者将无法收到消息,意味着后续操作无法执行。

解决方法就是把后续操作放到管道中。代码如下:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操作
  })
  takeWhile(() => false) // 操作完成结束轮询
  )
.subscribe();

现在需求变化了,在另一段代码中,我们也要通过查询状态并根据状态进行,但是不再需要开始任务和轮询了。

那么上面的代码查询和操作部分可以利用pipe方法抽取出来。

const handle = pipe(
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操作
  })
  takeWhile(() => false) // 操作完成结束轮询
)
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  handle
).subscribe();
// 直接复用查询状态代码和后续操作部分代码
other.pipe(
  handle
).subscribe()

总结一下。RxJS比较完整的理解应该是基于流的订阅者模式,而流的灵活性体现在可拆分和聚合,有了pipe管道的加入,流的可复用性增强,因此更容易对代码逻辑进行抽象。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ant Colony Optimization

Ant Colony Optimization

Marco Dorigo、Thomas Stützle / A Bradford Book / 2004-6-4 / USD 45.00

The complex social behaviors of ants have been much studied by science, and computer scientists are now finding that these behavior patterns can provide models for solving difficult combinatorial opti......一起来看看 《Ant Colony Optimization》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试