今天测试人员在测试应用APP的时候应用crash了,查看了下crash log如下所示:
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6119)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.OnErrorNotImplementedException
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
... 7 more
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
at rx.Scheduler$Worker$1.call(Scheduler.java:134)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
rx.exceptions.OnErrorNotImplementedException
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6119)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
at rx.Scheduler$Worker$1.call(Scheduler.java:134)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
at rx.Scheduler$Worker$1.call(Scheduler.java:134)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
查看MissingBackpressureException异常类的源代码如下描述:
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;
/**
* Represents an exception that indicates that a Subscriber or operator attempted to apply reactive pull
* backpressure to an Observable that does not implement it.
* <p>
* If an Observable has not been written to support reactive pull backpressure (such support is not a
* requirement for Observables), you can apply one of the following operators to it, each of which forces a
* simple form of backpressure behavior:
* <dl>
* <dt><code>onBackpressureBuffer</code></dt>
* <dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers
* according to the requests they generate</dd>
* <dt><code>onBackpressureDrop</code></dt>
* <dd>drops emissions from the source Observable unless there is a pending request from a downstream
* Subscriber, in which case it will emit enough items to fulfill the request</dd>
* </dl>
* If you do not apply either of these operators to an Observable that does not support backpressure, and if
* either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull
* backpressure, you will encounter a {@code MissingBackpressureException} which you will be notified of via
* your {@code onError} callback.
* <p>
* There are, however, other options. You can throttle an over-producing Observable with operators like
* {@code sample}/{@code throttleLast}, {@code throttleFirst}, or {@code throttleWithTimeout}/{@code debounce}.
* You can also take the large number of items emitted by an over-producing Observable and package them into
* a smaller set of emissions by using operators like {@code buffer} and {@code window}.
* <p>
* For a more complete discussion of the options available to you for dealing with issues related to
* backpressure and flow control in RxJava, see
* <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>.
*/
public class MissingBackpressureException extends Exception {
private static final long serialVersionUID = 7250870679677032194L;
/**
* Constructs the exception without any custom message.
*/
public MissingBackpressureException() {
}
/**
* Constructs the exception with the given customized message.
* @param message the customized message
*/
public MissingBackpressureException(String message) {
super(message);
}
}
如上面文字描述,抛出MissingBackpressureException往往就是因为,被观察者发送事件的速度太快,而观察者处理太慢,而且你还没有做相应措施,所以报异常。
onBackpressurebuffer:
把observable发送出来的事件做缓存,当request方法被调用的时候,给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。onBackpressureDrop:
将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这之后的n个事件。
下面参考了几篇博客之后,理解了Backpressure的概念,并解决了rx.exceptions.MissingBackpressureException异常,读者也可以参考以下几篇博客之后,自己根据自己实际情况来选择不同的解决方式来解决该异常。
参考链接如下:
关于RxJava最友好的文章——背压(Backpressure)
https://zhuanlan.zhihu.com/p/24473022?refer=dreawerRxJava 并发之数据流发射太快如何办(背压(Backpressure))
http://blog.csdn.net/jdsjlzx/article/details/51868640- RxJava 2.0中backpressure(背压)概念的理解
http://blog.csdn.net/jdsjlzx/article/details/52717636- RxJava 驯服数据流之 hot & cold Observable
http://blog.csdn.net/jdsjlzx/article/details/51839090- RxJava 教程第四部分:并发 之数据流发射太快如何办
http://blog.chengyunfeng.com/?p=981&utm_source=tuicool&utm_medium=referral- http://reactivex.io/RxJava/javadoc/rx/exceptions/MissingBackpressureException.html
Backpressure
https://github.com/ReactiveX/RxJava/wiki/BackpressureJava Code Examples for rx.exceptions.MissingBackpressureException
http://www.programcreek.com/java-api-examples/index.php?api=rx.exceptions.MissingBackpressureException
作者:欧阳鹏 欢迎转载,与人分享是进步的源泉!
转载请保留原文地址: http://blog.csdn.net/ouyang_peng/article/details/66978253