javascript-如何按顺序使用RxJS Observables?

达成协议:我有一个HTTP get请求,该请求返回对象的JSON列表.我使用RxJS订阅以接收该列表的数据.现在,对于该列表中的每个对象,我想执行另一个HTTP请求,然后将该请求的结果放入数组中.

到目前为止,我已经能够做到这一点,但似乎无法弄清楚如何维护数据初始列表的顺序.这可能与整个Observable机制是异步的这一事实有关.

这是我的代码:

    ngOnInit(): void {
    this.shiftInformationService.getShifts("2016-11-03T06:00:00Z", "2016-11-06T06:00:00Z")
        .subscribe(shifts => {
            shifts.forEach(shift => {
                this.addDataToAreaChart(shift.startDateTime, shift.endDateTime, shift.description);
            });
        });

}

addDataToAreaChart(startDate: string, endDate: string, description: string) {
    this.machineStatisticsService
        .getCumulativeMachineStateDurations(startDate, endDate)
        .subscribe(s => {
            this.areaChartData = [];
            this.areaChartData.push(new AreaChartData(startDate, endDate, description, s));
        });
}

我想要的是在推入areaChartData数组中的数据时,保持shifts.forEach循环的调用顺序.

有任何想法吗?帮助将不胜感激!

更新:已解决!

最终代码:

ngOnInit(): void {
    var startDate = new Date();
    startDate.setDate(startDate.getDate() - 3);

    this.shiftInformationService.getShifts(DateUtil.formatDate(startDate), DateUtil.formatDate(new Date()))
        .subscribe(shifts => {
            Observable.from(shifts)
                .concatMap((shift) => {
                    return this.machineStatisticsService
                        .getCumulativeMachineStateDurations(shift.startDateTime, shift.endDateTime)
                        .map((result) => {
                            return {
                                "addDataToAreaChartValue": result,
                                "shift": shift
                            }
                        });
                })
                .subscribe(s => {
                    this.areaChartData = [];
                    this.areaChartData.push(
                        new AreaChartData(
                            s.shift.startDateTime,
                            s.shift.endDateTime,
                            s.shift.description + ' ' + s.shift.startDateTime.slice(5, 10),
                            s.addDataToAreaChartValue
                        )
                    );
                });
        });
}

感谢迈克尔!

解决方法:

使用concatMap进行顺序处理.

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

使用map以可观察的方式附加/转换值.

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

所以,你需要这样做

ngOnInit(): void {
    this.shiftInformationService.getShifts("2016-11-03T06:00:00Z", "2016-11-06T06:00:00Z")
        .subscribe(shifts => {
            Rx.Observable.from(shifts) // create observable of each value in array
                .concatMap((shift) => { // process in sequence
                    return this.addDataToAreaChart(
                        shift.startDateTime, 
                        shift.endDateTime, 
                        shift.description
                    ).map((result) => {
                        return {
                           "addDataToAreaChartValue" : result, // addDataToAreaChart result
                           "shift": shift // append shift object here, so we can access it on subscribe
                        }
                    });
                })
                .subscribe(s => {
                    //this.areaChartData = []; // why??
                    this.areaChartData.push(
                        new AreaChartData(
                            s.shift.startDate, 
                            s.shift.endDate, 
                            s.shift.description, 
                            s.addDataToAreaChartValue
                        )
                    );
                });
        });
}

addDataToAreaChart(startDate: string, endDate: string, description: string) {
    return this.machineStatisticsService
        getCumulativeMachineStateDurations(startDate, endDate);
}
上一篇:rxjs 学习实践笔记


下一篇:javascript-错误后继续订阅