在我的应用程序中,我有耗时的逻辑,可以通过多种方式启动,让我们自动或由用户手动说.
// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()
// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }
val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one
val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer
启动继电器可以产生许多排放.假设用户单击按钮进行手动启动,剩下5个secons,直到通过计时器自动启动.如果它是简单的flatMap,则两个事件都将导致longOperation()启动.
我只希望一个线程在内部运行longOperation(),所以如果它现在正在运行而没有完成 – 忽略启动发射,无论如何完成将导致计时器重启.
ConcatMap帮我一半 – 它将longOperation()添加到“队列”,所以它们被逐个处理,但是我怎么能写这个以忽略任何进一步的启动,直到第一个完成?
解决方法:
您可以使用带有额外整数参数的flatMap()来限制并行性.
syncStarts
.onBackpressureDrop() // 1
.flatMap(() -> longOperation(), 1) // 2
...
>丢弃flatMap()繁忙时发生的任何排放.
>数字1是flatMap()所做的订阅数,主要是强制操作是连续的.
以上功能是您想要的功能.但是,一旦longOperation()运行,您没有指定您想要发生的事情:您是否希望在紧接着之后立即启动另一个操作?如果是这样,您需要更改背压处理以排队最多一次排放.