node 中的 stream

什么是 stream

Stream 借鉴自 Unix 编程哲学中的 pipe。

Unix shell 命令中觉的管道流式操作 | 将上一个命令的输出作为下一个命令的输入。node stream 中则是通过 .pip() 方法来进行的。

一个 stream 的运用场景。从服务器读取文件并返回给页面。

  • 朴素的实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);
  • stream 实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

好处:

  • 代码更加简洁
  • 可*组合各种模块处理数据

stream 的种类

分五种:

  • readable
  • writable
  • duplex
  • transform
  • classic

readable

readable 类型的流产生数据,可通过 .pip() 输送到能够消费流数据的地方,比如 writable,transform,duplex

一个 readable stream 示例:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

运行结果:

$ node read0.js
beep boop

_read 方法与按需输出

上面 rs.push(null) 表示没有更多数据了。

上面从代码直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费。因为消费者有可能并不能立即消费这些内容,直接 push 数据后消耗不必要的资源。

更好的做法是,让 readable 流只在消费者需要数据的时候再 push。这是通过定义能 raedable 对象定义 ._read 方法来完成的。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

运行结果:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

这种方式下,定义了 readable 流产生数据的方法 ._read,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。

_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。

下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

输出任意数据

上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })

消费 readable 流产生的数据

这一段没看太懂

writable 流

writable 流可作为 .pip() 的对象。

src.pipe(writableStream)

创建 writable 流

需要实现 ._write(chunk, enc, next) 方法,其中:

  • chunk 为接收到的数据
  • encopts.decodeStringfalse 且收到的数据这字符串时,它表示字符串的编码
  • next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败

默认情况下,获取到的字符串数据会转为 Buffer,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。

一个 writable 示例:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

向 writable 流写入数据

通过调用 writable 流的 write 方法来写入。

process.stdout.write('beep boop\n');

通过调用 end() 来结束数据的写入。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

duplex

双工类型的流,同时具有 writable 和 readable 流的功能。node 内建的 zlib,TCP sockets 以及 crypto 都是双工类型的。

所以可对双工类型的流进行如下操作:

a.pip(b).pip(a)

transform

一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam

classic stream

这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。

classic readable stream

当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。

.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。

classic readable 流的创建

一个 classic readable 流的创建示例:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

从 classic readable 流读取数据

数据读取是通过监听流上的 dataend 事件。

一个从 classic readable 流读取数据的示例:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});

一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 throughconcat-stream 来进行。

classic writable stream

只需要实现 .write(buf).end(buf) 及 .destroy() 方法即可,比较简单。

内建的流对象

node 中的 stream

总结

本质上,所有流都是 EventEmitter,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。

参考

上一篇:Node中的流


下一篇:Document base xxx does not exist or is not a readable direct