我必须调用返回项列表的API.对于此列表中的每个项目,我必须调用另一个API(如果列表返回8个项目,我将不得不进行8个并行调用).
我最终必须返回一个列表,我将使用这8个并行调用的结果创建.
我怎么能用RxJava做到这一点?我认为我必须使用flatMap将第一次调用的结果转换为Observables列表,然后我必须使用zip运算符进行并行调用,但我不确定.
请注意,我使用的是RxJava2,没有lambdas表达式.
谢谢 !
解决方法:
你可以这样做,例如,
defer()允许您仅在订阅时获取数据,然后创建在项列表中发出所有项目(逐个)的Observable.
然后flatMap()将创建Observable来获取每个项目的数据,现在你将拥有发出Data对象的Observable.
为了收集它,你可以使用toList()来发出单个对象(一个List),它将包含每个Observable提取的所有数据.
注意,为了并行执行,重要的是fetchDataFromItem()将在Schedulers.io()上订阅,即使它在io上订阅了所有流.
Observable.defer(new Callable<ObservableSource<Item>>() {
@Override
public ObservableSource<Item> call() throws Exception {
List<Item> items = getItems();
return Observable.fromIterable(items);
}
})
.flatMap(new Function<Item, ObservableSource<Data>>() {
@Override
public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
return fetchDataFromItem(item);
}
})
.toList()
.subscribe(new Consumer<List<Data>>() {
@Override
public void accept(@NonNull List<Data> objects) throws Exception {
//do something with the list of all fetched data
}
});
更新:
如果提取的项目已经是Observable,则defer()可以替换为flatMapIterable(),它接受单个项目列表并将其转换为多个项目的Observable:
getItemsObservable()
.flatMapIterable(new Function<List<Item>, Iterable<Item>>() {
@Override
public Iterable<Item> apply(@NonNull List<Item> items) throws Exception {
return items;
}
})
.flatMap(new Function<Item, ObservableSource<Data>>() {
@Override
public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
return fetchDataFromItem(item);
}
})
.toList()
.subscribe(new Consumer<List<Data>>() {
@Override
public void accept(@NonNull List<Data> objects) throws Exception {
//do something with the list of all fetched data
}
});