nodejs stream基础知识

分类

nodejs 的 stream 有四种:

  • Readable:可读流
  • Writable: 可写流
  • Duplex:双工流
  • Transform:转换流

Readable

// _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
// 在_read方法中,通过调用push(data)将数据放入可读流*下游消耗。
// 在_read方法中,可以同步调用push(data),也可以异步调用。
// 当全部数据都生产出来后,必须调用push(null)来结束可读流。
// 流一旦结束,便不能再调用push(data)添加数据。
// 创建一个可以读的流,通过这个流对象监听数据
const stream = require('stream');
const Readable = stream.Readable; class MyFileReader extends Readable {
constructor(options) {
super(options);
} attachDataSource(dataSource) {
this.dataSource = dataSource;
} // 实现 _read 方法
_read() {
// 通过 DataSource 传入数据
// 当 DataSource get 方法返回 null 会触发 Readable 的 end 事件,表示会结束
this.push(this.dataSource.get());
}
} class DataSource {
constructor(size = 10) {
this.size = size;
this.data = []; this.fill();
} // 生产数据
fill() {
for (let index = 0; index < this.size; index++) {
this.data.push(index.toString());
}
} // 获取数据
get() {
if (this.data.length === 0) {
return null;
} return this.data.pop();
}
} const dataSource = new DataSource();
const myFileReader = new MyFileReader(); myFileReader.attachDataSource(dataSource);
myFileReader.on('data', (data) => console.log('data: ' + data));
myFileReader.on('end', () => console.log('end'));

在 _read 方法里,可以调用 push 方法往缓冲池里放入数据,这里写的this.push(this.dataSource.get())方法是一个同步方法,获取完数据后立马再次执行 _read 方法获取下一条数据,如果你在_read 方法里写一个异步push 的数据

 setTimeout(() => {
this.push(this.dataSource.get());
});

那么本次 _read 方法执行后,没有数据则不会执行下一次 _read 方法,直到 state 发生了变化,具体的细节可以参看:https://blog.csdn.net/shasharoman/article/details/80251512 这篇文章

Writable

// 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()将data写入底层。
// 在_write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
// next的调用既可以是同步的,也可以是异步的。
// 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
// 在end方法调用后,当所有底层的写操作均完成时,会触发finish事件。 const Writable = require('stream').Writable class ToWritable extends Writable{
constructor(){
super();
} _write(data, enc, next) {
// 将流中的数据写入底层
process.stdout.write(data.toString().toUpperCase());
// 写入完成时,调用`next()`方法通知流传入下一个数据
process.nextTick(next);
}
} const writable = new ToWritable(); writable.on('finish', () => process.stdout.write('DONE')); // 将一个数据写入流中
writable.write(`a\n`);
writable.write(`b\n`);
writable.write(`c\n`); // 再无数据写入流时,需要调用`end`方法
writable.end();

Duplex

// 代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
// 同时,又实现了_write方法,可作为下游去消耗数据。 const Duplex = require('stream').Duplex class ToDuplexable extends Duplex{
constructor(){
super();
this.limit = 0;
} _write(buf, enc, next) {
this.limit ++ ;
next();
} _read() {
if(this.limit--){
this.push('data');
}
// this.push(this.index++)
}
} const toDuplexable = new ToDuplexable(); toDuplexable.on('data', data => process.stdout.write(data));
toDuplexable.on('end', () => process.stdout.write('DONE')); toDuplexable.write(''); setTimeout(() => {
toDuplexable.write('');
toDuplexable.end();
}, 1000);

Transform

// Duplex 可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端
//Tranform继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法。 const Transform = require('stream').Transform class MyTransform extends Transform{
constructor() {
super()
} _transform(buf, enc, next){
let res = buf.toString('utf8'); // 调用push方法将变换后的数据添加到可读端
this.push(`${res}_transform\n`);
// 调用next方法准备处理下一个
next();
}
} var myTransform = new MyTransform(); myTransform.on('data', data => process.stdout.write(data)); myTransform.write('abc');
myTransform.write('def');
myTransform.end();

参考:

上一篇:nodeJs文件系统(fs)与流(stream)


下一篇:ASP.NET Web API 简介