RxJS与Redux结合使用(一):打造自己的redux-observable
栏目: JavaScript · 发布时间: 5年前
内容简介:Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常
Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:
view -> action -> reducer -> state -> view 复制代码
然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。
Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常有效。较为常见的做法主要有两种:
- 更改action的类型,如redux-thunk,用函数替换了action;
- 在middleware中接收到action的时候做出一些对应处理,如redux-saga。
而我们今天要讲的rxjs与redux的结合,采用了第二种方式来处理异步流程。
中间件干预处理
市面上已经存在这么个中间件了:redux-observable。而我们今天要做的就是带领大家,一步一步慢慢实现自己的一个redux-observable。
这个中间件的原理我可以简化为下面的代码:
export default store => next => action => { const result = next(action); if (action.type === 'ping') { store.dispatch({ type: 'pong' }) } return result; } 复制代码
原理实在简单,在next(action)之后去根据action做判断,做一些异步逻辑,再发起dispatch修改数据即可,而redux-observable也只是在这个基础之上加入RxJs的一些特性。
处理异步逻辑的思路
如果你比较熟悉redux的话,就会知道,redux的中间件就是一个洋葱模型,上面我们也说了,我们会在中间件的后面根据你的action再去重新dispatch一些action,而Rxjs最核心的思想就是将数据都流化。所以你可以理解为action在中间件的末端流入一个管道,最后从管道又流出一些action,这些action最终会再次被store dispatch。
至于在这个管道中进行了什么样的变化、操作,那就是Rxjs的管辖范围,通过Rxjs的强大的操作符,我们可以非常优雅地实现异步逻辑。
所以,需要有个流来承载所有的action,这样你就可以通过这个action$来进行fetch:
action$.pipe( switchMap( () => fromPromise(fetch('/api/whatever')).pipe( map(res => action) ) ), catchError(() => {}) ) 复制代码
这样就将异步逻辑嵌入到流当中。
创建Action流
我们的核心思想是action in, action out,所以最终流出的action是要重新被store.dispatch消费的,所以action$是一个Observable对象。
同时,在dispatch的时候,action经过中间件,action 也是一个observer。
因此,action$既是观察者又是可观察对象,是一个Subject对象:
替换中间件的简单写法,变成:
import { Subject } from 'rxjs/Subject'; export default (store) => { const action$ = new Subject(); action$.subscribe(store.dispatch); return next => (action) => { const result = next(action); action$.next(action); return result; }; }; 复制代码
在上面代码中我们在middleware中去放入action,然后通过订阅,store会触发dispatch。
但是,如果我们就这么写的话,这是个死循环,因为任何action在进入到action 去。
聪明的你应该想到了,我们对于进入到action$的action还没有进行任何过滤,而这个过滤过程也正是我们需要的处理异步逻辑的地方。
下面我们要把这个步骤加上。
流的转化器Epic
为了达到action的一个转化处理,我们将这个过程抽离出来,这个中间处理的逻辑称为Epic,epic的形式大概可以写为:
const epic = (action$) => { return action$.pipe( // 因为所有的action都会过来 // 所以我们只需要处理我们想要的aciton filter(action => action.type === 'GET_USER'), switchMap( // 将fetch也转化为流 () => fromPromise(fetch('/api/user/get', { method: 'POST', body: { id: 1 }, })).pipe( map(res => ({ type: 'GET_USER_SUCCESS', payload: res })), catchError(error => ({ type: 'GET_USER_FAILED', payload: error })) ) ) ) } 复制代码
epic本质是一个函数,在这个函数中,我们在action$的基础上,加入了管道控制,产生了另外一个流,而这个流就是最终我们要的,对action进行了控制的action流,上面的fetch只是一个例子,在这个管道中,你可以处理任意的异步逻辑。
而我们要做的就是将这个Epic,整合进刚才的中间中。
做法也很简单,我们只需要将订阅从action$换到新的流上就可以了:
import { Subject } from 'rxjs/Subject'; export default (store) => { const action$ = new Subject(); const newAction$ = epic(action$); newAction$.subscribe(store.dispatch); return next => (action) => { const result = next(action); action$.next(action); return result; }; }; 复制代码
这样,action$在接收到新的action的时候,会流经epic定义的管道,然后才出发dispatch
多个Epic合并
到此,我们的中间件已经有初步处理异步逻辑的能力,但是,在现实中,我们的异步逻辑不可能只有一个,所以epic是会有很多的,而store去订阅的流只能是一个,所以这么多的epic产生的流要合并成一个流。
合并流的操作,强大的RxJs自然是有安排的,相信你想到了操作符merge,我们可以提供一个combineEpics的函数:
export const combineEpics = (...epics) => { const merger = (...args) => merge( ...epics.map((epic) => { const output$ = epic(...args); return output$; }) ); return merger; }; 复制代码
上面的代码不难理解,combineEpics整合了所有传入的epic,然后返回一个merger,这个merger是利用merge操作符,将所有的epic产生的流合并成一个流。
代码形式为:
const pingEpic = action$ => action$.pipe( filter(action => action.type === 'ping'), map(() => ({ type: 'pong' })), ); const getUserEpic = action$ => action$.pipe( filter(action => action.type === 'GET_USER'), map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })), ); const rootEpic = combineEpics(pingEpic, getUserEpic); export default (store) => { const action$ = new Subject(); const newAction$ = rootEpic(action$); newAction$.subscribe(store.dispatch); return next => (action) => { const result = next(action); action$.next(action); return result; }; }; 复制代码
state获取
在epic中我们不可避免地要借助state里面的数据进行不同的处理,所以我们是需要获取到state的,所以你可以在中间件中的epci执行函数中添加一个参数,将state获取函数暴露出去:
export default (store) => { ... const newAction$ = rootEpic(action$, store.getState); ... }; 复制代码
这样epic里就可以用getState()获取state:
const pingEpic = (action$, getState) => action$.pipe( filter(action => action.type === 'ping'), map(() => ({ type: 'pong', payload: getState() })), ); 复制代码
进一步优化:将state也流化
上面的做法是直接去获取state,这样的做法是主动获取,不符合函数响应式编程模式。函数响应式中,state的改变状态,应该是要能被观察的。
当state也能被响应观察,我们就可以做更多的功能,例如:当state的某些数据在发生变化的时候,我们要去进行实时保存。
在传统模式的做法中,你可以在中间件中这样写:
export default store => next => action => { const oldState = store.getState(); const result = next(action); const newState = store.getState(); // 类似这样的写法 if (newState.xxx !== oldState.xxx) { fetch('/api/save', { method: 'POST', body: { } }).then(() => {}).catch(() => {}) } return result; } 复制代码
这个处理逻辑要独立为一个中间件,而如果你将state也流化,你可以直接使用epic这样处理:
const saveEpic = (action$, state$) => state$.pipe( const autoSaveEpic = (action$, state$) => return action$.pipe( filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自动保存的开启 exhaustMap(() => state$.pipe( pluck('xxx'), // 获取state.xxx distinctUntilChanged(), // 前后值不同时才将其发出。 concatMap((value) => { // fetch to save }), // 自动保存的关闭 takeUntil(action$.pipe( filter(action => action.type === 'AUTO_SAVE_DISABLE') )) )) ) ) 复制代码
如果仔细阅读这段代码,可以发现这样的方式可以使用非常优雅的方式控制这个自动保存,可以和action$结合使用,快速开关自动保存,可以利用RxJs的特性解决保存的异步执行延迟问题。
如果你只是单存想要获取最新state,可以使用withLatestFrom操作符:
const countEpic = (action$, state$) => action$.pipe( filter(action => action.type === 'count'), withLatestFrom(state$), switchMap(([action, state]) => { return of({ type: 'whatever' }); }) ); 复制代码
在中间件加入state流:
export default (store) => { const action$ = new Subject(); const state$ = new Subject(); const source$ = rootEpic(action$, state$); source$.subscribe(store.dispatch); return next => (action) => { const result = next(action); state$.next(store.getState()); action$.next(action); return result; }; }; 复制代码
注意state 最新值不是默认state。
Action的顺序问题
如果你有耐心看到这里,那么说明你对于redux结合RxJs的使用已经理解得差不多了,但是这里还是有个问题,就是action的生效顺序,我们可以直接看个例子说明,假设有下面这样两个epic:
const epic1 = action$ => action$.pipe( filter(action => action.type === 'one'), mergeMap(() => of({ type: 'two' }, { type: 'three' })), ); const epic2 = action$ => action$.pipe( filter(action => action.type === 'two'), mergeMap(() => of({ type: 'four' })), ); 复制代码
当 store.dispatch({ type: 'one' })
的时候,action的顺序为:
'one' -> 'two' -> 'four' -> 'three' 复制代码
可见,action的执行顺序并不是如我们预期的那样,在two触发后就发出了four,这是因为RxJs默认的调度器是同步的,用一段简单的代码,上面的效果类似于:
class Print { constructor(name, nexts = []) { this.name = name; this.nexts = nexts; } print() { console.log(this.name); this.nexts.forEach((p) => { p.print(); }); } } const three = new Print('three'); const four = new Print('four'); const two = new Print('two', [four]); const one = new Print('one', [two, three]); one.print(); // one, two, four, three 复制代码
换成上面的代码的话你就不陌生了吧,也会对于输出的结果表示肯定,但是我们需要的效果是
'one' -> 'two' -> 'three' -> 'four' 复制代码
这如何做到?
明显,需要将调度器换成其他的,RxJs有这么几种调度器:null(同步)、asap、queue、async、animationFrame。最后一种是动画场景的调度器,直接剔除,默认是第一种,那么就剩下asap、queue、async。在这个场景下,这三种调度器都是可行的,但是queue在大量的数据的时候对于性能是有利的,所以这里可以使用它。不过,记住,这三种调度器是有区别的,大家有兴趣的自己去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延迟为0的时候接近于同步,在延迟不为0的时候与async一样。
中间件中:
const action$ = new Subject().pipe( observeOn(queue) ); 复制代码
这样得到的结果就是:
'one' -> 'two' -> 'three' -> 'four' 复制代码
如果用简单的代码,相当于发生了这样的变化:
class Print { constructor(name, nexts = []) { this.name = name; this.nexts = nexts; } print() { console.log(this.name); this.nexts.forEach((p) => { setTimeout(() => p.print(), 0); }); } } const three = new Print('three'); const four = new Print('four'); const two = new Print('two', [four]); const one = new Print('one', [two, three]); one.print(); // one, two, three, four 复制代码
总结
本文就讲到这里,这次介绍了如何自己实现一个redux-observable,下次会讲redux-observable在实战中的一些应用,例如怎么类似dva那样进行模块化开发、如何统一处理loading、error等。
以上所述就是小编给大家介绍的《RxJS与Redux结合使用(一):打造自己的redux-observable》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- NServiceBus 结合 RabbitMQ 使用教程
- 结合案例使用 Java 注解和反射
- Apache使用fcgi方式与PHP结合
- Redux 基础教程以及结合 React 使用方式
- Android:你该如何结合Retrofit 与 RxJava使用(含使用教程)
- laravel-admin 与 vue 结合使用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。