在介绍 Observable 之前,我们要先了解两个设计模式:
Observer Pattern - (观察者模式)
Iterator Pattern - (迭代器模式)
这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。
Observer Pattern
观察者模式定义
观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。 — *
观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。
我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:
期刊出版方 - 负责期刊的出版和发行工作
订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知
在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。
观察者模式优缺点
观察者模式的优点:
支持简单的广播通信,自动通知所有已经订阅过的对象
目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用
观察者模式的缺点:
如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃
观察者模式的应用
在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:
<button id="btn">确认</button>
function clickHandler(event) {
console.log('用户已点击确认按钮!');
}
document.getElementById("btn").addEventListener('click', clickHandler);
上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler
函数。
观察者模式实战
Subject 类定义:
class Subject {
constructor() {
this.observerCollection = [];
}
registerObserver(observer) {
this.observerCollection.push(observer);
}
unregisterObserver(observer) {
let index = this.observerCollection.indexOf(observer);
if(index >= 0) this.observerCollection.splice(index, 1);
}
notifyObservers() {
this.observerCollection.forEach((observer)=>observer.notify());
}
}
Observer 类定义:
class Observer {
constructor(name) {
this.name = name;
}
notify() {
console.log(`${this.name} has been notified.`);
}
}
使用示例:
let subject = new Subject(); // 创建主题对象
let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'
subject.registerObserver(observer1); // 注册观察者A
subject.registerObserver(observer2); // 注册观察者B
subject.notifyObservers(); // 通知观察者
subject.unregisterObserver(observer1); // 移除观察者A
subject.notifyObservers(); // 验证是否成功移除
以上代码成功运行后控制台的输出结果:
semlinker has been notified. # 输出一次
2(unknown) lolo has been notified. # 输出两次
需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)
Iterator Pattern
迭代器模式定义
迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。
迭代器模式的优缺点
迭代器模式的优点:
简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。
迭代器模式的缺点:
遍历过程是一个单向且不可逆的遍历
ECMAScript 迭代器
在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,就是 Iterator 在没有元素之后,执行
next
会直接抛出错误;但后来经过一段时间讨论后,决定采更 functional 的做法,改成在取得最后一个元素之后执行next
永远都回传{ done: true, value: undefined }
一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 done
和 value
两个属性的对象。对象的取值如下:
在最后一个元素前:
{ done: false, value: elementValue }
在最后一个元素后:
{ done: true, value: undefined }
详细信息可以参考 - 可迭代协议和迭代器协议
ES 5 迭代器
接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:
function makeIterator(array){
var nextIndex = 0;
return {
next: function(){
return nextIndex < array.length ?
{value: array[nextIndex++], done: false} :
{done: true};
}
}
}
一旦初始化, next() 方法可以用来依次访问可迭代对象中的元素:
var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done); // true
ES 6 迭代器
在 ES 6 中我们可以通过 Symbol.iterator
来创建可迭代对象的内部迭代器,具体示例如下:
let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();
调用 next()
方法来获取数组中的元素:
> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }
ES 6 中可迭代的对象:
Arrays
Strings
Maps
Sets
DOM data structures (work in progress)
Observable
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。
Proposal Observable
-
Proposal Observable Implementations
自定义 Observable
如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer
作为参数然后返回另一个函数。
它的基本特征:
是一个函数
接受一个
Observer
对象 (包含 next、error、complete 方法的对象) 作为参数返回一个
unsubscribe
函数,用于取消订阅
它的作用:
作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:
DataSource - 数据源
class DataSource {
constructor() {
let i = 0;
this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
}
emit(n) {
const limit = 10; // 设置数据上限值
if (this.ondata) {
this.ondata(n);
}
if (n === limit) {
if (this.oncomplete) {
this.oncomplete();
}
this.destroy();
}
}
destroy() { // 清除定时器
clearInterval(this._id);
}
}
myObservable
function myObservable(observer) {
let datasource = new DataSource(); // 创建数据源
datasource.ondata = (e) => observer.next(e); // 处理数据流
datasource.onerror = (err) => observer.error(err); // 处理异常
datasource.oncomplete = () => observer.complete(); // 处理数据流终止
return () => { // 返回一个函数用于,销毁数据源
datasource.destroy();
};
}
使用示例:
const unsub = myObservable({
next(x) { console.log(x); },
error(err) { console.error(err); },
complete() { console.log('done')}
});
/**
* 移除注释,可以测试取消订阅
*/
// setTimeout(unsub, 500);
具体运行结果,可以查看线上示例。
SafeObserver - 更好的 Observer
上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:
传入的
Observer
对象可以不实现所有规定的方法 (next、error、complete 方法)在
complete
或者error
触发之后再调用next
方法是没用的调用
unsubscribe
方法后,任何方法都不能再被调用了complete
和error
触发后,unsubscribe
也会自动调用当
next
、complete
和error
出现异常时,unsubscribe
也会自动调用以保证资源不会浪费next
、complete
和error
是可选的。按需处理即可,不必全部处理
为了完成上述目标,我们得把传入的匿名 Observer
对象封装在一个 SafeObserver
里以提供上述保障。SafeObserver 的具体实现如下:
class SafeObserver {
constructor(destination) {
this.destination = destination;
}
next(value) {
// 尚未取消订阅,且包含next方法
if (!this.isUnsubscribed && this.destination.next) {
try {
this.destination.next(value);
} catch (err) {
// 出现异常时,取消订阅释放资源,再抛出异常
this.unsubscribe();
throw err;
}
}
}
error(err) {
// 尚未取消订阅,且包含error方法
if (!this.isUnsubscribed && this.destination.error) {
try {
this.destination.error(err);
} catch (e2) {
// 出现异常时,取消订阅释放资源,再抛出异常
this.unsubscribe();
throw e2;
}
this.unsubscribe();
}
}
complete() {
// 尚未取消订阅,且包含complete方法
if (!this.isUnsubscribed && this.destination.complete) {
try {
this.destination.complete();
} catch (err) {
// 出现异常时,取消订阅释放资源,再抛出异常
this.unsubscribe();
throw err;
}
this.unsubscribe();
}
}
unsubscribe() { // 用于取消订阅
this.isUnsubscribed = true;
if (this.unsub) {
this.unsub();
}
}
}
myObservable - 使用 SafeObserver
function myObservable(observer) {
const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象
const datasource = new DataSource(); // 创建数据源
datasource.ondata = (e) => safeObserver.next(e);
datasource.onerror = (err) => safeObserver.error(err);
datasource.oncomplete = () => safeObserver.complete();
safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法
datasource.destroy();
};
// 绑定this上下文,并返回unsubscribe方法
return safeObserver.unsubscribe.bind(safeObserver);
}
使用示例:
const unsub = myObservable({
next(x) { console.log(x); },
error(err) { console.error(err); },
complete() { console.log('done')}
});
具体运行结果,可以查看线上示例。
Operators - 也是函数
Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符:
Observable 实现:
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const safeObserver = new SafeObserver(observer);
safeObserver.unsub = this._subscribe(safeObserver);
return safeObserver.unsubscribe.bind(safeObserver);
}
}
map 操作符实现:
function map(source, project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}
具体运行结果,可以查看线上示例。
改进 Observable - 支持 Operator 链式调用
如果把 Operator
都写成如上那种独立的函数,我们链式代码会逐渐变丑:
map(map(myObservable, (x) => x + 1), (x) => x + 2);
对于上面的代码,想象一下有 5、6 个嵌套着的 Operator
,再加上更多、更复杂的参数,基本上就没法儿看了。
你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的Operator
并生成一个最终的Observable
,不过这意味着要写更复杂的 Operator
,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:
pipe(myObservable, map(x => x + 1), map(x => x + 2));
理想情况下,我们想将代码用更自然的方式链起来:
myObservable.map(x => x + 1).map(x => x + 2);
幸运的是,我们已经有了这样一个 Observable
类,我们可以基于 prototype 在不增加复杂度的情况下支持多 Operators
的链式结构,下面我们采用prototype方式再次实现一下 Observable
:
Observable.prototype.map = function (project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return this.subscribe(mapObserver);
});
};
现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 Observable
类,然后在子类中重写某些内容以优化程序。
接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。
Rx.Observable.create
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
observer.next('Lolo');
});
// 订阅这个 Observable
observable.subscribe(function(value) {
console.log(value);
});
以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。
需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
observer.next('Lolo');
});
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
以上代码运行后,控制台的输出结果:
start
Semlinker
Lolo
end
当然我们也可以用它处理异步行为:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
observer.next('Lolo');
setTimeout(() => {
observer.next('RxJS Observable');
}, 300);
})
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
以上代码运行后,控制台的输出结果:
start
Semlinker
Lolo
end
RxJS Observable
从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。
Observable - Creation Operator
RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:
create
of
from
fromEvent
fromPromise
empty
never
throw
interval
timer
上面的例子中,我们已经使用过了 create 操作符,接下来我们来看一下其它的操作符:
of
var source = Rx.Observable.of('Semlinker', 'Lolo');
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
以上代码运行后,控制台的输出结果:
Semlinker
Lolo
complete!
from
var arr = [1, 2, 3];
var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路"
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
以上代码运行后,控制台的输出结果:
1
2
3
complete!
fromEvent
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
fromPromise
var source = Rx.Observable
.fromPromise(new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Hello RxJS!');
},3000)
}));
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
以上代码运行后,控制台的输出结果:
Hello RxJS!
complete!
empty
var source = Rx.Observable.empty();
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
以上代码运行后,控制台的输出结果:
complete!
empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 complete 信息。
never
var source = Rx.Observable.never();
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象。
throw
var source = Rx.Observable.throw('Oop!');
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
以上代码运行后,控制台的输出结果:
Throw Error: Oop!
throw 操作如,只做一件事就是抛出异常。
interval
var source = Rx.Observable.interval(1000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
以上代码运行后,控制台的输出结果:
0
1
2
...
interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。
timer
var source = Rx.Observable.timer(1000, 5000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
以上代码运行后,控制台的输出结果:
0 # 1s后
1 # 5s后
2 # 5s后
...
timer 操作符支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 操作符也可以只传递一个参数,具体如下:
var source = Rx.Observable.timer(1000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
以上代码运行后,控制台的输出结果:
0
complete!
Subscription
有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription
对象的 unsubscribe
方法来释放资源。具体示例如下:
var source = Rx.Observable.timer(1000, 1000);
// 取得subscription对象
var subscription = source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
setTimeout(() => {
subscription.unsubscribe();
}, 5000);
RxJS - Observer
Observer (观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。
interface Observer<T> {
closed?: boolean; // 标识是否已经取消对Observable对象的订阅
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
Observer 中的三个方法的作用:
next - 每当 Observable 发送新值的时候,next 方法会被调用
error - 当 Observable 内发生错误时,error 方法就会被调用
complete - 当 Observable 数据终止后,complete 方法会被调用。在调用 complete 方法之后,next 方法就不会再次被调用
接下来我们来看个具体示例:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker');
observer.next('Lolo');
observer.complete();
observer.next('not work');
});
// 创建一个观察者
var observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error);
},
complete: function() {
console.log('complete');
}
}
// 订阅已创建的observable对象
observable.subscribe(observer);
以上代码运行后,控制台的输出结果:
Semlinker
Lolo
complete
上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work
。
另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:
var observer = {
next: function(value) {
console.log(value);
}
};
有时候 Observable 可能是一个无限的序列,例如 click 事件,对于这种场景,complete 方法就永远不会被调用。
我们也可以在调用 Observable 对象的 subscribe
方法时,依次传入 next、error、complete 三个函数,来创建观察者:
observable.subscribe(
value => { console.log(value); },
error => { console.log('Error: ', error); },
() => { console.log('complete'); }
);
Pull vs Push
Pull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。
什么是Pull?
在 "拉" 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。
每一个 JavaScript 函数都是一个 "拉" 体系,函数是数据的生产者,调用函数的代码通过 ''拉出" 一个单一的返回值来消费该数据。
const add = (a, b) => a + b;
let sum = add(3, 4);
ES6介绍了 iterator迭代器 和 Generator生成器 — 另一种 "拉" 体系,调用 iterator.next()
的代码是消费者,可从中拉取多个值。
什么是Push?
在 "推" 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。
Promise(承诺) 是当今 JS 中最常见的 "推" 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。
RxJS 引入了 Observables (可观察对象),一个全新的 "推" 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。
生产者 | 消费者 | |
---|---|---|
pull拉 | 被请求的时候产生数据 | 决定何时请求数据 |
push推 | 按自己的节奏生产数据 | 对接收的数据进行处理 |
接下来我们来看张图,从而加深对上面概念的理解:
Observable vs Promise
Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。
MagicQ | 单值 | 多值 |
---|---|---|
拉取(Pull) | 函数 | 遍历器 |
推送(Push) | Promise | Observable |
-
Promise
返回单个值
不可取消的
-
Observable
随着时间的推移发出多个值
可以取消的
支持 map、filter、reduce 等操作符
延迟执行,当订阅的时候才会开始执行
延迟计算 & 渐进式取值
延迟计算
所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。
var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.map(x => x + 1);
上面的示例中,因为 example 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下:
var source = [1,2,3,4,5];
var example = source.map(x => x + 1);
以上代码运行后,example 中就包含已运算后的值。
渐进式取值
数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下:
var source = [1,2,3,4,5];
var example = source
.filter(x => x % 2 === 0) // [2, 4]
.map(x => x + 1) // [3, 5]
关于数组中的 map、filter 的详细信息,可以参考 - RxJS Functional Programming
为了更好地理解数组操作符的运算过程,我们可以参考下图:
虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下:
var source = Rx.Observable.from([1,2,3,4,5]);
var example = source
.filter(x => x % 2 === 0)
.map(x => x + 1)
example.subscribe(console.log);
以上代码的执行过程如下:
source 发出 1,执行 filter 过滤操作,返回 false,该值被过滤掉
source 发出 2,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 3,最后通过 console.log 输出
source 发出 3,执行 filter 过滤操作,返回 false,该值被过滤掉
source 发出 4,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 5,最后通过 console.log 输出
source 发出 5,执行 filter 过滤操作,返回 false,该值被过滤掉
为了更好地理解 Observable 操作符的运算过程,我们可以参考下图: