目标
- 什么是多播
- Hot和Cold 数据流差异
- 理解RxJS中的Subject
- 支持多播的操作符
- 了解高级多播功能
1. 什么是多播
在RxJS中,Observable和Observer的关系,就是前者在播放内容,后者
在收听内容。对于多播的可以理解为:一个数据流的内容被多个Observer订阅。
RxJS是⽀持⼀个Observable被多次subscribe的,
所以,RxJS⽀持多播,但是,表⾯上看到的是多播,实质上还是单播。
// 多播
const tick$ = Observable.interval(1000).pipe(take(3));
tick$.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
tick$.subscribe(value => console.log('observer 2: ' + value));
}, 2000);
运行结果:
observer 1: 0
observer 1: 1
observer 2: 0
observer 1: 2
observer 2: 1
observer 2: 2
??? 为啥是这个结果 因为 interval 这个操作符产⽣的是⼀个Cold Observable对象
2. Hot和Cold数据流差异
-
所谓Cold Observable,就是每次被subscribe都产⽣⼀个全新的数据序列的数据流。比如 interval, range
-
Hot Observable的数据流都在外部, 或者来自 promise、DOM、 EventEmitter。
真正的多播,必定是⽆论有多少Observer来subscribe,推给Observer的
都是⼀样的数据源,满⾜这种条件的,就是Hot Observable,因为Hot
Observable中的内容创建和订阅者⽆关。
3. subject
在函数式编程的世界中,有一个要求,是保持不可变性。(Immutable)。所以, 把Cold Observable变成Hot Observable,不是改变Cold Observable本身, 而是产生一个新的Observable来包装之前的Cold Observable。
这样,新的 Observable就成了下游,想要hot数据的observer要订阅这个新的Observable,就OK了。
那么这个产生一个新的Observable来包装之前的Cold Observable这个功能就有subject这个中间人来实现,所以subject具有以下这些功能:
- 提供subscribe方法,让其他人能够订阅自己的数据流。 (Observable)
- 接受推送的数据,包括 cold Observable 推送的数据 (observer)
import {Subject} from 'rxjs';
const subject = new Subject();
subject.subscribe({
next:(v)=>console.log(`A:${v}`)
})
subject.subscribe({
next:(v)=>console.log(`B:${v}`)
})
subject.next(1);
subject.next(2);
subject.complete();
用subject实现多播和使用
const tick$ = Observable.interval(1000).take(3);
const subject = new Subject();
tick$.subscribe(subject);
subject.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
subject.subscribe(value => console.log('observer 2: ' + value));
}, 1500);
makeHot操作符
Observable.prototype.makeHot = function () {
const cold$ = this;
const subject = new Subject();
cold$.subscribe(subject);
return Observable.create((observer) => subject.subscribe(observer));
}
const hotTick$ = Observable.interval(1000).take(3).makeHot();
hotTick$.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
hotTick$.subscribe(value => console.log('observer 2: ' + value));
}, 1500);
其他使用 Subject注意点:
- Subject不能重复使⽤
- Subject可以有多个上游
- Subject的错误处理
4.支持多播的操作符
- multicast : 实例操作符,能够以上游的Observable为数据源产⽣⼀个新的Hot Observable对象。
// multicast操作符:多播的实现。 需要开启 multicasted.connect();
const source = from ([1,2,3]);
const subject1 = new Subject();
const multicasted = source.pipe(multicast(subject1));
multicasted.subscribe({
next:(v)=>console.log(`A:${v}`)
})
multicasted.subscribe({
next:(v)=>console.log(`B:${v}`)
})
multicasted.connect();
- share
function shareSubjectFactory() {
return new Subject();
}
function share() {
return multicast.call(this, shareSubjectFactory).refCount();
}
- publish
function publish(selector) {
if (selector) {
return this.multicast(() => new Subject(), selector);
} else {
return this.multicast(new Subject());
}
// publish相当于封装了multicast和创建⼀个新Subject对象这两个动作,
// 让代码更加简洁,最终返回的是⼀个ConnectableObservable对象
multicast(new Subject())
5.⾼级多播功能
- publishLast
- publishReplay
- publishBehavior