javascript – 使用RxJS和twitter-stream-api模块订阅流

好吧,所以我是Rx的初学者,不幸的是js中的js和stream也是新手.我使用这个https://github.com/trygve-lie/twitter-stream-api连接到twitters流api并通过推文接收json对象.到目前为止,我有这个代码

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

我正在成功地将推文写入控制台,但我真正想要学习的是使用流,特别是RxJS.我已经尝试了所有可以想到的方法来创建一个可观察的. Rx.Observable.创建/来自等…

我也尝试了Twitter.resume(),因为它显然暂停,恢复流并观察它.我只得到错误,如不能.subscribe不是一个功能.根据我的上述内容,如何使用Rx.Observable开始过滤和播放数据?

谢谢!

解决方法:

RxJS 5没有任何方法可以将流转换为/到Observable,因此您需要自己完成此操作.理想情况下使用Observable.create.

const Rx = require('rxjs');
const Observable = Rx.Observable;

var TwitterStream = require('twitter-stream-api'),

...

var source$= Observable.create(observer => {
  var Twitter = new TwitterStream(keys);
  Twitter.stream('statuses/filter', {
    track: filter
  });

  Twitter.on('data', function (obj) {
    observer.next(obj);
  });

  return () => {
    Twitter.close();
  };
});

这使得一个冷的Observable只有在你订阅它时才会连接到Twitter. Observable.create静态方法让你将值推送给观察者,最后返回一个拆卸函数,然后关闭连接.取消订阅或Observable完成时会调用此函数.

然后你可以用你想要的任何东西链接这个Observable:

source$.filter(...).map(...)

注意,还有方法Observable.bindCallback()和Observable.bindNodeCallback()但这些对你的情况没有多大帮助.

阅读更多:

> http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
> https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339#.scxgvqp49
> https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87#.2099xwi13

上一篇:Javascript-http / rxjs中的Angular 2重定向捕获回调导致TypeError:无法读取未定义的属性’subscribe’


下一篇:javascript-根据事件从Observable中获取N个值,直到其完成为止.延迟加载多选列表