我有一个Rxjs可观察对象(下面代码中的流),它发出可观察对象(subjOne和subjTwo).每个内部可观测对象都可以在任何时间,任何顺序发出它们自己的值.我的任务是从subjOne捕获值,直到subjTwo发出第一个值.
const subjOne = new Subject();
const subjTwo = new Subject();
const stream = Observable.create(observer => {
observer.next(subjOne);
observer.next(subjTwo);
});
stream
.someOperator(subj => subj)
.subscribe(value => console.log('Value: ', value));
范例1:
subjOne发出值1和2,然后subjTwo发出值3,然后subjOne发出4.
输出应为:1、2、3.
范例2:
subjTwo发出1,然后subjOne发出2.
输出应为1.
switchMap在这里不合适,因为一旦从流中发出subjTwo,它就会从subjOne中删除值.关于如何实现这一目标的任何想法?谢谢.
更新:在我的实际情况下,不仅有两个内部可观察对象-subjOne和subjTwo-而且有一个恒定的流,因此手动对subjOne.takeUntil(subjTwo)进行硬编码不是一个可行的选择.
解决方法:
我认为这可以满足您的需求:
// scan to let us keep combining the previous observable
// with the next observable
source
.scan((current, next) => {
// takeUntil to stop current when next produces
const currentUntil = current.takeUntil(next);
// now merge current with next
return currentUntil.merge(next)
}, Rx.Observable.empty())
// switch to the most recent version of the combined inner observables
.switch();
请注意,这仅在内部可观察物很热的情况下才能正常工作.如果它们是冷可观察的,则将需要更多的代码来实现.