ReactiveX流式编程—从xstream讲起

栏目: Java · 发布时间: 5年前

内容简介:ReactiveX流式编程xstream的作者也是rxjs的深度用户,但是作者基于一些实践中考虑而开发这个库,作者的解释:WHY WE BUILT XSTREAM

ReactiveX流式编程

ReactiveX 来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

为什么从xstream讲起

xstream的作者也是rxjs的深度用户,但是作者基于一些实践中考虑而开发这个库,作者的解释:WHY WE BUILT XSTREAM

  1. xstream只有26个核心操作符和工厂函数
  2. 只支持 模式流
  3. 只有 streamlistenerproducer 三个概念,比较好理解
  4. 压缩后只有30kb大小,日常开发可以轻松集成代替部分繁琐逻辑

xstream简单上手

import xs from 'xstream'

// Tick every second incremental numbers,
// only pass even numbers, then map them to their square,
// and stop after 5 seconds has passed

var stream = xs.periodic(1000)
  .filter(i => i % 2 === 0)
  .map(i => i * i)
  .endWhen(xs.periodic(5000).take(1))

// So far, the stream is idle.
// As soon as it gets its first listener, it starts executing.

stream.addListener({
  next: i => console.log(i),
  error: err => console.error(err),
  complete: () => console.log('completed'),
})
复制代码

核心概念

Stream

代表从事件发生、处理、监听的一条管道,每个stream都有很多operator类似: map , filter , fold , take

每次调用operator都返回一个新的stream;

一般说来stream中的数据是由producer生产的,但是你可以调用 shamefullySend* 系列函数手动发射事件,但是这种方法是 反reactive 的,作者强烈不推荐使用;按照我的理解,这些方法在多个stream联合工作的,用来mock某些流的数据时候会比较有用

Listener

监听者 ,stream的出口,消费管道最终产物;包含有3个方法

  1. next :stream里每次有管道里产生的数据到流入到这个next方法里接收
  2. error :stream数据流转中有异常情况时调用
  3. complete :生产者调用了stop方法后调用
var listener = {
  next: (value) => {
    console.log('The Stream gave me a value: ', value);
  },
  error: (err) => {
    console.error('The Stream gave me an error: ', err);
  },
  complete: () => {
    console.log('The Stream told me it is done.');
  },
}
复制代码

Producer

生产者,stream的入口,用来持续产生流的输入

var producer = {
  start: function (listener) {
    this.id = setInterval(() => listener.next('yo'), 1000)
  },

  stop: function () {
    clearInterval(this.id)
  },

  id: 0,
}

// This fellow delivers a 'yo' next event every 1 second
var stream = xs.create(producer)
复制代码

MemoryStream

记忆流 普通stream的记忆版:它会记住最后一次发送给listener的next方法的数据,这样后来addListener添加的监听者能收到记住的这个数据; 这个特性是很有用的,能够用来保存应用运行过程中的一些临时状态。

Stream的构造-Factories

create

标准的通过producer构造, create(producer)

createWithMemory

标准的通过producer构造memorystream, createWithMemory(producer)

from

从Array|PromiseLike|Observable创建一个stream

of

从字面量创建一个stream,这样创建的stream会立刻发射所有的参数,并触发completed

fromPromise

从promise创建一个stream

merge

合并两个stream成为一个stream,合并的后的数据按照原本的时间线继续输出(如下图)

ReactiveX流式编程—从xstream讲起

combine

这个单纯用文字不太好解释,请看下图(借用的rxjs里的combineLatest图,功能是类似的)

ReactiveX流式编程—从xstream讲起

另外,rxjs中还有个一个类似的zip操作符(xstream中不存在,自己实现),看下图仔细体会和xstream的combine的不同

ReactiveX流式编程—从xstream讲起

常用的操作符-Operators

map

ReactiveX流式编程—从xstream讲起

mapTo

ReactiveX流式编程—从xstream讲起

filter

ReactiveX流式编程—从xstream讲起

take

ReactiveX流式编程—从xstream讲起

drop

图片借用的rx里的skip,是一样的效果

ReactiveX流式编程—从xstream讲起

fold

图片借用的rx里的scan,是一样的效果

ReactiveX流式编程—从xstream讲起

flatten

这个是操作符就有点复杂了,涉及到了分流的情况,主要功能是将主stream里返回的支流直接打平,输出支流里的数据;整个xstream标准operators(extra下有扩展的)里只有这个操作符有涉及到分流的处理,弹珠(Marble)图如下

ReactiveX流式编程—从xstream讲起

这里解释一下,为什么b输出之后,主流程走到第二个tick,开始输出第二个支流,这是第一个支流的后续输出都会被废弃;

实践一个TODO List

流式思考

假如现在需要我们写一个简单的todolist:有一个 input 和一个 button 当我在input输入内容之后,点击 button 就往todolist集合里添加一条数据,每条todo行前面有个 checkbox 用来勾选todo的完成状态,每条todo行后面有一个 del 按钮,用来删除这条todo

ok,让我们开始之前先用 式的方式思考一下这个问题, 式的方式是基于时间线的演进系统动态变化的一个抽象,那么基于此我们可以很简单抽闲出 3 条时间线:

ReactiveX流式编程—从xstream讲起

基于此,可以很容易写出3条stream的代码如下:

// 工具函数,方便的创建dom事件流
import fromEvent from 'xstream/extra/fromEvent';

// 从添加按钮创建的stream
const addTodoBtn$ = fromEvent(addBtnEl, 'click').map(() => inputEl.value).filter(v => v && v !== '');

// 从删除按钮触发的stream
const delTodoBtn$ = fromEvent(document.body, 'click').map((e: Event) => e.target).filter((target: HTMLElement) => target.classList.contains('delTodo')).map((target: HTMLElement) => parseInt(target.dataset.index));

// 从标记完成选项触发的stream
const toggleTodoInput$ = fromEvent(document.body, 'change').map((e: Event) => e.target).filter((target: HTMLElement) => target.classList.contains('toggleTodo')).map((target: HTMLInputElement) => ({ checked: target.checked, index: parseInt(target.dataset.index) }));
复制代码

好了,现在我们有了3条stream:

button
del
checkbox

保存状态

现在我们有了3条stream,那么该如何将这些stream与dom的操作对应起来呢?同时还有另外一个问题:传统的开发过程中,我们需要有一个外部变量类似state这样用来保存每次操作后最新的todolist数据集合(副作用); 但是ReactiveX提倡的方式就是要消除副作用,我们需要一点儿技巧来处理这个状况;

这里我们思考一下整个操作分两部分:增量数据、减量数据、更新数据, 而减量数据和更新数据都是基于增量数据源的基础上操作的;那么我们需要定义一个增量数据源,并且需要能持续保持这个数据源最后的数据状态。

让我们翻翻上面的operators,看看有哪个操作符是可以用来持久保存局部变量的? 没错,就是 fold 操作符:

// 数据来源
const startUp$ = addTodoBtn$.fold((todos: Todo[], inputValue) => {
  const todo:Todo = { text: inputValue, completed: false };
  todos.push(todo);
  return todos;
}, []);// 初始化空数据列表
复制代码

由于这里的todolist的增量来源只有 button 一个(如果有多个,可以看看  combine 操作符,这里不展开);

分支流和flatten应用

有了增量数据源,那么我们在增量数据源的每个tick上分出一个减量、更新的 支流 (参看上面的flatten操作符),这样支流执行的时候拿到的都是数据源的最新数据;

// 监听数据来源,并触发删除的stream
const delTodos$ = startUp$.map(todos => delTodoBtn$.map(index => {
  console.log(index);
  todos.splice(index, 1);
  return todos;
})).flatten();

// 监听数据来源,并触发选中的stream
const toggleTodos$ = startUp$.map(todos => toggleTodoInput$.map(({ checked, index }) => {
  console.log(checked, index);
  if (todos[index]) {
    todos[index].completed = checked;
  }
  return todos;
})).flatten();
复制代码

组装stream

ok,现在我们的数据来源是多个,输出的都是todolist最新的数据集合状态,让我们把这些stream管道组装起来:

// 组合起来
const todos$ = xs.merge(startUp$, delTodos$, toggleTodos$);

todos$.addListener({
  next: function(todos: Todo[]) {
    console.log(todos);
    renderTodos(todos);
  },
  error: function(e) {
    console.error(e);
  },
  complete: function() {

  }
})

复制代码

副作用

在定义完清晰的stream后,我们的实际业务代码就是这么"简单",由于stream的出口一直都是最新的todolist集合我们实现了类似react的全量渲染;哈哈,实际上这里还有个不怎么简单的副作用方法:

const inputEl = document.getElementById('input') as HTMLInputElement;
const addBtnEl = document.getElementById('addBtn') as HTMLButtonElement;
const todoListEl = document.getElementById('lists');
const initTodos = [];
const renderTodos = function(todos: Todo[]) {
  if (!todos) {
    return;
  }
  todoListEl.innerHTML = '';
  const fragement = document.createDocumentFragment();
  todos.forEach((todo, index) => {
    const liEL = document.createElement('li');
    if (todo.completed) {
      liEL.className = 'completed';
    }
    liEL.innerHTML = `<input type='checkbox' class="toggleTodo" name="toggleTodo" data-index="${index}" ${todo.completed ? 'checked' : null} />
                      <span>${todo.text}</span>
                      <a href='javascript:void(0);' class='delTodo' data-index="${index}">x</a>`;
    fragement.appendChild(liEL);
  });
  todoListEl.appendChild(fragement);
}

复制代码

最终效果

ReactiveX流式编程—从xstream讲起

总结

通过实际例子可以看到这里有一大段不怎么优雅的副作用方法,用来操作dom元素,由于我们基于全量渲染的思想,并没有使用传统的同步增删改dom的方式,否则副作用代码会更多; 同时由于没有vdom的加持,此段渲染代码纯粹只能用来demo展示一下;  至于更优雅的副作用处理和vdom能力,可以期待我后续关于cycle.js的介绍:stuck_out_tongue_winking_eye:


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序员的修炼

程序员的修炼

Jeff Atwood / 陆其明、杨溢 / 人民邮电出版社 / 2014-4 / 45.00元

《程序员的修炼——从优秀到卓越》是《高效能程序员的修炼》的姊妹篇,包含了Coding Horror博客中的精华文章。全书分为8章,涵盖了时间管理、编程方法、Web设计、测试、用户需求、互联网、游戏编程以及技术阅读等方面的话题。作者选取的话题,无一不是程序员职业生涯中的痛点。很多文章在博客和网络上的点击率和回帖率居高不下。 Jeff Atwood于2004年创办Coding Horror博客(......一起来看看 《程序员的修炼》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器