RxJS Subject

栏目: JavaScript · 发布时间: 6年前

内容简介:观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:在观察者模式中也有两个主要角色:Subject(主题)和 Observer (观察者),它们分别对应例子中的期刊出版方和订阅者。

观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

  • 期刊出版方 —— 负责期刊的出版和发行工作。
  • 订阅者 —— 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知。

在观察者模式中也有两个主要角色:Subject(主题)和 Observer (观察者),它们分别对应例子中的期刊出版方和订阅者。

订阅 Observable

在介绍 RxJS Subject 之前,我们先来看个示例:

import { interval } from "rxjs";
import { take } from "rxjs/operators";

const interval$ = interval(1000).pipe(take(3));

interval$.subscribe(value => console.log("Observer A get value: " + value));

setTimeout(() => {
  interval$.subscribe(value => console.log("Observer B get value: " + value));
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  • Observable 对象可以被重复订阅。
  • Observable 对象每次被订阅后,都会重新执行。

上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

function interval() {
  setInterval(() => console.log('..'), 1000);
}

interval();

setTimeout(() => {
  interval();
}, 1000);

Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播。

上述的需求要如何实现呢?我们已经知道了观察者模式定义了一对多的关系,我们可以让多个观察者对象同时监听同一个主题,这里就是我们的时间序列流。当数据源发出新值的时,所有的观察者就能接收到新的值。

自定义 Subject

  1. Subject 类定义
class Subject {
  observers = [];
  addObserver(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(o => o.next(value));
  }

  error(error) {
    this.observers.forEach(o => o.error(error));
  }

  complete() {
    this.observers.forEach(o => o.complete());
  }
}
  1. 使用示例
const interval$ = interval(1000).pipe(take(3));
const subject = new Subject();

let observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

var observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.addObserver(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
  subject.addObserver(observerB); // 添加观察者B
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

RxJS Subject

其实 RxJS 也为我们提供了 Subject 类,接下我们来利用 RxJS 的 Suject 重写一下上面的示例:

import { interval, Subject } from "rxjs";
import { take } from "rxjs/operators";

const interval$ = interval(1000).pipe(take(3));
const subject = new Subject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
  subject.subscribe(observerB); // 添加观察者B
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

根据上述的示例代码和控制台的输出结果,我们来总结一下 Subject 的特点:

  • Subject 既是 Observable 对象,又是 Observer 对象。
  • 当有新消息时,Subject 会通知内部的所有观察者。

RxJS Subject & Observable

Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next —— 每当 Subject 对象接收到新值的时候,next 方法会被调用。
  • error —— 运行中出现异常,error 方法会被调用。
  • complete —— Subject 订阅的 Observable 对象结束后,complete 方法会被调用。
  • subscribe —— 添加观察者。
  • unsubscribe —— 取消订阅(设置终止标识符、清空观察者列表)。

除了 Subject 之外,RxJS 还为我们提供了 Subject 的几种变体,如 BehaviorSubject、ReplaySubject 和 AsyncSubject。下面我们来分别介绍一下它们。

BehaviorSubject

有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。

为了说明上述的情景,我们先来分析一下以下示例:

import { Subject } from "rxjs";

const subject = new Subject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3

通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。

下面我们来使用 BehaviorSubject 重写上面的示例:

import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject(0);

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

通过以上示例,我们知道 BehaviorSubject 会记住最近一次发送的值,当新的观察者进行订阅时,就会接收到最新的值。然后有些时候,我们新增的订阅者,可以接收到数据源最近发送的几个值,针对这种场景,我们就需要使用 ReplaySubject。

ReplaySubject

import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(2);

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3

可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。 在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放

AsyncSubject

AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

subject.complete();

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

最后我们来介绍一下在 Angular 项目中,RxJS Subject 的应用。

Angular RxJS Subject 应用

在 Angular 中,我们可以利用 RxJS Subject 来实现组件间通信,具体示例如下:

  1. message.service.ts
import { Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class MessageService {
  private subject = new Subject<any>();

  sendMessage(message: string) {
    this.subject.next({ text: message });
  }

  clearMessage() {
    this.subject.next();
  }

  getMessage(): Observable<any> {
    return this.subject.asObservable();
  }
}
  1. home.component.ts
import { Component } from '@angular/core';

import { MessageService } from '../message.service';

@Component({
  selector: 'app-home',
  template: `
    <div>
      <h1>Home</h1>
      <button (click)="sendMessage()">Send Message</button>
      <button (click)="clearMessage()">Clear Message</button>
    </div>
  
  `
})

export class HomeComponent {
  constructor(private messageService: MessageService) { }

  sendMessage(): void { // 发送消息
    this.messageService.sendMessage('Message from Home Component to App Component!');
  }

  clearMessage(): void { // 清除消息
    this.messageService.clearMessage();
  }
}
  1. app.component.ts
import { Component, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

import { MessageService } from './message.service';

@Component({
  selector: 'my-app',
  template: `
      <div *ngIf="message">{{message.text}}</div>
      <app-home></app-home>
    `
})

export class AppComponent implements OnDestroy {
  message: any;
  subscription: Subscription;

  constructor(private messageService: MessageService) {
    this.subscription = this.messageService.getMessage().subscribe(message => {
      this.message = message;
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

感兴趣的同学可以查看 Stackblitz 完整示例。

参考资源


以上所述就是小编给大家介绍的《RxJS Subject》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

并发的艺术

并发的艺术

Clay Breshears / 聂雪军 / 机械工业出版社 / 2010年9月 / 49.00元

如果你希望通过并发编程来充分发挥多核处理器的强大功能,那么本书将为你提供所需的理论知识和实际经验。《并发的艺术》是为数不多的几本介绍如何在多核处理器的共享内存模型中实现算法的书籍之一,它并非仅仅介绍一些理论模型或者分布式内存架构。本书详细分析了各种示例程序,这些内容非常有助于你将串行代码转换为并行代码,此外还介绍了如何避免一些常见的错误。 本书的作者是Intel公司的一位资深工程师,他从事并......一起来看看 《并发的艺术》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具