概述
Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
Schedulers (调度器): 用来控制并发并且是*集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。
Observable(可观察对象)
-
定义
Observables 是多个值的惰性推送集合
-
创建 Observables
Rx.Observable.create 是 Observable 构造函数的别名,它接收一个参数:subscribe 函数。
var observable = Rx.Observable.create(function subscribe(observer) {
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
-
订阅 Observables
订阅 Observable 像是调用函数, 并提供接收数据的回调函数。
observable.subscribe(x => console.log(x));
注:
observable.subscribe
Observable.create(function subscribe(observer) {...})
中的 subscribe 有着同样的名字,这并不是一个巧合。
这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的。
- 执行 Observables
Observable.create(function subscribe(observer) {...}) 中...的代码表示 “Observable 执行”
它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:
"Next" 通知: 发送一个值,比如数字、字符串、对象,等等。 -- 最重要
"Error" 通知: 发送一个 JavaScript 错误 或 异常。
"Complete" 通知: 不再发送任何值。
-
清理 Observable 执行
当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。
1.
var observable = Rx.Observable.from([10, 20, 30]);
var subscription1 = observable.subscribe(x => console.log(x));
// 清理
subscription1.unsubscribe();
2.
var observable = Rx.Observable.create(function subscribe(observer) {
// 追踪 interval 资源
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
var subscription2 = observable.subscribe(x => console.log(x));
// 清理
subscription2.unsubscribe();
Observer (观察者)
-
定义
观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete
1.
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);
2.
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。
三种类型的回调函数都可以直接作为参数来提供:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
Subscription (订阅)
-
定义
Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe
Subject (主体)
-
定义
-
Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者
-
Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
-
- 每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法。
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
- 每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅
执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
-
多播的 Observables
多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// This is, under the hood, `source.subscribe(subject)`:
// 决定了何时启动共享的 Observable 执行
multicasted.connect();
-
BehaviorSubject
“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
-
ReplaySubject
ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。
import { ReplaySubject } from 'rxjs';
// 为新的订阅者缓冲3个值
// (还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录:new Rx.ReplaySubject(100, 500 /* windowTime */);)
const subject = new ReplaySubject(3);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
-
AsyncSubject
AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
Operators
帮助学习的网站:
https://rxmarbles.com/
https://reactive.how/rxjs/explorer
-
定义
操作符是函数,它基于当前的 Observable 创建一个新的 Observable(接收一个 Observable 作为输入、并生成一个新的 Observable 作为输出)。这是一个无副作用的操作:前面的 Observable 保持不变
实例操作符 vs. 静态操作符
- 实例操作符:Observable 实例上的方法
- 静态操作符:静态操作符是附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。它们只接收非 Observable 参数,比如数字,然后创建一个新的 Observable ,而不是将一个输入 Observable 转换为输出 Observable 。
-
工作方式
-
操作符分类
TODO
调度器
-
定义
调度器控制着何时启动 subscription 和何时发送通知、用来控制并发并且是*集权的调度员,允许我们在发生计算时进行协调。(调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者)
- 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
- 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
- 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
-
调度器类型
调度器 | 目的 |
---|---|
null | 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。 |
Rx.Scheduler.queue | 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。 |
Rx.Scheduler.asap | 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。 |
Rx.Scheduler.async | 使用 setInterval 的调度。用于基于时间的操作符。 |