Angular-Observable和RxJS(未完)

栏目: JavaScript · 发布时间: 7年前

内容简介:无论哪种情况,next 处理器都是RxJS是一个使用可观察对象进行响应式编程的RxJS库提供了一种对Observable类型的实现,这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:
  • 可观察对象 支持在应用中的 发布者订阅者 之间传递消息。
  • 可观察对象是 声明式 的——也就是说, 发布者 中用于发布值的函数,只有在有****消费者订阅**它之后才会执行。
  • 可观察对象可以发送多个 任意类型 的值 —— 字面量、消息、事件。你的应用代码只管 订阅并消费 这些值就可以了,做完之后,取消订阅。无论这个流是 击键流HTTP响应流 还是 定时器 ,对这些值进行监听和停止监听的接口都是一样的。

1.1基本用法和词汇

  • 作为 发布者 ,你 创建 一个 可观察对象 (Observable)的实例,其中定义了一个 订阅者( subscriber )函数 。订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。 订阅者函数 会接收一个 观察者 (observer),并把值 发布观察者 的 next() 方法
  • 当有 消费者 调用 subscribe() 方法时,这个 订阅者函数 就会执行。作为 消费者 ,要 执行 所创建的 可观察对象 ,并开始从中接收通知,你就要 调用可观察对象 的 subscribe() 方法,并传入一个 观察者 (observer)。
  • 观察者 是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。
  • subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。
// 在有消费者订阅它之前,这个订阅者函数并不会实际执行
const locations = new Observable((observer) => {
  const {next, error} = observer;
  let watchId;

  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition(next, error);
  } else {
    error('Geolocation not available');
  }

  return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});

// subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。
// subscribe()传入一个观察者对象,定义了你收到的这些消息的处理器
const locationsSubscription = locations.subscribe({
  next(position) { console.log('Current Position: ', position); },
  error(msg) { console.log('Error Getting Location: ', msg); }
});

// 10 seconds后调用该方法时,你就会停止接收通知。
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
复制代码

1.2定义观察者observer

通知类型 说明
next 必要 。用来处理每个送达值。在开始执行后可能执行零次或多次。
error 可选 。用来处理 错误通知 。错误会中断这个可观察对象实例的执行过程。
complete 可选 。用来处理 执行完 毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

1.3订阅

  • 只有当有人订阅 Observable 的实例时,订阅者函数才会开始发布值。

  • 订阅时要先调用 该实例 的 subscribe() 方法,并把一个 观察者对象 传给subscribe(),用来接收通知。

  • 使用 Observable 上定义的一些 静态方法 来创建一些常用的 简单可观察对象

    • of(...items) —— 返回一个 Observable 实例,它用同步的方式把参数中提供的这些值发送出来。

    • from(iterable) —— 把它的参数转换成一个 Observable 实例。 该方法通常用于把一个数组转换成一个(发送多个值的)可观察对象。

  • 下面的例子会创建并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:

// 创建简单的可观察对象,来发送3个值
const myObservable = of(1, 2, 3);

// 创建观察者对象
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// 订阅
myObservable.subscribe(myObserver);
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

=>前面指定预定义观察者并订阅它,等同如下写法,省略了next,error,complete
myObservable.subscribe(
  // subscribe() 方法可以接收预定义在观察者中同一行的回调函数
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
复制代码

无论哪种情况,next 处理器都是 必要 的,而 error 和 complete 处理器是可选的。

  • 注意,next() 函数可以接受消息字符串、事件对象、数字值或各种结构。我们把由可观察对象发布出来的数据统称为 任何类型的值 都可以表示为可观察对象,而这些值会被发布为一个流。

1.4创建可观察对象

  • 要创建一个与前面的 of(1, 2, 3) 等价的可观察对象,你可以这样做:
// 订阅者函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。
function sequenceSubscriber(observer) {
  // 同步地 发布 1, 2, and 3, 然后 complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
 
  // 同步发布数据,所以取消订阅 不需要做任何事情
  return {unsubscribe() {}};
}
 
// 使用 Observable 构造函数,创建一个新的可观察对象,
// 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数sequenceSubscriber作为订阅者函数来运行。 
const sequence = new Observable(sequenceSubscriber);
 
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});
 
// Logs:
// 1
// 2
// 3
// Finished sequence
复制代码
  • 下面的例子用来 发布事件 的可观察对象:
function fromEvent(target, eventName) {
    return new Observable(
        // new Observable中传入的订阅者函数是用内联方式定义的
        // 订阅者函数会接收一个 观察者对象observer,并把值e发布给观察者的 next() 方法
        (observer) => {
            const handler = (e) => observer.next(e);

            // Add the event handler to the target
            target.addEventListener(eventName, handler);

            return () => {
                // Detach the event handler from the target
                target.removeEventListener(eventName, handler);
            };
        }

    );
}


const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;

const subscription = fromEvent(nameInput, 'keydown')//使用fromEvent函数来创建可发布 keydown 事件的可观察对象
    .subscribe(
        // subscribe() 方法接收预定义在观察者中同一行的next回调函数
        (e: KeyboardEvent) => {
            if (e.keyCode === ESC_KEY) {
                nameInput.value = '';
            }
        }
    );
复制代码

1.5多播?

1.6错误处理

  • 由于可观察对象会 异步 生成值,所以用 try/catch 是 无法 捕获错误的。你应该在 观察者 中指定一个 error 回调 来处理错误。
  • 发生错误 时还会导致可观察对象 清理 现有的订阅,并且 停止 生成值。
  • 可观察对象可以 生成值 (subscribe()调用 next 回调),也可以调用 complete 或 error 回调来 主动结束
myObservable.subscribe({
  next(num) { console.log('Next num: ' + num)},
  error(err) { console.log('Received an errror: ' + err)}
});
复制代码

二、RxJS 库

RxJS是一个使用可观察对象进行响应式编程的 ,它让组合异步代码和基于回调的代码变得更简单。

RxJS库提供了一种对Observable类型的实现,这个库还提供了一些 工具 函数,用于创建和使用可观察对象。这些工具函数可用于:

  • 把现有的异步代码转换成可观察对象
  • 迭代流中的各个值
  • 把这些值映射成其它类型
  • 对流进行过滤
  • 组合多个流

2.1创建可观察对象的函数

RxJS 提供了一些用来创建可观察对象的函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如承诺、定时器、事件、ajax等等。

  • 承诺
import { fromPromise } from 'rxjs';

// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
 next(response) { console.log(response); },
 error(err) { console.error('Error: ' + err); },
 complete() { console.log('Completed'); }
});
复制代码
  • 定时器
import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
  console.log(`It's been ${n} seconds since subscribing!`));
复制代码
  • 事件
import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element');

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});
复制代码
  • ajax
import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
复制代码

2.2常用操作符

操作符会 观察 来源 可观察对象 中发出的 转换 它们,并 返回 由转换后的值组成的新的可观察对象。

  • 提倡使用 管道 来组合操作符,而不是使用链式写法
import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5) // 可观察对象
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
复制代码
类别 操作
创建 from, fromPromise,fromEvent, of
组合 combineLatest, concat, merge, startWith , withLatestFrom, zip
过滤 debounceTime, distinctUntilChanged, filter, take, takeUntil
转换 bufferTime, concatMap, map, mergeMap, scan, switchMap
工具 tap
多播 share

2.3错误处理

除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。 下面是使用 catchError 操作符实现这种效果的例子:

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  //如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
复制代码

2.4重试失败的可观察对象

可以在 catchError 之前使用 retry 操作符。 下列代码为前面的例子加上了捕获错误前重发请求的逻辑:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
复制代码

2.5可观察对象的命名约定

习惯的可观察对象的名字以“$”符号结尾。

stopwatchValue$: Observable<number>;
复制代码

三、Angular 中的可观察对象

Angular 使用可观察对象作为处理各种常用异步操作的接口。比如:

EventEmitter 类派生自 Observable。

HTTP 模块使用可观察对象来处理 AJAX 请求和响应。

路由器和表单模块使用可观察对象来监听对用户输入事件的响应。

3.1事件发送器 EventEmitter

Angular 提供了一个 EventEmitter 类,它用来从组件的 @Output() 属性中发布一些值。EventEmitter 扩展了 Observable,并添加了一个 emit() 方法,这样它就可以发送任意值了。当你调用 emit() 时,就会把所发送的值传给订阅上来的观察者的 next() 方法。

@Output() open = new EventEmitter<any>();
toggle() {
this.open.emit(null);
}
复制代码

3.2HTTP

Angular 的 HttpClient 从 HTTP 方法调用中返回了可观察对象。例如,http.get(‘/api’) 就会返回可观察对象。相对于基于承诺(Promise)的 HTTP API,它有一系列优点:

  • 可观察对象不会修改服务器的响应(和在承诺上串联起来的 .then() 调用一样)。反之,你可以使用一系列操作符来按需转换这些值。
  • HTTP 请求是可以通过 unsubscribe() 方法来取消的。
  • 请求可以进行配置,以获取进度事件的变化。
  • 失败的请求很容易重试。

3.3Async 管道

AsyncPipe 会订阅一个可观察对象或承诺,并返回其发出的最后一个值。当发出新值时,该管道就会把这个组件标记为需要进行变更检查的

3.4路由器 (router)

  • Router.events 以可观察对象的形式提供了其事件。 你可以使用 RxJS 中的 filter() 操作符来找到感兴趣的事件,并且订阅它们,以便根据浏览过程中产生的事件序列作出决定。 例子如下:
import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {

  navStart: Observable<NavigationStart>;

  constructor(private router: Router) {
    // Create a new Observable the publishes only the NavigationStart event
    this.navStart = router.events.pipe(
      filter(evt => evt instanceof NavigationStart)
    ) as Observable<NavigationStart>;
  }

  ngOnInit() {
    this.navStart.subscribe(evt => console.log('Navigation Started!'));
  }
}
复制代码
  • ActivatedRoute 是一个可注入的路由器服务,它使用可观察对象来获取关于路由路径和路由参数的信息。比如,ActivateRoute.url 包含一个用于汇报路由路径的可观察对象。例子如下:
import { ActivatedRoute } from '@angular/router';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable2Component implements OnInit {
  constructor(private activatedRoute: ActivatedRoute) {}

  ngOnInit() {
    this.activatedRoute.url
      .subscribe(url => console.log('The URL changed to: ' + url));
  }
}
复制代码

3.5响应式表单 (reactive forms)

响应式表单具有一些属性,它们使用可观察对象来监听表单控件的值。 FormControl 的 valueChanges 属性和 statusChanges 属性包含了会发出变更事件的可观察对象。订阅可观察的表单控件属性是在组件类中触发应用逻辑的途径之一。比如:

import { FormGroup } from '@angular/forms';

@Component({
  selector: 'my-component',
  template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
  nameChangeLog: string[] = [];
  heroForm: FormGroup;

  ngOnInit() {
    this.logNameChange();
  }
  logNameChange() {
    const nameControl = this.heroForm.get('name');
    nameControl.valueChanges.forEach(
      (value: string) => this.nameChangeLog.push(value)
    );
  }
}
复制代码

四、可观察对象与其它技术的比较

4.1可观察对象 vs. 承诺

可观察对象 承诺 Observable优势
可观察对象是声明式的,在被订阅之前,它不会开始执行。 承诺是在创建时就立即执行的。 这让可观察对象可用于定义那些应该按需执行的菜谱。
可观察对象能提供多个值。 承诺只提供一个。 这让可观察对象可用于随着时间的推移获取多个值。
可观察对象会区分串联处理和订阅语句。 承诺只有 .then() 语句。 这让可观察对象可用于创建供系统的其它部分使用而不希望立即执行的复杂菜谱。
可观察对象的 subscribe() 会负责处理错误。 承诺会把错误推送给它的子承诺。 这让可观察对象可用于进行集中式、可预测的错误处理。

4.2创建与订阅

  • 在有消费者订阅之前,可观察对象不会执行。subscribe() 会执行一次定义好的行为,并且可以再次调用它。每次订阅都是单独计算的。重新订阅会导致重新计算这些值。
content_copy
// declare a publishing operation
new Observable((observer) => { subscriber_fn });
// initiate execution
observable.subscribe(() => {
      // observer handles notifications
    });
复制代码
  • 承诺会立即执行,并且只执行一次。当承诺创建时,会立即计算出结果。没有办法重新做一次。所有的 then 语句(订阅)都会共享同一次计算。
content_copy
// initiate execution
new Promise((resolve, reject) => { executer_fn });
// handle return value
promise.then((value) => {
      // handle result here
    });
复制代码

4.3串联

  • 可观察对象会区分各种转换函数,比如映射和订阅。只有订阅才会激活订阅者函数,以开始计算那些值。
content_copy
observable.map((v) => 2*v);
复制代码
  • 承诺并不区分最后的 .then() 语句(等价于订阅)和中间的 .then() 语句(等价于映射)。
content_copy
promise.then((v) => 2*v);
复制代码

4.4可取消

  • 可观察对象的订阅是可取消的。取消订阅会移除监听器,使其不再接受将来的值,并通知订阅者函数取消正在进行的工作。
content_copy
const sub = obs.subscribe(...);
sub.unsubscribe();
复制代码
  • 承诺是不可取消的。

4.5错误处理

  • 可观察对象的错误处理是交给订阅者的错误处理器的,并且该订阅者会自动取消对这个可观察对象的订阅。
content_copy
obs.subscribe(() => {
  throw Error('my error');
});
复制代码
  • 承诺会把错误推给其子承诺。
content_copy
promise.then(() => {
      throw Error('my error');
});
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ajax for Web Application Developers

Ajax for Web Application Developers

Kris Hadlock / Sams / 2006-10-30 / GBP 32.99

Book Description Reusable components and patterns for Ajax-driven applications Ajax is one of the latest and greatest ways to improve users’ online experience and create new and innovative web f......一起来看看 《Ajax for Web Application Developers》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试