场景:
我正在通过一个简单的ajax调用加载一个初始数据数组并将这些数据放入和Observable,我将其称为历史数据.同时,我连接到websocket并定期接收数据,我们将其称为更新,并且我想将此数据附加到历史数据.
具体来说,假设ajax调用发送回数组[0,1,2]并且套接字发出(随着时间的推移)3,4,5然后我想积累这些值,如下所示:
[0,1,2] // historical
[0,1,2,3] // historical + updates1
[0,1,2,3,4] // historical + updates1 + updates2
[0,1,2,3,4,5] // etc
(请注意,这里有一个并发边缘情况需要处理:可能发生历史收益[0,1,2,3]和前两个更新是3和4,在这种情况下我想要结束仍然是[0,1,2,3,4] – 不是[0,1,2,3,3,4].)
最终目标是最终得到一个Observable流,它是所描述的Observables历史和更新的组合.
到目前为止我尝试过的:
只是累积websocket数据很容易.我创建更新,这是websocket发出的Observable序列.每次观察到一个值,我都可以使用scan()将它累积到数组中:
updates.scan((acc, update) => acc.concat([update]), [])
这会产生类似的东西
[3]
[3,4]
[3,4,5]
我的下一个问题是如何将其与历史相结合.由于历史数据可能在已经观察到一个或多个更新后到达,因此在我们等待历史记录时需要累积这些更新.我设法使用withLatestFrom()实现了这个目的:
const stream = historical
.withLatestFrom(
updates.scan((acc, update) => acc.concat([update]), []),
(history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
)
观察流产生单个值,[0,1,2,3,4,5],它是历史之前到达的历史和任何更新的组合.正是我想要的.
但是,我无法弄清楚从哪里去.如何继续将更新添加到流中,以便随着时间的推移,流产生类似于:
[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]
我没有看到使用扫描的方法,就像我为更新做的那样,因为在这种情况下我需要扫描的初始(种子)值为Observable,而不是Array.
有没有办法做到这一点 – 通过添加到目前为止的东西或更好的替代方式来做整个事情?
解决方法:
如果我理解正确的话,我会使用skipUntil()
运算符继续收集更新,而不会进一步发布.然后对于withLatestFrom()运算符,我会选择更新Observable作为其来源.这等待感谢skipUntil(),直到历史数据可用,然后在每次更新发出时发出.
let updates = Observable
.timer(0, 1000)
.scan((acc, update) => {
acc.push(update);
return acc;
}, []);
let historical = Observable.defer(() => {
console.log('Sending AJAX request ...');
return Observable.of(['h1', 'h2', 'h3']);
})
.delay(3000)
.share();
const stream = updates.skipUntil(historical)
.withLatestFrom(historical, (buffer, history) => {
return history.concat(buffer);
})
.map(val => val) // remove duplicates;
stream.subscribe(val => console.log(val));
控制台中的输出如下:
Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]
观看现场演示:https://jsbin.com/kumolez/11/edit?js,console
我不知道你的用途是什么,但我会尽量避免使用concat(),因为缓冲区增长时可能会变慢.
此外,如果您在更新时发出更新(而不是累积它们),您可以使用distinct()运算符来过滤掉重复项.
顺便说一下,我假设你正在使用RxJS 5.