我有一个本地存储库和一个远程存储库,我从中获取数据.我第一次想从远程存储库中获取数据,然后缓存它并从本地数据库中获取数据.
我正在使用2个Observables和concat方法来执行此操作:
Observable.concat(localWeatherForecast, remoteWeatherForecast)
.filter(weatherForecasts -> !weatherForecasts.isEmpty())
.first();
对于本地可观察我使用此:
private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
return mLocalDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
return Observable.from(weatherEntries)
.doOnNext(entry -> mCachedWeather.add(entry))
.toList();
}
});
}
和远程:
return mRemoteDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
return Observable.from(weather).doOnNext(entry -> {
mLocalDataSource.saveWeatherEntry(entry);
mCachedWeather.add(entry);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false);
这是订阅
Subscription subscription = mRepository
.getWeather()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.subscribe(
// onNext
this::processWeather,
// one rror
throwable -> mWeatherView.showLoadingTasksError(),
// onCompleted
() -> mWeatherView.setLoadingIndicator(false));
即使本地存储库为空,似乎concat也不会移动到第二个observable(远程的).如果我颠倒顺序然后它工作(远程首先在concat).
我试图删除filter方法/ first(),但即使这样远程observable也不会被处理.
有什么想法吗?谢谢!
解决方法:
你可以为第一个observable提供超时.如果第一个observable永远不会完成,它将不会切换到第二个observable.请看一下我提供的测试方法:
测试方法’never_false’不会产生任何值并在1000ms后超时,因为没有推送任何值.
对于方法’never_true’,concat中的第一个observable将在一段时间后超时,并将切换到onComplete observable.因此,concat将切换到第二个observable并从该流中获取第一个元素.
@Test
public void never_false() throws Exception {
Observable<List<Integer>> never = Observable.never();
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
Observable<List<Integer>> concat = Observable.concat(never, just);
boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);
assertThat(await).isTrue();
}
@Test
public void never_true() throws Exception {
Observable<List<Integer>> never = Observable.<List<Integer>>never()
.timeout(50, TimeUnit.MILLISECONDS)
.onErrorResumeNext(Observable.empty());
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
TestObserver<List<Integer>> test = Observable.concat(never, just)
.test()
.await()
.assertComplete()
.assertNoErrors();
List<Integer> collect = test.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(collect).contains(1, 2, 3);
}