简单了解node stream

Almost all Node.js applications, no matter how simple, use streams in some manner. 

开篇先吓吓自己。画画图,分析分析代码加深自己的理解。

简单了解node stream

  • stream基本概念
  • Readable - 可读取数据的流
  • Writable - 可写入数据的流
  • 总结

 1.stream基本概念

1.1什么是 stream

1. 在编写代码时,我们应该有一些方法将程序像连接水管一样连接起来 -- 当我们需要获取一些数据时,可以去通过"拧"其他的部分来达到目的。这也应该是IO应有的方式。 -- Doug McIlroy. October 11, 1964

结合到node中

stream 就像是一个抽象的模型(有点像水管),能有序的传输数据(有点像水),需要时就拧开水管取点用,还可以控制大小。

简单了解node stream

Node.js 中有四种基本的流类型:
  • Writable - 可写入数据的流(例如 fs.createWriteStream())。
  • Readable - 可读取数据的流(例如 fs.createReadStream())。
  • Duplex - 可读又可写的流(例如 net.Socket)。
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())。

 1.Readable-可读取数据的流

1.1 简单描述Readable 可读取数据流

可读流是对提供数据的来源的一种抽象。就像水管传递水资源供我们消费使用一样。

可读流有两种模式:流动模式(flowing)或暂停模式(paused)

  • 流动模式flowing,数据自动从底层系统读取,并通过EventEmitter 接口的‘’data'事件尽可能快地被提供给应用程序。
  • 暂停模式paused, 数据必须显示通过调用stream.read()读取数据。

Stream 实例的 _readableState.flow(readableState 是内部用来存储状态数据的对象) 有三个状态:

  • _readableState.flow = null,暂时没有消费者过来(初始状态)
  • _readableState.flow = false,
  • _readableState.flow = true,

1.2Readable 可读取数据流 flowing 模式

举个例子: flowing 模式,一旦绑定监听器到 'data' 事件时,流会转换到流动模式_readableState.flow = true

const { Readable } = require('stream');
class myReadable extends Readable {
  constructor(options,sources) {
    super(options)
    this.sources = sources
    this.pos = 0
  }
  // 继承了Readable 的类必须实现 _read() 私有方法,被内部 Readable类的方法调用
  // 当_read() 被调用时,如果从资源读取到数据,则需要开始使用 this.push(dataChunk) 推送数据到读取队列。 
  // _read() 应该持续从资源读取数据并推送数据,直到push(null)
  _read() {
    if(this.pos < this.sources.length) {
      this.push(this.sources[this.pos])
      this.pos ++ 
    } else {
      this.push(null)
    }
  }
}
let rs = new myReadable({},"我是罗小布,我是某个地方来的水资源")
let waterCup = ''
// 绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当流将数据块传送给消费者后触发。 
rs.on('data',(chunk)=>{
  console.log(chunk); // chunk 是一个 buffer
  waterCup += chunk
})
rs.on('end',()=>{
  console.log('读取消耗完毕');
  console.log(waterCup)
})

 

从上述代码开启调试:

大概的画了一下flowing模式的流程图:(这个图真心不好看,建议看后面的那个)

简单了解node stream

一旦开始监听data方法,Readable内部就会调用read方法,来触发读流操作,

_read() 函数里面push 是同步操作会先将数据存储在this.buffer (this.buffe = new bufferList(),bufferList是内部实现的数据结构)变量中,然后再从this.buffer 变量中取出,emit('data',chunk) 消费掉

_read() 函数里面push 是异步,一旦异步操作中调用了push方法,且有数据,无缓存队列,此时会直接emit('data',chunk) 消费掉。

但是如果在读取数据的途中调用了stream.pause() 此时会停止消费数据,但不会停止生产数据,生产的数据会缓存起来,如果流的消费者没有调用stream.read()方法, 这些数据会始终存在于内部缓存队列中(this.buffe = new bufferList(),bufferList是内部实现的数据结构),直到被消费。

由上简化图形:

简单了解node stream

 

flowing 模式是自动获取底层资源不断流向消费者,是流动的。

数据自动从底层系统读取,并通过EventEmitter 接口的‘’data'事件尽可能快地被提供给应用。

1.3 .flowing 模式在 node 其它模块中的使用

已经封装好的模块更关注数据消费部分

http 模块

let http = require('http')

let server = http.createServer((req,res)=>{
  var method = req.method;
  if(method === 'POST') {
    req.on('data',()=>{ // 接收数据
      console.log(chunk)
    })
    req.on('end',()=>{
      // 接收数据完成
      console.log(chunk)
      res.end('ok')
    })
  }
})
server.listen(8000)

fs 模块

let fs = require('fs')
let path = require('path')
let rs = fs.createReadStream(path.resolve(__dirname,'1.txt'),{
  flags: 'r+',
  highWaterMark: 3,
})
rs.on('data',(data)=>{ // 接收数据
  console.log(data.toString())
})
rs.on('end',()=>{ // 接收数据完成
  console.log('end')
})
rs.on('error',(error)=>{
  console.log(error)
})

1.4.Readable 可读取数据流 paused模式

举个例子: paused模式,一旦绑定监听器到 'readable' 事件时,流会转换到暂停模式_readableState.flow = false

 

上一篇:Maven笔记


下一篇:【Postgres扩展】pg_auto_failover支持高可用性和自动故障转移