javascript-RxJS运算符,它在事件流中等待静默期,但是在繁忙的事件流中永远不会等待

场景:

>我有一个事件流,每个事件应导致信息的更新显示(事件流来自websocket,并且显示在高图图表中,但这并不重要)
>出于性能原因,我不想为每个事件触发UI更新.
>我希望执行以下操作:

>收到事件后,我只想进行UI更新,距离自上一次更新以来已超过X毫秒
>但是,如果有任何传入事件,我想每隔Y毫秒(Y> X)进行一次更新
>因此,我正在寻找某种(组合)RxJS运算符,该运算符将对事件流进行速率限制,以仅在出现静默期(或已超出等待时间的最大时间)时才发出事件).
>即我想等待安静的时期,但永远不会.

我该如何实现以上所述?

我看了看:

> https://rxjs-dev.firebaseapp.com/api/operators/sampleTime
> https://rxjs-dev.firebaseapp.com/api/operators/debounceTime
> …以及其他一些rxjs时间/速率限制运算符

解决方法:

您可以编写一个运算符,通过使用反跳功能并在可观察的通知程序组成中使用两个计时器来执行所需的操作:

>一个计时器,该计时器在源发出值之后发出X毫秒;和
>计时器,该计时器在运算符返回的可观察值发出一个值后发出Y毫秒.

请参见下面的代码段.其中的注释应说明其工作原理.

const {
  ConnectableObservable,
  merge,
  MonoTypeOperatorFunction,
  Observable,
  of,
  Subject,
  Subscription,
  timer
} = rxjs;

const {
  concatMap,
  debounce,
  mapTo,
  publish,
  startWith,
  switchMap
} = rxjs.operators;

// The pipeable operator:

function waitUntilQuietButNotTooLong(
  quietDuration,
  tooLongDuration
) {

  return source => new Observable(observer => {

    let tooLongTimer;
    
    // Debounce the source using a notifier that emits after `quietDuration`
    // milliseconds since the last source emission or `tooLongDuration`
    // milliseconds since the observable returned by the operator last
    // emitted.

    const debounced = source.pipe(
      debounce(() => merge(
        timer(quietDuration),
        tooLongTimer
      ))
    );

    // Each time the source emits, `debounce` will subscribe to the notifier.
    // Use `publish` to create a `ConnectableObservable` so that the too-long
    // timer will continue independently of the subscription from `debounce`
    // implementation.

    tooLongTimer = debounced.pipe(
      startWith(undefined),
      switchMap(() => timer(tooLongDuration)),
      publish()
    );

    // Connect the `tooLongTimer` observable and subscribe the observer to
    // the `debounced` observable. Compose a subscription so that
    // unsubscribing from the observable returned by the operator will
    // disconnect from `tooLongTimer` and unsubscribe from `debounced`.

    const subscription = new Subscription();
    subscription.add(tooLongTimer.connect());
    subscription.add(debounced.subscribe(observer));
    return subscription;
  });
}

// For a harness, create a subject and apply the operator:

const since = Date.now();
const source = new Subject();
source.pipe(
  waitUntilQuietButNotTooLong(100, 500)
).subscribe(value => console.log(`received ${value} @ ${Date.now() - since} ms`));

// And create an observable that emits at a particular time and subscribe
// the subject to it:

const emissions = of(0, 50, 100, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, 1050, 1100, 1150);
emissions.pipe(
  concatMap((value, index) => timer(new Date(since + value)).pipe(
    mapTo(index)
  ))
).subscribe(source);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
上一篇:27 多线程(一)——创建进程的三种方法、线程锁(同步synchornized与lock)


下一篇:Angular RxJs:针对异步数据流编程工具