android – 当第一个Observable为空时RxJava concat

我有一个本地存储库和一个远程存储库,我从中获取数据.我第一次想从远程存储库中获取数据,然后缓存它并从本地数据库中获取数据.
我正在使用2个Observables和concat方法来执行此操作:

Observable.concat(localWeatherForecast, remoteWeatherForecast)
                .filter(weatherForecasts -> !weatherForecasts.isEmpty())
                .first();

对于本地可观察我使用此:

private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
    return mLocalDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
                    return Observable.from(weatherEntries)
                            .doOnNext(entry -> mCachedWeather.add(entry))
                            .toList();
                }
            });
}

和远程:

return mRemoteDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
                    return Observable.from(weather).doOnNext(entry -> {
                        mLocalDataSource.saveWeatherEntry(entry);
                        mCachedWeather.add(entry);
                    }).toList();
                }
            })
            .doOnCompleted(() -> mCacheIsDirty = false);

这是订阅

Subscription subscription = mRepository
            .getWeather()
            .subscribeOn(mSchedulerProvider.computation())
            .observeOn(mSchedulerProvider.ui())
            .subscribe(
                    // onNext
                    this::processWeather,
                    // one rror
                    throwable -> mWeatherView.showLoadingTasksError(),
                    // onCompleted
                    () -> mWeatherView.setLoadingIndicator(false));

即使本地存储库为空,似乎concat也不会移动到第二个observable(远程的).如果我颠倒顺序然后它工作(远程首先在concat).
我试图删除filter方法/ first(),但即使这样远程observable也不会被处理.

有什么想法吗?谢谢!

解决方法:

你可以为第一个observable提供超时.如果第一个observable永远不会完成,它将不会切换到第二个observable.请看一下我提供的测试方法:

测试方法’never_false’不会产生任何值并在1000ms后超时,因为没有推送任何值.

对于方法’never_true’,concat中的第一个observable将在一段时间后超时,并将切换到onComplete observable.因此,concat将切换到第二个observable并从该流中获取第一个元素.

@Test
public void never_false() throws Exception {
    Observable<List<Integer>> never = Observable.never();

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    Observable<List<Integer>> concat = Observable.concat(never, just);

    boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);

    assertThat(await).isTrue();
}

@Test
public void never_true() throws Exception {
    Observable<List<Integer>> never = Observable.<List<Integer>>never()
            .timeout(50, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(Observable.empty());

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));

    TestObserver<List<Integer>> test = Observable.concat(never, just)
            .test()
            .await()
            .assertComplete()
            .assertNoErrors();

    List<Integer> collect = test.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    assertThat(collect).contains(1, 2, 3);
}
上一篇:Angular之Rxjs基础操作


下一篇:Mobx 中将监听对象的observable 对象 改为正常的数据格式 用toJS 保留查询的搜索条件