javascript – 强制完成rxjs观察者

我有一个rxjs观察者(真的是一个主题)永远地尾随文件,就像tail -f一样.例如,监控日志文件非常棒.

这种“永远”的行为对我的应用来说非常棒,但对于测试来说却很糟糕.目前我的应用程序工作,但我的测试永远挂起

我想强制观察者更改尽早完成,因为我的测试代码知道文件中应该有多少行.我该怎么做呢?

我尝试在我返回的Subject句柄上调用onCompleted但是在那时它基本上被强制转换为观察者并且你不能强制它关闭,错误是:

Object # has no method ‘onCompleted’

这是源代码:

function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

这里的测试代码无法弄清楚如何强制永远结束(磁带样式测试).注意“ILLEGAL”行:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})

解决方法:

这听起来像你解决了你的问题,但对你原来的问题

I’d like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?

一般来说,当你有更好的选择时,不鼓励使用Subjects,因为它们往往成为人们使用他们熟悉的编程风格的拐杖.我没有尝试使用主题,而是建议您考虑每个事件在Observable生命周期中的意义.

包裹事件发射器

已经存在Observable.fromEvent形式的EventEmitter #on / off模式的包装器.它只在有侦听器时处理清理并保持订阅活动.因此ObserveTail可以重构成

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

与使用Subjects的vanilla相比,它有几个好处,一个,你现在实际上会看到下游的错误,两个,当你完成它们时,它将处理你的事件的清理.

避免*同步方法

然后,这可以滚动到您的文件存在检查,而无需使用readSync

//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

接下来,您可以使用flatMap简化过滤器/贴图/贴图序列.

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

不要发出信号,取消订阅

当流只向一个方向行进时,如何停止“发出信号”停止.我们实际上很少想让Observer直接与Observable通信,所以更好的模式是不实际“发出信号”停止,而是简单地取消订阅Observable并将其留给Observable的行为以确定它应该从那里做什么.

基本上你的观察者真的不应该关心你的Observable而不是说“我在这里完成了”.

要做到这一点,您需要在停止时声明要达到的条件.

在这种情况下,由于您只是在测试用例中的一组数字后停止,您可以使用take来取消订阅.因此,最终的订阅块看起来像:

result
 //After lines is reached this will complete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

编辑1

正如评论中所指出的那样,在这个特定的api的情况下,没有真正的“关闭”事件,因为Tail本质上是一个无限的操作.从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件.所以你的块可能最终看起来像:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

最终和共享操作符的添加创建了一个对象,该对象将在新订户到达时附加到尾部,并且只要至少有一个订户仍在收听,它将保持连接.一旦所有订户完成,我们就可以安全地看不到尾巴了.

上一篇:vuetify的安装使用步骤


下一篇:RxJS简介