RxJS与Redux结合使用(一):打造自己的redux-observable

栏目: JavaScript · 发布时间: 5年前

内容简介:Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常

Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:

view ->  action -> reducer -> state -> view
复制代码

然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。

Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常有效。较为常见的做法主要有两种:

  1. 更改action的类型,如redux-thunk,用函数替换了action;
  2. 在middleware中接收到action的时候做出一些对应处理,如redux-saga。

而我们今天要讲的rxjs与redux的结合,采用了第二种方式来处理异步流程。

中间件干预处理

市面上已经存在这么个中间件了:redux-observable。而我们今天要做的就是带领大家,一步一步慢慢实现自己的一个redux-observable。

这个中间件的原理我可以简化为下面的代码:

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' })
    }
    return result;
}
复制代码

原理实在简单,在next(action)之后去根据action做判断,做一些异步逻辑,再发起dispatch修改数据即可,而redux-observable也只是在这个基础之上加入RxJs的一些特性。

处理异步逻辑的思路

如果你比较熟悉redux的话,就会知道,redux的中间件就是一个洋葱模型,上面我们也说了,我们会在中间件的后面根据你的action再去重新dispatch一些action,而Rxjs最核心的思想就是将数据都流化。所以你可以理解为action在中间件的末端流入一个管道,最后从管道又流出一些action,这些action最终会再次被store dispatch。

RxJS与Redux结合使用(一):打造自己的redux-observable

至于在这个管道中进行了什么样的变化、操作,那就是Rxjs的管辖范围,通过Rxjs的强大的操作符,我们可以非常优雅地实现异步逻辑。

所以,需要有个流来承载所有的action,这样你就可以通过这个action$来进行fetch:

action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            map(res => action)
        )
    ),
    catchError(() => {})
)
复制代码

这样就将异步逻辑嵌入到流当中。

创建Action流

我们的核心思想是action in, action out,所以最终流出的action是要重新被store.dispatch消费的,所以action$是一个Observable对象。

同时,在dispatch的时候,action经过中间件,action 也是一个observer。

因此,action$既是观察者又是可观察对象,是一个Subject对象:

替换中间件的简单写法,变成:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  
  action$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

在上面代码中我们在middleware中去放入action,然后通过订阅,store会触发dispatch。

但是,如果我们就这么写的话,这是个死循环,因为任何action在进入到action 去。

聪明的你应该想到了,我们对于进入到action$的action还没有进行任何过滤,而这个过滤过程也正是我们需要的处理异步逻辑的地方。

下面我们要把这个步骤加上。

流的转化器Epic

为了达到action的一个转化处理,我们将这个过程抽离出来,这个中间处理的逻辑称为Epic,epic的形式大概可以写为:

const epic = (action$) => {
    return action$.pipe(
        // 因为所有的action都会过来
        // 所以我们只需要处理我们想要的aciton
        filter(action => action.type === 'GET_USER'),
        switchMap(
            // 将fetch也转化为流
            () => fromPromise(fetch('/api/user/get', {
                method: 'POST',
                body: {
                    id: 1
                },
            })).pipe(
                map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
                catchError(error => ({ type: 'GET_USER_FAILED', payload: error }))
            )
        )
    )
}

复制代码

epic本质是一个函数,在这个函数中,我们在action$的基础上,加入了管道控制,产生了另外一个流,而这个流就是最终我们要的,对action进行了控制的action流,上面的fetch只是一个例子,在这个管道中,你可以处理任意的异步逻辑。

而我们要做的就是将这个Epic,整合进刚才的中间中。

做法也很简单,我们只需要将订阅从action$换到新的流上就可以了:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  const newAction$ = epic(action$);
  
  newAction$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

这样,action$在接收到新的action的时候,会流经epic定义的管道,然后才出发dispatch

多个Epic合并

到此,我们的中间件已经有初步处理异步逻辑的能力,但是,在现实中,我们的异步逻辑不可能只有一个,所以epic是会有很多的,而store去订阅的流只能是一个,所以这么多的epic产生的流要合并成一个流。

合并流的操作,强大的RxJs自然是有安排的,相信你想到了操作符merge,我们可以提供一个combineEpics的函数:

export const combineEpics = (...epics) => {
  const merger = (...args) => merge(
    ...epics.map((epic) => {
      const output$ = epic(...args);
      return output$;
    })
  );
  return merger;
};
复制代码

上面的代码不难理解,combineEpics整合了所有传入的epic,然后返回一个merger,这个merger是利用merge操作符,将所有的epic产生的流合并成一个流。

RxJS与Redux结合使用(一):打造自己的redux-observable

代码形式为:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' })),
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

state获取

在epic中我们不可避免地要借助state里面的数据进行不同的处理,所以我们是需要获取到state的,所以你可以在中间件中的epci执行函数中添加一个参数,将state获取函数暴露出去:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};
复制代码

这样epic里就可以用getState()获取state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);
复制代码

进一步优化:将state也流化

上面的做法是直接去获取state,这样的做法是主动获取,不符合函数响应式编程模式。函数响应式中,state的改变状态,应该是要能被观察的。

当state也能被响应观察,我们就可以做更多的功能,例如:当state的某些数据在发生变化的时候,我们要去进行实时保存。

在传统模式的做法中,你可以在中间件中这样写:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // 类似这样的写法
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {
            
            }
        }).then(() => {}).catch(() => {})
    }
    return result;
}
复制代码

这个处理逻辑要独立为一个中间件,而如果你将state也流化,你可以直接使用epic这样处理:

const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
  return action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自动保存的开启
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // 获取state.xxx
        distinctUntilChanged(), // 前后值不同时才将其发出。
        concatMap((value) => {
            // fetch to save
        }),
        // 自动保存的关闭
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  )
)
复制代码

如果仔细阅读这段代码,可以发现这样的方式可以使用非常优雅的方式控制这个自动保存,可以和action$结合使用,快速开关自动保存,可以利用RxJs的特性解决保存的异步执行延迟问题。

如果你只是单存想要获取最新state,可以使用withLatestFrom操作符:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);
复制代码

在中间件加入state流:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};
复制代码

注意state 最新值不是默认state。

Action的顺序问题

如果你有耐心看到这里,那么说明你对于redux结合RxJs的使用已经理解得差不多了,但是这里还是有个问题,就是action的生效顺序,我们可以直接看个例子说明,假设有下面这样两个epic:

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' })),
);
复制代码

store.dispatch({ type: 'one' }) 的时候,action的顺序为:

'one' -> 'two' -> 'four' -> 'three'
复制代码

可见,action的执行顺序并不是如我们预期的那样,在two触发后就发出了four,这是因为RxJs默认的调度器是同步的,用一段简单的代码,上面的效果类似于:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three
复制代码

换成上面的代码的话你就不陌生了吧,也会对于输出的结果表示肯定,但是我们需要的效果是

'one' -> 'two' -> 'three' -> 'four'
复制代码

这如何做到?

明显,需要将调度器换成其他的,RxJs有这么几种调度器:null(同步)、asap、queue、async、animationFrame。最后一种是动画场景的调度器,直接剔除,默认是第一种,那么就剩下asap、queue、async。在这个场景下,这三种调度器都是可行的,但是queue在大量的数据的时候对于性能是有利的,所以这里可以使用它。不过,记住,这三种调度器是有区别的,大家有兴趣的自己去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延迟为0的时候接近于同步,在延迟不为0的时候与async一样。

中间件中:

const action$ = new Subject().pipe(
    observeOn(queue)
  );
复制代码

这样得到的结果就是:

'one' -> 'two' -> 'three' -> 'four'
复制代码

如果用简单的代码,相当于发生了这样的变化:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four

复制代码

总结

本文就讲到这里,这次介绍了如何自己实现一个redux-observable,下次会讲redux-observable在实战中的一些应用,例如怎么类似dva那样进行模块化开发、如何统一处理loading、error等。


以上所述就是小编给大家介绍的《RxJS与Redux结合使用(一):打造自己的redux-observable》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

HTTP Essentials

HTTP Essentials

Stephen A. Thomas、Stephen Thomas / Wiley / 2001-03-08 / USD 34.99

The first complete reference guide to the essential Web protocol As applications and services converge and Web technologies not only assume HTTP but require developers to manipulate it, it is be......一起来看看 《HTTP Essentials》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具