好吧,所以我是Rx的初学者,不幸的是js中的js和stream也是新手.我使用这个https://github.com/trygve-lie/twitter-stream-api连接到twitters流api并通过推文接收json对象.到目前为止,我有这个代码
var Rx = require('rxjs/Rx');
var TwitterStream = require('twitter-stream-api'),
fs = require('fs');
var filter = 'tweet';
var keys = {
consumer_key : "key",
consumer_secret : "secret",
token : "token",
token_secret : "tokensecret"
};
var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
track: filter
});
Twitter.on('connection success', function (uri) {
console.log('connection success', uri);
});
Twitter.on('data', function (obj) {
console.log(obj.text);
});
我正在成功地将推文写入控制台,但我真正想要学习的是使用流,特别是RxJS.我已经尝试了所有可以想到的方法来创建一个可观察的. Rx.Observable.创建/来自等…
我也尝试了Twitter.resume(),因为它显然暂停,恢复流并观察它.我只得到错误,如不能.subscribe不是一个功能.根据我的上述内容,如何使用Rx.Observable开始过滤和播放数据?
谢谢!
解决方法:
RxJS 5没有任何方法可以将流转换为/到Observable,因此您需要自己完成此操作.理想情况下使用Observable.create.
const Rx = require('rxjs');
const Observable = Rx.Observable;
var TwitterStream = require('twitter-stream-api'),
...
var source$= Observable.create(observer => {
var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
track: filter
});
Twitter.on('data', function (obj) {
observer.next(obj);
});
return () => {
Twitter.close();
};
});
这使得一个冷的Observable只有在你订阅它时才会连接到Twitter. Observable.create静态方法让你将值推送给观察者,最后返回一个拆卸函数,然后关闭连接.取消订阅或Observable完成时会调用此函数.
然后你可以用你想要的任何东西链接这个Observable:
source$.filter(...).map(...)
注意,还有方法Observable.bindCallback()和Observable.bindNodeCallback()但这些对你的情况没有多大帮助.
阅读更多:
> http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
> https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339#.scxgvqp49
> https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87#.2099xwi13