RxJava Observable在异步调用中创建的替代方法

我听了这个演讲
https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

并且我应该避免使用create方法创建一个Observable,因为它不能自动处理unsubscription和backpressure,但我找不到在下面的代码中使用的替代方法.

compositeSubscription.add(
    Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
        @Override
        public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {

            modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
                @Override
                public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(dtoCompaniesCallback);
                            subscriber.onCompleted();
                        }
                    } catch (Exception e) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                }
            });

        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<DTOCompaniesCallback>() {
        @Override
        public void call(DTOCompaniesCallback dtoCompaniesCallback) {
            Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throw new one rrorNotImplementedException("Source!", throwable);
        }
    })
);

我打电话给OnDestroy方法清除CompositeSubscription

@Override
public void onDestroy() {
    if (compositeSubscription != null) {
        compositeSubscription.clear();
    }
}

你看到我可以在这里使用的create方法的替代方法吗?
您是否看到任何潜在的危险或这种方法是否安全?
谢谢

解决方法:

您可以使用延迟AsyncSubject:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onComplete();
    });
    return async;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
...

如果getCompaniesFromServer支持取消,您可以:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onComplete();
    });
    return async.doOnUnsubscribe(() -> {
        try { c.close(); } catch (IOException ex) { }
    });
})
上一篇:2022年全球市场机械泵密封总体规模、主要生产商、主要地区、产品和应用细分研究报告


下一篇:javascript – Angular 2如何在登录浏览器控制台之前处理404?