java – Rx运算符.忽略直到下一个被发射

在我的应用程序中,我有耗时的逻辑,可以通过多种方式启动,让我们自动或由用户手动说.

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
        .merge(
                autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
                manualStarts
        )
        .subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
        .concatMap {
            longOperation()
        }
        .subscribe(autoStarts) // end of long operation trigger start of auto timer

启动继电器可以产生许多排放.假设用户单击按钮进行手动启动,剩下5个secons,直到通过计时器自动启动.如果它是简单的flatMap,则两个事件都将导致longOperation()启动.
我只希望一个线程在内部运行longOperation(),所以如果它现在正在运行而没有完成 – 忽略启动发射,无论如何完成将导致计时器重启.

ConcatMap帮我一半 – 它将longOperation()添加到“队列”,所以它们被逐个处理,但是我怎么能写这个以忽略任何进一步的启动,直到第一个完成?

解决方法:

您可以使用带有额外整数参数的flatMap()来限制并行性.

syncStarts
  .onBackpressureDrop()               // 1
  .flatMap(() -> longOperation(), 1)  // 2
  ...

>丢弃flatMap()繁忙时发生的任何排放.
>数字1是flatMap()所做的订阅数,主要是强制操作是连续的.

以上功能是您想要的功能.但是,一旦longOperation()运行,您没有指定您想要发生的事情:您是否希望在紧接着之后立即启动另一个操作?如果是这样,您需要更改背压处理以排队最多一次排放.

上一篇:联邦学习 Federated learning Google I/O‘19 笔记


下一篇:学习报告_2020/12/22_part 1