可观察的Javascript:需要switchMap的功能,但略有不同

我有一个Rxjs可观察对象(下面代码中的流),它发出可观察对象(subjOne和subjTwo).每个内部可观测对象都可以在任何时间,任何顺序发出它们自己的值.我的任务是从subjOne捕获值,直到subjTwo发出第一个值.

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

stream
    .someOperator(subj => subj)
    .subscribe(value => console.log('Value: ', value));

范例1:
subjOne发出值1和2,然后subjTwo发出值3,然后subjOne发出4.
输出应为:1、2、3.

范例2:
subjTwo发出1,然后subjOne发出2.
输出应为1.

switchMap在这里不合适,因为一旦从流中发出subjTwo,它就会从subjOne中删除值.关于如何实现这一目标的任何想法?谢谢.

更新:在我的实际情况下,不仅有两个内部可观察对象-subjOne和subjTwo-而且有一个恒定的流,因此手动对subjOne.takeUntil(subjTwo)进行硬编码不是一个可行的选择.

解决方法:

我认为这可以满足您的需求:

// scan to let us keep combining the previous observable
// with the next observable
source
  .scan((current, next) => {
    // takeUntil to stop current when next produces
    const currentUntil = current.takeUntil(next);
    // now merge current with next
    return currentUntil.merge(next)
  }, Rx.Observable.empty())
  // switch to the most recent version of the combined inner observables
  .switch();

请注意,这仅在内部可观察物很热的情况下才能正常工作.如果它们是冷可观察的,则将需要更多的代码来实现.

上一篇:javascript-Angular 2 Http get-从响应中解析JSON对象


下一篇:Operation之变换操作符