花了两天时间尝试按照自己的话翻译了一下stream模块,以下内容皆翻译于:https://nodejs.org/api/stream.html.
目录
1.3.1.1 Class: stream.Writable
1.3.1.1.1........................................................................ Event: 'close'
1.3.1.1.2........................................................................ Event: 'drain'
1.3.1.1.3........................................................................ Event: 'error'
1.3.1.1.4....................................................................... Event: 'finish'
1.3.1.1.5.......................................................................... Event: 'pipe'
1.3.1.1.6..................................................................... Event: 'unpipe'
1.3.1.1.7..................................................................... writable.cork()
1.3.1.1.8............ writable.end([chunk][, encoding][, callback])
1.3.1.1.9...................... writable.setDefaultEncoding(encoding)
1.3.1.1.10.............................................................. writable.uncork()
1.3.1.1.11................................ writable.writableHighWaterMark
1.3.1.1.12................................................ writable.writableLength
1.3.1.1.13.......... writable.write(chunk[, encoding][, callback])
1.3.1.1.14................................................ writable.destroy([error])
1.3.2.4 Class: stream.Readable
1.3.2.4.1........................................................................ Event: 'close'
1.3.2.4.2.......................................................................... Event: 'data'
1.3.2.4.3........................................................................... Event: 'end‘
1.3.2.4.4........................................................................ Event: 'error'
1.3.2.4.5................................................................. Event: 'readable'
1.3.2.4.6........................................................... readable.isPaused()
1.3.2.4.7................................................................. readable.pause()
1.3.2.4.8.......................... readable.pipe(destination[, options])
1.3.2.4.9................................ readable.readableHighWaterMark
1.3.2.4.10....................................................... readable.read([size])
1.3.2.4.11.............................................. readable.readableLength
1.3.2.4.12........................................................... readable.resume()
1.3.2.4.13................................ readable.setEncoding(encoding)
1.3.2.4.14.................................... readable.unpipe([destination])
1.3.2.4.15................................................ readable.unshift(chunk)
1.3.2.4.16................................................... readable.wrap(stream)
1.3.2.4.17............................................... readable.destroy([error])
1.3.3 Duplex and Transform Streams
1.3.3.2 Class: stream.Transform
1.3.3.2.1............................................... transform.destroy([error])
1.4 API for Stream Implementers
1.4.2.1 构造器: new stream.Writable([options])
1.4.2.2 writable._write(chunk, encoding, callback)
1.4.2.3 writable._writev(chunks, callback)
1.4.2.4 writable._destroy(err, callback)
1.4.2.5 writable._final(callback)
1.4.2.8 在 Writable Stream解析buffer
1.4.3.1 new stream.Readable([options])
1.4.3.3 readable._destroy(err, callback)
1.4.3.4 readable.push(chunk[, encoding])
1.4.3.6 An Example Counting Stream
1.4.4.1 new stream.Duplex(options)
1.4.5.1 new stream.Transform([options])
1.4.5.2 事件: 'finish' and 'end'
1.4.5.3 transform._flush(callback)
1.4.5.4 transform._transform(chunk, encoding, callback)
1.4.5.5 Class: stream.PassThrough
1.5.4 highWaterMark与readable.setEncoding()
1 Stream(流)
stream是一个抽象接口,旨在处理数据流。stream模块提供了基本的API,方便继承stream接口,从而构造流式对象。
Nodejs里有很多流式对象,比如说http请求对象和process.stdout对象。
流能写,能写或兼二者,所以的流都是EventEmitter的一个实例,换句话说,Nodejs暴露出来的流都是继承自EventEmitter。
通过以下方式引入stream模块:
const stream = require('stream');
1.1 这篇文档的组织方式
文档分为两个主要部分,另外第三部分是额外注意的地方。第一部分阐释了使用流的基本要素。第二部分则阐释了如何自定义流。
1.2 stream的种类
有4种基本类型的流:
Readable – 可以从中读取数据的流(比如说 fs.createReadStream()
)
Writable – 可以向其写入数据的流(比如说 fs.createWriteStream()
)
Duplex – 能读能写的流(比如说 net.Socket)
Transform – 跟Duplex一样可读写,但可以修改流的数据(比如说 zlib.createDeflate()
)
1.2.1 对象模式
所有的流都专门用来处理strings和Buffer(或者Uint8Array)对象。然而流也可以用来处理其他JavaScript类型。这种流被认为是以“对象模式”去处理数据。
创建流对象时,可以使用objectMode选项转换成对象模式。但试图把一个已经存在的流转换成对象模式并不安全。
1.2.2 Buffering
Writable和Readable流在内部实现上都有一个buffer容器,用来存储数据,可以分别通过writable.writableBuffer和readable.readableBuffer访问这个buffer容器。
存储数据的容量决定于highWaterMark选项,通过创建流时传入。对象普通的流,highWaterMark值代表总字节数,而对于对象模式流,这个参数指定总的对象数。
当调用stream.push(chunk),chunk数据会存入Readable流,如果流的消费者没有调用stream.read(),数据会一直在内部队列,直到被消费。
一旦内部的数据量达到了由highWaterMark指定的临界值,流将会暂时停止从底层资源读取数据,直到当前buffered的数据被消费(也就是说,stream将停止调用内部函数readble._read()方法,此方法用来填充内部buffer)
当调用writable.write(chunk),数据被buffer在Writable stream中。当内部bbuffer小于highWaterMark值,writable.write()返回true,否则返回false。
stream API的一个关键目标,特别是stream.pipe()方法,就是以一个合理的方式协调两边的buffer数据,因为源数据和目的数据的读取速度不一致可能会导致内存耗尽。
Duplex和Transform都是可读可写,所以内部需要维持两个单独的buffer容器,用于读写,这就使得在维持合适和有效的数据流下,读和写可以单独进行。比如说net.Socket实例就是一个Duplex流,可读端可以消费数据,可写端可以写入数据。因为数据的写入可能比数据的读入更快或者更慢,所以单独操作显得是有必要的。
1.3 API for Stream Consumers
几乎所有的nodejs应用或多或少都会用到流。下面是一个使用流的例子,实现了http服务:
const http = require('http');
const server = http.createServer((req, res) => {
// req is an http.IncomingMessage, which is a Readable Stream
// res is an http.ServerResponse, which is a Writable Stream
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the end event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
}); server.listen(1337); // $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1
Writable流暴露了像write()和end()方法来写数据到流中。
Readable流使用了EventEmitter的API,当有数据可读时,通过事件通知应用。当然,可以通过多种方式获取可读的数据。
一般情况下,没有必要自己实现流的接口。
1.3.1 Writable Streams
可写流是对数据被写入的目的一层抽象.
Nodejs里可写流的例子有:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout, process.stderr
所有的可以流都实现了stream.Writable类里的接口。
尽管一些特定的可写流在一些方面不 一样,但所有的可写流都遵循下面的基本使用模式:
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
1.3.1.1 Class: stream.Writable
v0.9.4加入
1.3.1.1.1 Event: 'close'
当流和底层资源(文件描述符等)被关闭时触发。这个事件表明不再有其他事件触发,也不在计算数据。
不是所有的可写流都触发`cloase`事件。
1.3.1.1.2 Event: 'drain'
当调用stream.write(chunk)返回false时,`drain`事件触发。当合适地恢复向流中写数据
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write);
}
}
}
1.3.1.1.3 Event: 'error'
当在写入数据或者piping数据时发生错误,`error`事件触发,因调会传入error参数。
注意:当`error`事件发生,流不会被关闭。
1.3.1.1.4 Event: 'finish'
当调用stream.end()方法,而且所有的数据都flush进底层系统时,`finish`事件触发。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.error('All writes are now complete.');
});
1.3.1.1.5 Event: 'pipe'
- src <stream.Readable> source stream that is piping to this writable
当在一个可读流上调用stream.pipe()方法,把这个可写读作为可读流的一个目地时,`pipe`事触发。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.error('something is piping into the writer');
assert.equal(src, reader);
});
reader.pipe(writer);
1.3.1.1.6 Event: 'unpipe'
- src <stream.Readable> The source stream that unpiped this writable
当在一个可读流上调用unpipe()方法时触发,把这个可写流从它的目地流中移除。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.error('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
1.3.1.1.7 writable.cork()
v0.11.2加入
writable.cork()方法强制使得被写入的数据buffer进内存(不是一直都是在内存里么?),被buffer的数据将会flush,当调用stream.uncork()或者stream.end()方法时。
cork最主要的目的是避免这样一个场景:当向流中写入许多小块数据时,导致不在内部buffer里备份,这样会对性能有一不利的影响。如此,writable._writev()方法能以更优的方法处理buffer写入。
1.3.1.1.8 writable.end([chunk][, encoding][, callback])
- chunk <string> | <Buffer> | <Uint8Array> | <any> 写入stream的数据。对于普通的流,chunk必须是string、Buffer或者Unit8Array对象。对于对象模式流,chunk是JavaScript值,不能是null。
- encoding <string> 如果设置了编码,那chunk就是string。
- callback <function> 当流结束时的回调。
调用end()方法表明不再有数据写入到Writable流。如果传入了chunk参数,则是最后一次向流写入数据,之后会关闭流。如果设置了callback,则回调函数将作为`finish`事件的监听器。
在调用了stream.end()方法后再调用stream.write()方法,将会抛出异常。
1.3.1.1.9 writable.setDefaultEncoding(encoding)
- encoding <string> 新的默认编码方式
- Returns: this
设置默认的编码方式
1.3.1.1.10 writable.uncork()
uncork()方法flush所有buffer的数据,自cork()方法调用尹始。
当使用cork()和uncork()方法管理buffer数据时,推荐在process.nextTick()里调用uncork(),在下个事件循环中调用。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果cork()方法被调用多次,那么uncork()方法也必须调用相应多的次数,不然数据不会被flush。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
1.3.1.1.11 writable.writableHighWaterMark
v9.3.0加入
返回构造流时传入的highWaterMark值。
1.3.1.1.12 writable.writableLength
v9.4.0加入
队列里准备写的字节数(应该是这样理解的:数据先在writable流里列队里,再写入到底层资源)
1.3.1.1.13 writable.write(chunk[, encoding][, callback])
- chunk <string> | <Buffer> | <Uint8Array> | <any> 写入的数据,对于对象模式,则是JavaScript对象
- encoding <string> 编码方式
- callback <Function> 当这块数据被flush时,回调。
- Returns: <boolean> 如果流希望在继续写入额外数据时前,`drain`事件触发,则返回false,否则返回true
write()方法向流写入一些数据,并且一旦写入的数据被处理,回调会调用。如果有错误发生,callback可能,也可能不,会被调用,这进第一个参数是error。 为了更可靠的检测错误的发生,最好的办法是添加`error`事件。
如果内部buffer数据小于highWaterMark,write()方法返回true。否则返回false,这时不应该继续写入数据到流中,直到`drain`事件触发。这说明`drain`事件触发后,就是内部buffer被可用的时候,因为drain是排,排干的意思。
当一个流没有在draining(排),调用write()方法会buffer chunk,并且返回false。一旦所有的buffer chunk被drained(排), `drain`事件会触发。推荐一旦write()方法返回false,不再写入chunks,直到`drain`事件触发。当向一个不允许draining的流上调用write()方法时,Nodejs会buffer所有被写入的chunks,直到最大内存使异常出现,此时会无条件中止。基于在中止前,大量的内存使用会导致性能低下的垃圾回收,和高RSS(最大内存常驻区,没有释放给操作系统)。因为TCP sockets可能永远不会drain,因为远端不从流中read数据,不停的向socket写入可能导致远程可利用的漏洞。
对于Transform,写入一个不会draining数据的流特别是个问题,因为Transform默认会暂停,直到被导向piped或者`data`或`readable`事件处理器被添加。
如果被写入的数据能被生成或者需要按需获取,推荐封装逻辑进Readable,并且使用stream.pipe()方法。但如果更愿意使用write(),鉴于背压和避免内存问题,应该使用`draing`事件:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
} // Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('write completed, do more writes now');
});
处于对象模式的流会忽视encoding参数。
1.3.1.1.14 writable.destroy([error])
v8.0.0加入
- Returns: this
摧毁流,并传递错误对象error。destory()调用之后,流会被结束。实现者不应该覆盖这个主应运,而是应该实现writable._destory.
1.3.2 Readable Streams
可读流是对可消费的数据源的一层抽象。
可读流的例子包括:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin
所有的可读流都实现了stream.Readable类的接口。
1.3.2.1 两种模式
可读流实际上以两种方式运作:流动(flowing)和暂停(pause)。
当处于流动模式,数据从底层系统自动地读取,并进可能快的通过EventEmitter事件提供给应用。
当处于暂停模式,必须显示调用stream.read()方法来从流中读取数据。
所有以暂停模式启动的流都能通过下面几种方式转换成流动模式:
- 添加`data`事件处理器
- 调用stream.resume()方法
- 调用stream.pipe()方法向Writable发送数据。
可读流可以通过下面几中方式转回暂停模式:
- 如果没有指定pipe目的,调用stream.pause方法
- 如果有pipe目的,清除任何的`data`事件处理,并且清除所有的pipe目的,通过stream.unpipe()方法
一个重要的概念需要记住,就是Readable不会产生数据,直到提供了一种消费或者忽略数据的机制。如果消费机制不能使用,或者被取缔的话,Readable将试图停止生成数据。(?????)
注意:因为历史兼容性原因,移除`data`事件处理器不会自动暂停流,而且,如果有pipe目的,一旦这些目的流排干,并且请求更多的数据,调用stream.pause()方法不用保证流保持暂停模式。(???)
注意:如果一个Readable被转换为流动模式,并且没有可用的消费者处理数据,数据将会丢失。这种情况会生在readable.resume()方法调用后,没有为`data`事件添加处理器,或者`data`事件被移除。
1.3.2.2 三种状态
可读流的两种运作方式是对更复杂的内部状态管理的一种简化,这种复杂的内部状态发生在可读流的内部实现中。
特别地,在任何时候,每个可读流都处于下面三种状态的一种:
- readable.readableFlowing = null
- readable.readableFlowing = false
- readable.readableFlowing = true
当readable.readableFlowing = null时,没有一种为消费流数据的机制提供,所以流不会产生数据。处于这种状态时,为流添加`data`事件处理器,调用readable.pip()方法,或者调用readable.resume()方法将会把readable.readableFlowing转换为true,导致可读流开始主动触发事件,因为数据产生了。
调用readable.pause(),readable.unpipe()或者接到背压将导致readable.readableFlowing被设置为false,此时会暂时中止流动事件,但数据的产生不会中止,处于这个状态时,添加`data`事件处理器不会导致readable.readableFlowing为true。
const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable(); pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // will not emit 'data'
pass.resume(); // must be called to make 'data' being emitted
在readable.readableFlowing是false时,数据可能会在内部buffer里堆积。
1.3.2.3 选择一个
在不同的nodejs版本中不断演化的可读流API提供了多种消费流数据的方式。大体上,开发者应该选择一种,并且永远不要对单一的流使用多种方式消费数据。
对于大多数使用者,推荐使用stream.pipe()方法,因为它提供了消费流数据的最简单的方式。开发者如何需要更精细粒度的控制数据的生产和传输,可以使用EventEmitter和readable.pause()/readable.resume()。
1.3.2.4 Class: stream.Readable
v0.9.4加入
1.3.2.4.1 Event: 'close'
v0.9.4加入
当流被关闭,并且底层资源(文件描述符)被关闭时,`close`事触发。这个事件表明不再有更多的事件将触发,也没有更多的计算出现。
不是所有的可读流都会触发`close`事件。
1.3.2.4.2 Event: 'data'
v0.9.4加入
chunk <Buffer> | <string> | <any> 数据chunk。对于不是对象模式的流,chunk是string、Buffer。对于对象模式的流,chunk是任何JavaScript值,不能是null。
当流正在放弃数据chunk的所有权,给一个消费者时,`data`事件触发。这可能发生在流被转换到流动模式,通过readable.pipe(), readable.resume()或者添加`data`事件处理器。不管任何时候readable.read()方法被调用,并且数据chunk能被返回时,`data`事件将会触发。
为流添加`data`事件处理器会转换成流动模式,数据一旦可用,将会被传递。
处理器回调函数会传入数据chunk,如果使用readable.setEncoding()
指定了默认的编码方式,chunk数据将作为string传递,否则chunk将以Buffer对象传递。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
1.3.2.4.3 Event: 'end‘
v0.9.4
当流里没有更多的数据消费时,`end`事件触发。
注意:`end`事件不会触发,除非数据被完全消费完。如果把流转换为流动模式,或者不断的使用stream.read()方法直到数据被完全消费,将会是一件技艺高超的事。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
1.3.2.4.4 Event: 'error'
v0.9.4加入
`error`事件可能随便触发。典型地,error事件可能在这种情况下发生:由于底层内部错误,底层流不能产生数据,或者流试图push非法的chunk数据到内部伯buffer。
监听器将传入Error对象。
1.3.2.4.5 Event: 'readable'
v0.9.4加入
当有可用的数据准备从流中读取时,`readable`事件触发。在某些情况下,为`readable`事件添加监听器将导致一些数据即将被读入到内部buffer中。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// there is some data to read now
});
`readable`事件同样会在流数据读完,在`end`事件前触发。
实际上,`readable`事件表明这个流的一个新信息:要么新数据可用,要么流到尾了。对于 前者,stream.read()将返回可用的数据,在后者,stream.read()将返回null。在下面的例子中,foo.txt是个空文件:
const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
执行脚本,输出是这样:
$ node test.js
readable: null
end
注意:大体上,比起`readable`事件,readable.pipe()和`data`事件机制更容易理解。无论如何,`readable`可能导致吞吐量不断增加。(???点解)
1.3.2.4.6 readable.isPaused()
v0.11.14加入
- Returns: <boolean>
返回当前Readable的操作状态。这个主法方要是由readable.pipe()方法之下的机制使用。在大多数情况下,没直接使用这个方法。
const readable = new stream.Readable(); readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
1.3.2.4.7 readable.pause()
v0.9.4加入
- Returns: this
readable.pause方法将导致处于流动模式的流停止触发`data`事件,断开流动模式,任何可用的数据将保留在内部buffer中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
1.3.2.4.8 readable.pipe(destination[, options])
v0.9.4加入
- destination <stream.Writable> 写入数据的目的流
- options <Object> pipe选项
end <boolean> 当reader结束时,writer也结束,默认是true。
readable.pipe()方法附加一个Writable流到readable,导致自动转换为流动模式,并且push所有的数据到附加的Writable流。数据流会自动管理,以便目的Writable流不会超过更快的Readable流。
下面的例子从readable流中导向所有的数据到一个文件,名为file.txt:
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
为单独一个Readable流添加多个Writable流是可能的。
readable.pip()方法返回指向目的流的引用,这使得建立一系列链式的流成为可能:
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当源Readable流触发`end`事件时,目的Writable流上的end()方法会被调用,以致目的Writable不再可写。可能通过传递end参数取消这个默认行为,如此,目的Writable流会保持打开:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
一个重要的警告是,如果Readable流在动作的时候触发了一个错误,Writable流不 会被自动关闭。如果错误发生,手动关闭每个流是必要的,因为可以防止内存泄露。
注意:不管什么样的参数,process.stderr和process.stdout这两个Writable流永远不会被关闭,直到Nodejs进程退出。
1.3.2.4.9 readable.readableHighWaterMark
v9.3.0加入
返回构造Readable时传入的highWaterMark值。
1.3.2.4.10 readable.read([size])
v0.9.4加入
read()方法从内部buffer里pull一些数据,并返回。如果没有可用的读数据,将返回null。默认下,返回的数据是Buffer对象,除非有用readable.setEncoding()方法设置编码,或者当前流是对象模式。
可选的size参数指定了要读取的字节数。如果没有足够的size字节数读取,将会返回null,除非流已经结束了,在这种情况下,所有在内部buffer的数据将会被返回。
如果没有指定size参数,所有在内部buffer的数据将会被返回。
readable.read()方法应该只在暂停模式下调。在流动模式,readable.read()会自动被调用,直到内部buffer完全drained。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
console.log(`Received ${chunk.length} bytes of data.`);
}
});
大体上,推荐开发者避免使用`readable`事件和readable.read()方法,而是使用readable.pipe()或者`data`事件。
一个对象模式的Readable流不管调用readable.read(size)方法时传入什么size的值是什么,都只会返回单一的一项。
注意:如果readable.read()方法返回一个数据chunk,`data`事件也同样会被触发。
注意:在`end`事件发生后,调用stream.read([size])将会返回null,不会有运行时错误抛出。
1.3.2.4.11 readable.readableLength
v9.4.0加入
返回在队列里准备读的字节数。
1.3.2.4.12 readable.resume()
v0.9.4加入
- Returns: this
readable.resume()方法会明确地导致一个已经暂停的Readable流开始恢复触发`data`事件,把流转换为流动模式。
resume()方法能被用于从流中完全消费数据,而在没有任何处理任何数据的情况下:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
1.3.2.4.13 readable.setEncoding(encoding)
v0,9.4加入
- encoding <string> 编码类型
- Returns: this
readable.setEncoding()方法设置从Readable流读取数据时的字符编码。
默认情况下,没有指定编码的话,流数据将返回Buffer对象。设置编码后将返回指定编码的string字符串。比如说readable.setEncoding(‘utf8’)将导致输出的数据被编码为utf-8。readable.setEncoding(‘hex’)将导致数据以16进制的字符串形式输出。
Readable流会适当的处理多字节字符,如果只是简单的从流里pull数据
作为Buffer对象,会被不恰当的解码。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
1.3.2.4.14 readable.unpipe([destination])
v0.9.4加入
- destination <stream.Writable> 可选的特定流
readable.unpipe()方法拆卸之前使用stream.pipe()方法附加的Writable流。
没有指定目的流,所有的pipes会被拆卸。
如果指定了目的流,但没有pipe建立,那这个方法不做什么事。
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt');
readable.unpipe(writable);
console.log('Manually close the file stream');
writable.end();
}, 1000);
1.3.2.4.15 readable.unshift(chunk)
- chunk <Buffer> | <Uint8Array> | <string> | <any> 将chunk数据添加到读队列首上。对于不是对象模式的流,chunk必须是string,Buffer或者Uint8Array。对于对象模式流,chunk是JavaScript值。
readable.unshift()方法push chunk数据到内部的buffer。在这种场景下会很有用:流被消费后,需要反消费一些数据,以便数据能被其他第三方获取。
注意:在`end`事件触发后,stream.unshift(chunk)方法不能再调用,否则会抛出一个运行时错误。
使用stream.unshift()方法应该考虑使用Transform代替。
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// remove the readable listener before unshifting
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
注意:不像stream.push(chunk),通过重置内部读状态,unshift不会结束读过程。如果unshift()在读数据期间被调用,这可能导致意想不倒的结果。调用unshift()后紧接着调用stream.push(‘’)会合适地重置读状态,但是当在进行一个读过程时,最好还是避免使用unshift()方法。
1.3.2.4.16 readable.wrap(stream)
v0.9.4加入
- stream <Stream> 一个老式的可读流
早于Nodjes v0.10的版本有一些流没有完全实现如今stream模块的API。
当使用旧的Nodejs库触发`data`事件,并且只有stream.pause()方法时,readable.wrap()方法可以用来创建一个Readable流,将旧的流作为数据源。
很少使用readable.wrap()方法,这个方法提供了一种简便的与旧Nodejs版本交互的方式。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader); myReader.on('readable', () => {
myReader.read(); // etc.
});
1.3.2.4.17 readable.destroy([error])
v8.0.0加入
摧毁流,并且触发`error`事件。之后,可读流会释放内部资源。实现者不应该覆盖这个方法,而是应该实现readable._destory。
1.3.3 Duplex and Transform Streams
1.3.3.1 Class: stream.Duplex
Duplex 流实现了Readable和Writable接口。
Duplex流包括:
1.3.3.2 Class: stream.Transform
v0.9.4加入
Transform流也是Duplex流,只是它的输出与输入存在关联。像所有的Duplex流一样,Transform流实现了Readable和Writable接口。
Transform流包括:
1.3.3.2.1 transform.destroy([error])
v8.0.0加入
摧毁流,并且触发`error`事件。之后,transform流会释放内部资源。实现者不应该覆盖这个方法,而应该实现readable._destory。默认的_destory实现会触发`close`事件。
1.4 API for Stream Implementers
stream模式的API的设计使得使用Javscript原型继承流变得简单。
首先,开发者应该声明一个新的JavaScript类,继承自四种基本流类(Writable, Readable, Duplex, Transform),并且确保调用父类的构造函数:
const { Writable } = require('stream'); class MyWritable extends Writable {
constructor(options) {
super(options);
// ...
}
}
新的流类必须实现一个或者多个特定的方法,实现什么方法取决于创建的流,像下面的图表所示:
注意:实现代码最好不要调用公共的方法。以免在消费流的时候导致不利的副作用。
1.4.1 简化的构造器
v1.2.0加入
在一些简单的情况下,不必通过继承来构造一个流。在创建流对象的时候传递合适的方法作为参数了也是可行的:
const { Writable } = require('stream'); const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
}
});
1.4.2 实现Writable Stream
stream.Writable类旨在实现一个Writable流。
自定义一个Writable stream必须调用 new stream.Writable([options])构造器,并且实现writable._write()方法。writable._writev()方法可选择实现。
1.4.2.1 构造器: new stream.Writable([options])
options <Object>
- highWaterMark <number> 默认16kb,对于对象模式是16.
- decodeStrings <boolean> 在传递到_write()方法前,是否把字符串解码进Buffer。默认是true.
- objectMode <boolean> 决定stream.write(anyObj)方法是否有效。如果设置了,则应该写入JavaScript值,而不是string,Buffer或Unit8Array。默认是false.
- write <Function> stream._write()方法的实现.
- writev <Function> stream._writev()方法的实现.
- destroy <Function> stream._destory()方法的实现.
- final <Function> stream._final()方法的实现.
比如:
const { Writable } = require('stream'); class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor
super(options);
// ...
}
}
或者使用es6之前的构造器:
const { Writable } = require('stream');
const util = require('util'); function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者使用简化的构造器:
const { Writable } = require('stream'); const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
}
});
1.4.2.2 writable._write(chunk, encoding, callback)
- chunk <Buffer> | <string> | <any> 要写入的chunk数据。如果decodeStrings设置为true,chunk是Buffer对象;如果是false,chunk不是Buffer。如果是对象模式,也不是Buffer。
- encoding <string> 如果chunk是string,那么会使用encoding编码chunk。如果chunk是个buffer或者对象模式流,encoding值会被忽略。
- callback <Function> 当提供的chunk被完全处理时,回调这个函数。
所有实现Writable流的类都必须提供writable._write()方法,向底层资源发送数据。
注意:Transform流有自己的writable._write()实现。
注意:这个方法一定不能由开发者直接调用。应该由子类实现,再由内部Writable类自行调用。
callback方法用来获取此次写入是否成功或者失败。传递的第一个参数是一个error对象,如果成功,error是null,如果失败,error是一个对象。
在writable._write()方法和callback方法调用间隙,调用writable.write()将导致写入的数据被buffer。一旦callback方法完成,`drain`事件将触发。如果一个流能够同时处理多个chunk数据,那么应该实现writable._writev()方法。
如果在构造函数中设置了decodeStrings,chunk是一个string而不是Buffer,encoding选项将表明string的编码。这是为了支持特定的字符串编码。如果decodeStrings被显示地设置为false,encodeing参数会被忽略,并且chunk将保持不变,进而传递给.write()。
writable._write()方法是下划线开头的,因为这是一个内部方法,不应该在用户代码里直接调用。
1.4.2.3 writable._writev(chunks, callback)
- chunks <Array> 要写入的chunks,每个chunk都有这样的格式:{ chunk: …, encoding: …}.
- callback <Function> 当处理完传入的chunks,回调.
注意:不能直接调用。应该由子类实现,再由Writable内部调用。
writable._writev()方法可能是writable._write()方法的另一种实现。旨在处理多个chunk数据。如果实现了,将随着所有buffer在写队列里的数据,调用这个方法。
writable._writev()方法是下划线开头的,因为这是一个内部方法,不应该在用户代码里直接调用。
1.4.2.4 writable._destroy(err, callback)
v8.0.0加入
- err <Error> 可能的错误.
- callback <Function> 回调函数,接收一个可选的error参数.
_destory()方法由writable.destory()方法调用。应该由子类覆盖,但千万不能直接调用。
1.4.2.5 writable._final(callback)
v8.0.0加入
- callback <Function> 当结束写入剩余的数据,调用这个方法,传入可选的error参数。
_final()方法不能直接调用。应该由子类覆盖,再由内部自行调用。
在流结束前,可选的函数参数将会被调用,callback调用后,`finish`事件才会触发。这对于在流结束前,关闭资源和写buffered的数据很有用。
1.4.2.6 写数据时出错误怎么办
在writable._write()和writable._writev()方法出现错误时,推荐通过回调报告错误。如此,将导致`error`事件触发。而在writable._write()方法里抛出一个异常将导致意想不到的秘不一致的行为。使用回调确保处理错误时一致。
const { Writable } = require('stream'); const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
});
1.4.2.7 Writable Stream的一个例子
下面简单的自定义了一个Writable流。尽管这个特定的Writable流没有什么实际用处,但却描绘了自定义一个Writable流所需要的每个方面:
const { Writable } = require('stream'); class MyWritable extends Writable {
constructor(options) {
super(options);
// ...
} _write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
1.4.2.8 在 Writable Stream解析buffer
解码buffer是一件很普遍的事,比如说,当使用输入是string的Transform流时。当处理多字节字符串编码时(像utf-8),这是有意义的。下面的例子展示了如何解码多字节字符,使用StringDecoder和Writable。
const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder'); class StringWritable extends Writable {
constructor(options) {
super(options);
const state = this._writableState;
this._decoder = new StringDecoder(state.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
} const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable(); w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]); console.log(w.data); // currency: €
1.4.3 实现一个Readable Stream
stream.Readable类用来实现Readable stream。
自定义的Readable stream必须调用new stream.Readable([options])构造函数,并且实现readable._read()方法。
1.4.3.1 new stream.Readable([options])
options <Object>
- highWaterMark <number> 在停止从底层资源读取前,存储在内部buffer里的最大字节数。默认16kb,对象模式是16。
- encoding <string> 如果指定了,buffer将被解码成特定的编码格式,默认null。
- objectMode <boolean> 是否是对象模式,意思着stream.read(n)将返回单一的一个值,而不是大小为n的buffer对象。默认false。
- read <Function> stream._read()方法的实现.
- destroy <Function> stream._destory()方法的实现.
例子:
const { Readable } = require('stream'); class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor
super(options);
// ...
}
}
使用es6之前的代码风格:
const { Readable } = require('stream');
const util = require('util'); function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者使用简化的构造器:
const { Readable } = require('stream'); const myReadable = new Readable({
read(size) {
// ...
}
});
1.4.3.2 readable._read(size)
- size <number> 要异步读取的字节数。
注意:这个方法千万不能由应用代码直接调用。应该由子类覆盖,再由内部的Readable类调用。
所有实现Redable流的类都必须提供readable._read()方法的实现,来向底层资源获取数据。
当readable._read()被调用,如果底层的数据可用时,开发者应该使用this.push(dataChunk)方法, 把数据push进read queue里面。_read()方法应该继续从底层获取可用数据,并push数据,直到readable.push()返回false。一旦在停止之后,_read()方法再次被调用,就应该push额外的数据到内部read queue。
注意:一旦readable._read()方法被调用了,将不会再调用,直到readable.push方法被调用。
size参数只是个参考。在某些实现中,read是一个单独的操作,size参数用来决定读取多少数据。而一些实现中可能会忽略这个参数,不管是否可用,都只是提供数据,没有必要等到所有的字节数都可用之后,再调用stream.push(chunk)。
_read()下划线开头,表示这是一个内部方法,不应该直接由用户调用。
1.4.3.3 readable._destroy(err, callback)
v8.0.0加入
- err <Error> 一个可能的错误.
- callback <Function> 回调,接收一个可选的error参数.
_destory()由readable.destory()调用。可由子类覆盖,但千万不能直接调用。
1.4.3.4 readable.push(chunk[, encoding])
- chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 要push进read queue的chunk数据。非对象模式的流,chunk是string,Buffer,或Unit8Array。对象模式,chunk可能是任意JavaScript值。
- encoding <string> chunk的编码,必须是一个合法的Buffer编码,比如’utf8’或者’ascii’。
- Returns: <boolean> 如果可以继续push,返回true;否则返回false.
当chunk是一个Buffer,Uint8Array或者string时,chunk数据将会被添加到内部队列,等待被消费。传递chunk为null值表明已经到流的结束了,不再有数据写入。
当Readable操作在暂停模式,由readable.push()进的数据可以通过readable.read()方法读出来,在`redable`事件触发后。
当Redable操作在流动模式,由readable.push()进的数据将由`data`事件传递。
readable.push()主法被设计得尽可能的灵活。比如说,存在一个低级的源,这个源提供了暂停/恢复机制和数据回调,这个低级源能被Redable实例包裹:
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over. class SourceWrapper extends Readable {
constructor(options) {
super(options); this._source = getLowlevelSourceObject(); // Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// if push() returns false, then stop reading from source
if (!this.push(chunk))
this._source.readStop();
}; // When the source ends, push the EOF-signaling `null` chunk
this._source.onend = () => {
this.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
注意:readable.push()方汉应该由Readable实例者调用,并且只能在readable._read()方法里调用。
1.4.3.5 Reading时错误怎么办
readable._read()方法在处理的时候发生错误时,推荐使用`error`事件,而不是抛出错误。在readable._read()方法里抛出错误可能导致意想不到和不一致的行为。使用`errro`事件确保了错误处理的一致性和可预期性。
const { Readable } = require('stream'); const myReadable = new Readable({
read(size) {
if (checkSomeErrorCondition()) {
process.nextTick(() => this.emit('error', err));
return;
}
// do some work
}
});
1.4.3.6 An Example Counting Stream
下面是个一个基本的Readable流的例子,这个流产生1到1000000的数字,然后结束。
const { Readable } = require('stream'); class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
} _read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = '' + i;
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
1.4.4 实现一个Duplex Stream
Duplex流是都实现了Readable和Writable的流,比如说TCP socket连接。
因为JavaScript不支持多继承,stream.Duplex类实现了Duplex流。
注意:stream.Duplex类在原型链上继承了stream.Readable,并且寄生自stream.Writable,但使用instanceof操作符对两个类都会起作用,因为覆盖了stream.Writable的Symbol.hasInstance。
自定义的Duplex流必须调用new stream.Duplex([options])构造器,并且都要实现readable._read()和writable._write()方法。
1.4.4.1 new stream.Duplex(options)
options <Object> 会传递给Readable和Writable的构造器,有如下字段:
- allowHalfOpen <boolean> 默认true,如果设置为false,当可读端结束时,可写端会自动结束.
- readableObjectMode <boolean> 默认false,为可读端设置对象模式,如果objectMode是true,则无影响.
- writableObjectMode <boolean> 默认false,为可写端设置对象模式,如果objectMode是true,则无影响.
- readableHighWaterMark <number> 设置可读端的highWaterMark值,如果highWaterMark提供了,则无效.
- writableHighWaterMark <number> 设置可写端的highWaterMark值,如果highWaterMark提供了,则无效.
比如:
const { Duplex } = require('stream'); class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者使用es6之前的格式:
const { Duplex } = require('stream');
const util = require('util'); function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
或者使用简单构造器:
const { Duplex } = require('stream'); const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
}
});
1.4.4.2 Duplex Stream的一个例子
下面是一个简单的Duplex的例子,Duplex流包裹一个假想的底层源对象,数据会向这个底层源对象写入数据,并且也能从这个对象读数据,虽然使用的是与Nodejs流不兼容的API。下面的例子中,Duplex流通过Writable接口buffer到来的即将被写入的数据,再通过Readable接口读出。
const { Duplex } = require('stream');
const kSource = Symbol('source'); class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
} _write(chunk, encoding, callback) {
// The underlying source only deals with strings
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
} _read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex流最重要的部分是Readable和Writable都是独立运作,尽管在单一的实例中,他们互相存在。
1.4.4.3 对象模式的Duplex Streams
对于Duplex流,对象模式能专门为Readable和Writable端设置对象模式,使用readableObjectMode和writableOjbectMode选项。
在下面的例子中,创建了一个新的Transform流,在Writable端使用了对象模式接收JavaScript数字,随便在Readable端转化为16进制字符串。
// All Transform streams are also Duplex Streams
const myTransform = new Transform({
writableObjectMode: true, transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary
chunk |= 0; // Transform the chunk into something else.
const data = chunk.toString(16); // Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
}
}); myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk)); myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
1.4.5 实现一个 Transform Stream
Transform流是一个Duplex流,这个流的输出能根据输入计算。像zlib流或者crypto流的压缩,解密或者加密数据。
注意:没有要求输出的大小应该等于输入的大小,同等数据的chunk大小,或者到达的时间。比如说,当输入结束时,一个Hash流只会有一个单一的输出chunk。一个zlib流会产生输出,这个输出比可能更小,也可能更大。
也就是说,Transform的输入输出不用对等,因为可以修改。
stream.Transform类实现 了Transform流。
stream.Transform类在原型上继承自stream.Duplex,并且实现了自己的writable._write()和readable._read()方法。自定义Transform必须实现transform._transform()方法,按需实现transform._flush()方法。
注意:当使用Transform流时需要格式外小心一点,如果Readable端没有消费的话,Writable端可能会暂停。
1.4.5.1 new stream.Transform([options])
options <Object> 传递给Writable和Readable的构造器,同时有以下字段:
- transform <Function> stream._transform()方法的实现.
- flush <Function> stream._flush()方法的实现
例子:
const { Transform } = require('stream'); class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
es6之前的风格:
const { Transform } = require('stream');
const util = require('util'); function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
或者使用简单构造器:
const { Transform } = require('stream'); const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
}
});
1.4.5.2 事件: 'finish' and 'end'
`finish`和`end`事件分别来自stream.Writable和stream.Readable。`finish`事件是stream.end()调用后触发,并且所有的chunk都由stream._transform()函数处理了。`end`事件 是所有的数据都输出后触发,即在transform._flush()调用后,回调。
1.4.5.3 transform._flush(callback)
- callback <Function> 回调函数,当剩余的数据都被flush后,回调该函数,传入error参数。
注意:这个函数千万不能直接调用。应该由子类实现,在内部Readable类调用。
在某些情况下,一个变换的操作在流结束时,可能需要触发一些额外的数据。比如,zlib压缩流会存储大量的内部状态,以便最好地压缩输出,当这个流结束时,这些额外的数据应该被flush,以便压缩数据得以完成。
自定义Transform可以按需实现readable._flush()方法,在没有更多的被写入的数据消费时,这个方法会被调用,在`end`事件触发前。
在transform._flush()实现内部,readable.push()方法可能被零次或多次调用,这得视情况而定。当flush操作完成,则调用回调。
transform._flush()方法是下划线开头的,表明这是一个内部函数,不应该直接调用。
1.4.5.4 transform._transform(chunk, encoding, callback)
- chunk <Buffer> | <string> | <any> 需要被转换的chunk。如果decodeStrings选项设置成false或者流处于对象模式,将会是一个Buffer.
- encoding <string> 如果chunk是string,这将是编码类型。如果chunk是buffer,这是一个特定的值’buffer’。
- callback <Function> 回调函数,当chunk被处理后回调,传入error参数。
注意:这个函数千万不能直接调用。应该由子类实现,再由内部Readable调用。
所有的Transform流必须提供_transform()方法来接收输出,产生输出。transform._transform()处理写入的数据,计算得到输出,然后使用readable.push()方法传递输出到Readable端。
transform.push()方法可能被多次调用,来从单一的输入chunk生成输出。
从任何输入的chunk数据,可能不再产生输出。这是有可能。
只有在当前chunk被完全消费后,callback函数才必须被调用。第一个传递到callback的是eror对象,如果没有有错误产生,则是null。如果第传递了第二个参数,将会被转发到readable.push()方法,换句话说,下面是等价的:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
}; transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform()方法是下划线开头的,因为这是一个内部函数,不应该直接调用。
transform._transform()永远不会并行调用,流的内部实现是队列机制,为了接收下一个chunk,callback必须被调用,要么同步要么异步。
1.4.5.5 Class: stream.PassThrough
stream.PassThrough类是Transform流的一个实现,这个类没有什么实际意义,只是简单的把输入传给输出。它主要作为例子和测试出现.
1.5 附加说明
1.5.1 与旧Nodejs版本的兼容
在v0.10之前的版本,Readable流接口更简单,但同时功能不够强大,也不够用。
- 相比于等待调用stream.read()方法,`data`事件会马上触发。如果一个应该需要花费一些工作来决定如何处理数据,那么,应用需要存储读取的数据到buffer,以便数据不会丢失。
- stream.pause()方法只是参考,不能保证。这意味着仍然有必要接收`data`事件,甚至当流处于暂停状态时。
在v0.10.0版本,增加了Readable类。为了向后兼容兼容旧的Nodejs版本,当添加 了`data`事件或者调用stream.resume()方法后,Readable流转换为流动模式。这样的影响就是,即使不使用新的stream.read()方法和`readable`事件,也不用担心丢失数据chunk。
尽管大部分应用都能正常运行,还是存在一边缘案例,如下所述:
- 没有添加`data`事件监听器.
- 从没调用过stream.resume().
- stream没有被pipe到任何目的.
比如说,考虑如下的代码:
// WARNING! BROKEN!
net.createServer((socket) => { // we add an 'end' method, but never consume the data
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
}); }).listen(1337);
在早于v.10版本的Nodejs,到来的消息被简单的丢弃。然后在v0.10版本及后面的版本,scoket永远保持暂停状态。这种情况下,暂时的解决的办法是调用stream.resume()方法:
// Workaround
net.createServer((socket) => { socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
}); // start the flow of data, discarding it.
socket.resume(); }).listen(1337);
除了新的Readable流可以转换到流动模式外,v0.10以前的流能使用readable.wrap()方法包裹进Readable类中。
1.5.2
readable.read(0)
有这样一些情况:需要对底层Readable流机制触发一次刷新,同时不消费任何数据。这时就可以调用readable.read(0),这个方法始终返回null。
如果内部的reader buffer小于highWaterMark,并且流没有在reading,调用stream.read(0)将触发底层stream._read()调用。
尽管大部分应用都不需要这么做,但在Nodejs内部有一些情况要这么做,尤其是在Readable流内部。
1.5.3
readable.push('')
不推荐使用。
push一个零字节字符串,Buffer或者Unit8Array到一个不是对象模式的流中会有一个很有意思的副作用。因为是调用readable.push(),这个调用会结束reading进程。但因为这是一个空字符串,没有数据添加到Readable buffer里,所以没有可以消费的东西。
1.5.4
highWaterMark
与
readable.setEncoding()
readable.setEncoding()会改变处于非对象模式流的highWaterMark的运作行为.
典型地,当前buffer的size是比对highWaterMark值测量的,单位是字节。然而,在调用setEncoding()方法后,比较函数将会是以字符数测试。
在使用latin1或ascii码时,没有什么问题。但建议在处理多字节字符时,考虑这种行为。