rxJS

概述

Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
Schedulers (调度器): 用来控制并发并且是*集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

Observable(可观察对象)

  • 定义

    Observables 是多个值的惰性推送集合

  • 创建 Observables

    Rx.Observable.create 是 Observable 构造函数的别名,它接收一个参数:subscribe 函数。

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});
  • 订阅 Observables

    订阅 Observable 像是调用函数, 并提供接收数据的回调函数。

observable.subscribe(x => console.log(x));

注:
   observable.subscribe 
   Observable.create(function subscribe(observer) {...}) 
   中的 subscribe 有着同样的名字,这并不是一个巧合。
   这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的。
  • 执行 Observables
Observable.create(function subscribe(observer) {...}) 中...的代码表示 “Observable 执行”
它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:

"Next"     通知: 发送一个值,比如数字、字符串、对象,等等。 -- 最重要
"Error"    通知: 发送一个 JavaScript 错误 或 异常。
"Complete" 通知: 不再发送任何值。
  • 清理 Observable 执行

    当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

1. 
var observable = Rx.Observable.from([10, 20, 30]);
var subscription1 = observable.subscribe(x => console.log(x));

// 清理
subscription1.unsubscribe();

2.
var observable = Rx.Observable.create(function subscribe(observer) {
  // 追踪 interval 资源
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  // 提供取消和清理 interval 资源的方法
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});
var subscription2 = observable.subscribe(x => console.log(x));

// 清理
subscription2.unsubscribe();

Observer (观察者)

  • 定义

    观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete

1.
var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

2. 
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。

三种类型的回调函数都可以直接作为参数来提供:
observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

Subscription (订阅)

  • 定义

    Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe

Subject (主体)

  • 定义

    1. Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者

    2. Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

  1. 每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法。
var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
  1. 每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。
var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
  • 多播的 Observables

    多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
 
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
 
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
// This is, under the hood, `source.subscribe(subject)`:
// 决定了何时启动共享的 Observable 执行
multicasted.connect(); 
  • BehaviorSubject

    “当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(3);
 
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
  • ReplaySubject

    ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

import { ReplaySubject } from 'rxjs';

// 为新的订阅者缓冲3个值
// (还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录:new Rx.ReplaySubject(100, 500 /* windowTime */);)
const subject = new ReplaySubject(3);
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(5);
 
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
  • AsyncSubject

    AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
 
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
 
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
 
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});
 
subject.next(5);
subject.complete();
 
// Logs:
// observerA: 5
// observerB: 5

Operators

帮助学习的网站:
https://rxmarbles.com/
https://reactive.how/rxjs/explorer
  • 定义

    操作符是函数,它基于当前的 Observable 创建一个新的 Observable(接收一个 Observable 作为输入、并生成一个新的 Observable 作为输出)。这是一个无副作用的操作:前面的 Observable 保持不变

    实例操作符 vs. 静态操作符

    1. 实例操作符:Observable 实例上的方法
    2. 静态操作符:静态操作符是附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。它们只接收非 Observable 参数,比如数字,然后创建一个新的 Observable ,而不是将一个输入 Observable 转换为输出 Observable 。
  • 工作方式
    rxJS

  • 操作符分类
    TODO

调度器

  • 定义

    调度器控制着何时启动 subscription 和何时发送通知、用来控制并发并且是*集权的调度员,允许我们在发生计算时进行协调。(调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者)

    1. 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
    2. 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
    3. 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
  • 调度器类型

调度器 目的
null 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
Rx.Scheduler.queue 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
Rx.Scheduler.asap 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。
Rx.Scheduler.async 使用 setInterval 的调度。用于基于时间的操作符。
上一篇:飞机飞行仪表行业研究及十四五规划分析报告


下一篇:2021-2027中国L-茶氨酸市场现状研究分析与发展前景预测报告