我有一个rxjs观察者(真的是一个主题)永远地尾随文件,就像tail -f一样.例如,监控日志文件非常棒.
这种“永远”的行为对我的应用来说非常棒,但对于测试来说却很糟糕.目前我的应用程序工作,但我的测试永远挂起
我想强制观察者更改尽早完成,因为我的测试代码知道文件中应该有多少行.我该怎么做呢?
我尝试在我返回的Subject句柄上调用onCompleted但是在那时它基本上被强制转换为观察者并且你不能强制它关闭,错误是:
Object # has no method ‘onCompleted’
这是源代码:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
这里的测试代码无法弄清楚如何强制永远结束(磁带样式测试).注意“ILLEGAL”行:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
解决方法:
这听起来像你解决了你的问题,但对你原来的问题
I’d like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?
一般来说,当你有更好的选择时,不鼓励使用Subjects,因为它们往往成为人们使用他们熟悉的编程风格的拐杖.我没有尝试使用主题,而是建议您考虑每个事件在Observable生命周期中的意义.
包裹事件发射器
已经存在Observable.fromEvent形式的EventEmitter #on / off模式的包装器.它只在有侦听器时处理清理并保持订阅活动.因此ObserveTail可以重构成
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var close = Rx.Observable.fromEvent(tail, "close");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line.takeUntil(close).merge(error).subscribe(observer);
});
}
与使用Subjects的vanilla相比,它有几个好处,一个,你现在实际上会看到下游的错误,两个,当你完成它们时,它将处理你的事件的清理.
避免*同步方法
然后,这可以滚动到您的文件存在检查,而无需使用readSync
//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
.filter(function(exists) { return exists; })
.flatMap(ObserveTail(filename));
接下来,您可以使用flatMap简化过滤器/贴图/贴图序列.
var result = source.flatMap(function(x) {
try {
return Rx.Observable.just(JSON.parse(x));
} catch (e) {
return Rx.Observable.empty();
}
},
//This allows you to map the result of the parsed value
function(x, json) {
return json.name;
})
.timeout(10000, "observer timed out");
不要发出信号,取消订阅
当流只向一个方向行进时,如何停止“发出信号”停止.我们实际上很少想让Observer直接与Observable通信,所以更好的模式是不实际“发出信号”停止,而是简单地取消订阅Observable并将其留给Observable的行为以确定它应该从那里做什么.
基本上你的观察者真的不应该关心你的Observable而不是说“我在这里完成了”.
要做到这一点,您需要在停止时声明要达到的条件.
在这种情况下,由于您只是在测试用例中的一组数字后停止,您可以使用take来取消订阅.因此,最终的订阅块看起来像:
result
//After lines is reached this will complete.
.take(lines)
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
编辑1
正如评论中所指出的那样,在这个特定的api的情况下,没有真正的“关闭”事件,因为Tail本质上是一个无限的操作.从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件.所以你的块可能最终看起来像:
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line
.finally(function() { tail.unwatch(); })
.merge(error).subscribe(observer);
}).share();
}
最终和共享操作符的添加创建了一个对象,该对象将在新订户到达时附加到尾部,并且只要至少有一个订户仍在收听,它将保持连接.一旦所有订户完成,我们就可以安全地看不到尾巴了.