RxJS中的多播

目标

  1. 什么是多播
  2. Hot和Cold 数据流差异
  3. 理解RxJS中的Subject
  4. 支持多播的操作符
  5. 了解高级多播功能

1. 什么是多播

在RxJS中,Observable和Observer的关系,就是前者在播放内容,后者
在收听内容。对于多播的可以理解为:一个数据流的内容被多个Observer订阅。

RxJS中的多播

RxJS是⽀持⼀个Observable被多次subscribe的,
所以,RxJS⽀持多播,但是,表⾯上看到的是多播,实质上还是单播。

// 多播
const tick$ = Observable.interval(1000).pipe(take(3));
tick$.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
tick$.subscribe(value => console.log('observer 2: ' + value));
}, 2000);

运行结果:

observer 1: 0
observer 1: 1
observer 2: 0
observer 1: 2
observer 2: 1
observer 2: 2

??? 为啥是这个结果 因为 interval 这个操作符产⽣的是⼀个Cold Observable对象

2. Hot和Cold数据流差异

  1. 所谓Cold Observable,就是每次被subscribe都产⽣⼀个全新的数据序列的数据流。比如 interval, range

  2. Hot Observable的数据流都在外部, 或者来自 promise、DOM、 EventEmitter。

真正的多播,必定是⽆论有多少Observer来subscribe,推给Observer的
都是⼀样的数据源,满⾜这种条件的,就是Hot Observable,因为Hot
Observable中的内容创建和订阅者⽆关。

RxJS中的多播

3. subject

在函数式编程的世界中,有一个要求,是保持不可变性。(Immutable)。所以, 把Cold Observable变成Hot Observable,不是改变Cold Observable本身, 而是产生一个新的Observable来包装之前的Cold Observable。
这样,新的 Observable就成了下游,想要hot数据的observer要订阅这个新的Observable,就OK了。

那么这个产生一个新的Observable来包装之前的Cold Observable这个功能就有subject这个中间人来实现,所以subject具有以下这些功能:

  1. 提供subscribe方法,让其他人能够订阅自己的数据流。 (Observable)
  2. 接受推送的数据,包括 cold Observable 推送的数据 (observer)
 import {Subject} from 'rxjs';

 const subject = new Subject();

 subject.subscribe({
     next:(v)=>console.log(`A:${v}`)
 })

 subject.subscribe({
    next:(v)=>console.log(`B:${v}`)
})

subject.next(1);
subject.next(2);
subject.complete();

用subject实现多播和使用

const tick$ = Observable.interval(1000).take(3);
const subject = new Subject();
tick$.subscribe(subject);

subject.subscribe(value => console.log('observer 1: ' + value));

setTimeout(() => {
subject.subscribe(value => console.log('observer 2: ' + value));
}, 1500);

makeHot操作符

Observable.prototype.makeHot = function () {
const cold$ = this;
const subject = new Subject();
cold$.subscribe(subject);
return Observable.create((observer) => subject.subscribe(observer));
}


const hotTick$ = Observable.interval(1000).take(3).makeHot();
hotTick$.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
hotTick$.subscribe(value => console.log('observer 2: ' + value));
}, 1500);

其他使用 Subject注意点:

  1. Subject不能重复使⽤
  2. Subject可以有多个上游
  3. Subject的错误处理

4.支持多播的操作符

  1. multicast : 实例操作符,能够以上游的Observable为数据源产⽣⼀个新的Hot Observable对象。
// multicast操作符:多播的实现。 需要开启 multicasted.connect();
const source = from ([1,2,3]);
const subject1 = new Subject();

const multicasted = source.pipe(multicast(subject1));

multicasted.subscribe({
    next:(v)=>console.log(`A:${v}`)
})
multicasted.subscribe({
    next:(v)=>console.log(`B:${v}`)
})

multicasted.connect();
  1. share
function shareSubjectFactory() {
    return new Subject();
}
function share() {
    return multicast.call(this, shareSubjectFactory).refCount();
}
  1. publish
function publish(selector) {
    if (selector) {
        return this.multicast(() => new Subject(), selector);
    } else {
        return this.multicast(new Subject());
    }
// publish相当于封装了multicast和创建⼀个新Subject对象这两个动作,
// 让代码更加简洁,最终返回的是⼀个ConnectableObservable对象
multicast(new Subject())

5.⾼级多播功能

  1. publishLast
  2. publishReplay
  3. publishBehavior
上一篇:Yii2与layuiadmin整合2


下一篇:2022-2028全球袋膜行业调研及趋势分析报告