javascript – 使用运算符扩展RxJS Observable类

如何通过应用内置的RxJS运算符来扩展Observable类?

我想做这样的事情:

class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);

    return this.filter(x => x);
  }
}

class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);

    return this.map(x => `'${x}'`);
  }
}

没有构造函数返回可以完成吗?

解决方法:

这很大程度上取决于你想要做什么,但是假设你想制作一个TruthyObservable,其行为与默认的Observable.create(…)非常相似,但只传递偶数:

import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';

class TruthyObservable<T> extends Observable<T> {

    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }

        super(subscribe);
    }

    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();

        subject
            .filter((val: number) => val % 2 == 0)
            .subscribe(obs);

        return new Subscriber(subject);
    }

}

let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    obs.next(6);
    obs.next(7);
    obs.next(8);
});

o.subscribe(val => console.log(val));

这打印到控制台:

6
8

观看现场演示:https://jsbin.com/recuto/3/edit?js,console

通常继承Observable的类会覆盖实际在内部进行订阅的_subscribe()方法,但在我们的情况下,我们希望使用回调,我们可以自己发出值(因为这个Observable本身不会发出任何内容).方法_subscribe()被_subscribe属性所掩盖,如果它存在,那么如果我们只是覆盖这个方法,我们就无法将任何运算符附加到它.这就是为什么我在构造函数中用另一个函数包装_subscribe然后通过appendOperators()方法中使用filter()链接的Subject传递所有值.请注意,我用obs = this.appendOperators(obs)替换了原始Observer和Subject.

最后,当我打电话给例如. obs.next(3);我实际上是将值推送到主题,过滤它们并将它们传递给原始观察者.

上一篇:javascript – 在RxJS中链接多个flatMap运算符时,如何摆脱Bluebird警告?


下一篇:javascript – 在RxJs中使用TestScheduler测试主题