我是反应式编程世界的新手,我正在尝试使用rxjava 2创建一个简单的背压感知消息处理.
以下是我想要实现的工作流程:
>可循环继续字符串流.
>执行耗时的操作并将消息更改为另一个字符串
>执行另一个耗时的操作.
现在我使用以下代码:
{
Flowable.create(subscriber -> {
some_stream.forEach(data -> {
subscriber.onNext(data);
});
}, BackpressureStrategy.BUFFER).
subscribeOn(Schedulers.io()). // Data emission will run io scheduler
observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
map(val -> Time_Consuming_Task(val)). // Task returns another string
observeOn(Schedulers.io()). / Next consumer will run on computation scheduler
subscribe(val -> Another_Time_Consuming_Task(val));
}
现在对于小型操作,我没有看到任何背压相关的问题.
但是对于大流,我不知道它会如何表现.
现在我的问题是: –
> BackpressureStrategy.BUFFER的默认缓冲区大小是什么?数据在哪里缓冲?
>如果我想在每次耗费任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer运算符怎么办?
>如果缓冲区已满,我不想丢失数据,我想在这种情况下等待什么?
解决方法:
回答你的问题:
1.默认缓冲区大小因平台而异.在JVM上,每个环缓冲区有128个项目,在Android上它是16个项目(Source)
这已从之前的1024降低(您可以看到在RxJava here中实现的更改).还有一个系统属性,您可以根据需要自行调整它:
System.setProperty("rx.ring-buffer.size", "8");
因为它们被称为环形缓冲区,所以它们存储在内存中.你可以阅读更多关于他们here.
2.& 3.如果它满了,它就会开始覆盖它自己.在这种情况下,请使用onBackpressureBuffer
A consequence of the circular buffer is that when it is full and a subsequent write is performed, then it starts overwriting the oldest data.
引用维基文章关于Circular buffer.
当你知道你的rx.ring-buffer.size时,你可以做的最好的事情是使用RxJava 2中给出的以下API:
onBackpressureBuffer(int capacity, // This is the given bound, not a setter for the ring buffer
Action0 onOverflow, // The desired action to execute
BackpressureOverflow.Strategy strategy) // The desired strategy to use
再说一遍,因为我不能说得更好,让我在Backpressure (2.0)引用RxJava wiki:
The BackpressureOverflow.Strategy is an interface actually but the class BackpressureOverflow offers 4 static fields with implementations of it representing typical actions:
ON_OVERFLOW_ERROR
: this is the default behavior of the previous two overloads, signalling a BufferOverflowExceptionON_OVERFLOW_DEFAULT
: currently it is the same as ON_OVERFLOW_ERRORON_OVERFLOW_DROP_LATEST
: if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream requests.ON_OVERFLOW_DROP_OLDEST
: drops the oldest element in the buffer and adds the current value to it.Note that the last two strategies cause discontinuity in the stream as they drop out elements. In addition, they won’t signal BufferOverflowException.
这是一个例子:
Flowable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
值得注意的是:
The
Observable
type in RxJava 2.x has no concept of backpressure. ImplementingObservable
is effectively the same as usingonBackpressureBuffer()
by default. UI events, one-off network requests, and state changes should all work with this approach. TheCompletable
,Maybe
, andSingle
types can also dictate this behavior.If you need to support backpressure, RxJava 2.x’s new class,
Flowable
, is backpressure-aware likeObservable
was in RxJava 1.x. However, the updated library now requires an explicit choice of a backpressure strategy to prevent surpriseMissingBackpressureExceptions
.
阅读更多:
> src/main/java/rx/internal/util/RxRingBuffer.java#L246