[My Android Advancement Journey] RxJava: Understanding Backpressure and Fixing rx.exceptions.MissingBackpressureException

Today the app crashed while our testers were testing it. The crash log is shown below:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.OnErrorNotImplementedException
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    ... 7 more
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
rx.exceptions.OnErrorNotImplementedException
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

The source code of the MissingBackpressureException class describes the exception as follows:

/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.exceptions;

/**
 * Represents an exception that indicates that a Subscriber or operator attempted to apply reactive pull
 * backpressure to an Observable that does not implement it.
 * <p>
 * If an Observable has not been written to support reactive pull backpressure (such support is not a
 * requirement for Observables), you can apply one of the following operators to it, each of which forces a
 * simple form of backpressure behavior:
 * <dl>
 *  <dt><code>onBackpressureBuffer</code></dt>
 *   <dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers
 *       according to the requests they generate</dd>
 *  <dt><code>onBackpressureDrop</code></dt>
 *   <dd>drops emissions from the source Observable unless there is a pending request from a downstream
 *       Subscriber, in which case it will emit enough items to fulfill the request</dd>
 * </dl>
 * If you do not apply either of these operators to an Observable that does not support backpressure, and if
 * either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull
 * backpressure, you will encounter a {@code MissingBackpressureException} which you will be notified of via
 * your {@code onError} callback.
 * <p>
 * There are, however, other options. You can throttle an over-producing Observable with operators like
 * {@code sample}/{@code throttleLast}, {@code throttleFirst}, or {@code throttleWithTimeout}/{@code debounce}.
 * You can also take the large number of items emitted by an over-producing Observable and package them into
 * a smaller set of emissions by using operators like {@code buffer} and {@code window}.
 * <p>
 * For a more complete discussion of the options available to you for dealing with issues related to
 * backpressure and flow control in RxJava, see
 * <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>.
 */
public class MissingBackpressureException extends Exception {

    private static final long serialVersionUID = 7250870679677032194L;

    /**
     * Constructs the exception without any custom message.
     */
    public MissingBackpressureException() {

    }

    /**
     * Constructs the exception with the given customized message.
     * @param message the customized message
     */
    public MissingBackpressureException(String message) {
        super(message);
    }

}

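As the Javadoc above describes, MissingBackpressureException is usually thrown because the Observable emits events faster than the observer can process them and nothing has been done to handle the surplus. In the crash log above, the frames show a periodic timer source (OnSubscribeTimerPeriodically, as used by Observable.interval) feeding observeOn on the Android main thread: the timer kept emitting while the main thread fell behind, and observeOn raised the exception from its onNext. Because the subscriber had no onError handler, the error was rethrown as OnErrorNotImplementedException and crashed the app, which is why the log says "Add `onError` handling."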
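The overflow described above can be modeled without RxJava at all. The following is a minimal sketch, not RxJava's actual implementation: the class OverflowModel, the exception QueueOverflowException, and the capacity of 16 are all hypothetical names and values chosen for illustration. It simulates a producer and a consumer sharing a bounded queue and fails as soon as the producer tries to add to a full queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class OverflowModel {

    /** Hypothetical stand-in for rx.exceptions.MissingBackpressureException. */
    public static class QueueOverflowException extends RuntimeException {
        public QueueOverflowException(String message) {
            super(message);
        }
    }

    /**
     * Simulates `ticks` time steps. In each step the producer offers
     * `producedPerTick` items to a bounded queue, then the consumer removes
     * up to `consumedPerTick` items. Returns the number of items accepted,
     * or throws QueueOverflowException when the queue is already full.
     */
    public static int run(int capacity, int producedPerTick, int consumedPerTick, int ticks) {
        Queue<Integer> queue = new ArrayDeque<>();
        int accepted = 0;
        for (int tick = 0; tick < ticks; tick++) {
            for (int i = 0; i < producedPerTick; i++) {
                if (queue.size() == capacity) {
                    // The producer has no way to slow down, so the only option is to fail.
                    throw new QueueOverflowException(
                            "queue full after accepting " + accepted + " items");
                }
                queue.add(accepted++);
            }
            for (int i = 0; i < consumedPerTick && !queue.isEmpty(); i++) {
                queue.remove();
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Balanced rates: the queue never fills up.
        System.out.println("balanced run accepted " + run(16, 1, 1, 100) + " items");
        // Producer three times faster than the consumer: the queue overflows.
        try {
            run(16, 3, 1, 100);
        } catch (QueueOverflowException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

With balanced rates the run completes; once the producer outpaces the consumer, the queue grows every step until it overflows, however large the capacity is. A bigger buffer only delays the failure, which is why the source has to be throttled, buffered deliberately, or allowed to drop items.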

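  • onBackpressureBuffer:
    Buffers the events emitted by the Observable and, whenever request(n) is called, emits the buffered items downstream in line with that request. If a capacity is set for the buffer, exceeding it raises an exception.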

  • onBackpressureDrop:
    Discards the events emitted by the Observable until the subscriber calls request(n) again, and then delivers the next n events emitted after that call.
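To make the difference between the two strategies concrete, here is a small JDK-only sketch. It is a simplified model under stated assumptions, not RxJava's code: the classes BackpressureStrategies, DropModel, and BufferModel are hypothetical, everything runs on one thread, and the buffer model throws on overflow for simplicity, whereas RxJava reports the overflow through onError.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BackpressureStrategies {

    /** Drop: an item is delivered only while there is outstanding demand. */
    public static class DropModel {
        private long requested;
        private final List<Integer> delivered = new ArrayList<>();

        public void request(long n) {
            requested += n;
        }

        public void onNext(int item) {
            if (requested > 0) {
                requested--;
                delivered.add(item);
            }
            // No demand: the item is silently discarded.
        }

        public List<Integer> delivered() {
            return delivered;
        }
    }

    /** Buffer: items wait in a bounded queue until downstream requests them. */
    public static class BufferModel {
        private final int capacity;
        private long requested;
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private final List<Integer> delivered = new ArrayList<>();

        public BufferModel(int capacity) {
            this.capacity = capacity;
        }

        public void request(long n) {
            requested += n;
            drain();
        }

        public void onNext(int item) {
            if (buffer.size() == capacity) {
                // Simplification: the model throws instead of signalling onError.
                throw new IllegalStateException("buffer overflow");
            }
            buffer.add(item);
            drain();
        }

        /** Hands buffered items downstream while demand remains. */
        private void drain() {
            while (requested > 0 && !buffer.isEmpty()) {
                requested--;
                delivered.add(buffer.remove());
            }
        }

        public List<Integer> delivered() {
            return delivered;
        }
    }

    public static void main(String[] args) {
        DropModel drop = new DropModel();
        BufferModel buffer = new BufferModel(10);

        // Downstream asks for 2 items, the source emits 1..5,
        // downstream asks for 2 more, the source emits 6..8.
        drop.request(2);
        buffer.request(2);
        for (int i = 1; i <= 5; i++) {
            drop.onNext(i);
            buffer.onNext(i);
        }
        drop.request(2);
        buffer.request(2);
        for (int i = 6; i <= 8; i++) {
            drop.onNext(i);
            buffer.onNext(i);
        }

        System.out.println("drop:   " + drop.delivered());   // [1, 2, 6, 7]
        System.out.println("buffer: " + buffer.delivered()); // [1, 2, 3, 4]
    }
}
```

The drop model loses 3, 4, 5 and 8 but never grows; the buffer model loses nothing but still holds 5 to 8 in memory and fails once its capacity is exceeded. In RxJava 1.x the corresponding operators are applied on the source, upstream of the operator that exerts backpressure, for example calling onBackpressureDrop() or onBackpressureBuffer(capacity) before observeOn(...).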

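After reading the blog posts listed below, I understood the concept of backpressure and fixed the rx.exceptions.MissingBackpressureException. Readers can consult the same posts and choose whichever fix suits their own situation.

References: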

  1. The Friendliest Article on RxJava: Backpressure
    https://zhuanlan.zhihu.com/p/24473022?refer=dreawer

  2. RxJava Concurrency: What to Do When a Stream Emits Too Fast (Backpressure)
    http://blog.csdn.net/jdsjlzx/article/details/51868640

  3. Understanding the Backpressure Concept in RxJava 2.0
    http://blog.csdn.net/jdsjlzx/article/details/52717636

  4. Taming Data Streams in RxJava: hot & cold Observable
    http://blog.csdn.net/jdsjlzx/article/details/51839090

  5. RxJava Tutorial Part 4: Concurrency, What to Do When a Stream Emits Too Fast
    http://blog.chengyunfeng.com/?p=981&utm_source=tuicool&utm_medium=referral

  6. MissingBackpressureException Javadoc
    http://reactivex.io/RxJava/javadoc/rx/exceptions/MissingBackpressureException.html

  7. Backpressure
    https://github.com/ReactiveX/RxJava/wiki/Backpressure

  8. Java Code Examples for rx.exceptions.MissingBackpressureException
    http://www.programcreek.com/java-api-examples/index.php?api=rx.exceptions.MissingBackpressureException


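Author: Ouyang Peng (欧阳鹏). Reposting is welcome; sharing with others is the source of progress!
When reposting, please keep the original link: http://blog.csdn.net/ouyang_peng/article/details/66978253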
