RxJava2 observable抛出UndeliverableException

据我所知,RxJava2 values.take(1)创建另一个Observable,它只包含原始Observable中的一个元素.哪个不能抛出异常,因为它被take(1)的效果过滤掉,因为它发生在第二个.

如下面的代码片段所示

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

产量

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

我的问题:

>我理解它是否正确?
>导致异常的真正原因是什么.
>如何从消费者那里解决这个问题?

解决方法:

>是的,但是因为可观察的“结束”并不意味着在create(…)中运行的代码被停止了.要在这种情况下完全安全,您需要使用o.isDisposed()来查看observable是否已经在下游结束.
>例外是因为RxJava 2的策略是永远不允许onError调用丢失.如果observable已经终止,它将作为全局UndeliverableException传递到下游或抛出.由Observable的创建者“正确”处理observable已结束且发生异常的情况.
>问题是生产者(Observable)和消费者(订阅者)在流结束时不一致.由于生产者在这种情况下比消费者寿命长,因此问题只能在生产者中解决.

上一篇:POJ 2478 Farey Sequence


下一篇:[转载] Lucene 工作原理