深入node4 可写流的实现

可写流的使用

深入node4 可写流的实现
可写流的highWaterMark表示期望这个文件接受多少个值。
深入node4 可写流的实现
深入node4 可写流的实现
深入node4 可写流的实现
end不仅会写入,而且会触发close事件。
深入node4 可写流的实现
深入node4 可写流的实现
深入node4 可写流的实现
一个true,一个false,是因为我们的highWaterMark设置了3,希望只用3个内存来写,但是返回的值与我们是否写入无关,返回false也会写入。

  • 但是有个问题,我们写多个wirte的时候,是并发异步操作,所以不能确定哪个快哪个慢。

  • 可以将并发异步操作变为串行异步。

  • 除了第一次的write,接下来的write排队,第一个完成后,将队列中的每一个write拿出来执行。有点像eventsloop,直到清空队列,但是队列缓存可能会过大,所以需要一个预期,也就是highWaterMark来控制,达到预期后,就不要调用write方法。虽然再调用也会写入进去。

  • 结合fs.createStream。
    深入node4 可写流的实现
    当我们第一次写入的时候,当文件吃不下了,也就是到highWaterMark的值了,就应该停止写入了,等文件吃完了,触发了drain方法,这时候再去恢复rs.resume(),此时后面的写入就是往真实的文件中去写了,而不是一直在排队了。读取默认的highWaterMark是64k,而写入的默认highWaterMark是16k。

  • 这跟第一种并发异步写入有很大区别。

可写流的实现

可写流是基于链表实现的,因为可写流涉及队列排序的问题,比较多的使用了头部的增删。而链表在头部和尾部的增删的时候效率比较高。
链表的实现可以观看:单链表,双链表

  • 可读流内部是基于steam和stream的Readable(跟我们实现的MyReadSteream类相似,内部也是调用fs.read)以及events来实现的。fs自己实现了ReadStream继承stream的Readable,并且实现自己的_read方法。
  • 可写流内部是基于stream和Writable以及events来实现的,fs实现的WrtieStream继承了stream的Writable。自己实现的_wirte方法供父类的wirte方法调用。

实现十个数,希望使用三个内存来处理

深入node4 可写流的实现
原生的。

实现自己的WriteStream

思路:跟可读流一样,将open和wirte操作分离,通过事件发布的模式。然后通过变量判断当前write是否是第一次调用,保证每次只有一个write在执行,其他的全部扔进缓存,当write执行完毕之后,再从缓存中一个一个拿出来执行write。直到缓存清空完毕,触发drain事件,通知用户缓存清完了,变量置为初始化。
这样就能使并发异步操作变成串行异步操作。
初始化变量,这里跟ReadStream有点区别,比如没有end,encoding默认写入是utf8等等。
深入node4 可写流的实现
len用来判断当前缓存的值的长度,needDrain用来判断缓存是否过多是否达到期望值。cahce是缓存队列。writing用来标识是否第一次写入。

  • open方法
    深入node4 可写流的实现
  • 执行write方法
    深入node4 可写流的实现
    因为数据可能有中文,有英文,所以第一步默认转为buffer。然后判断当前write写入的buffer的长度+this.len(缓存队列的长度)与期望值highWaterMark相比较,是否超过期望值。
    再重写cb函数,这样每次写入成功调用cb的时候,就会执行clearBuffer方法。
    然后通过writing判断是不是第一次写入,第一次写入就调用_write方法,真正的执行fs.write,否则就放入缓存之中。
  • _write方法
    深入node4 可写流的实现
    write方法可能用户执行的时候open还没打开,因为是异步,所以需要做处理,然后执行write方法,写入内容,每次写完之后都要维护偏移量,并且减少len。执行cb()。cb是重写过的,会调用clearBuffer方法。
  • clearBuffer方法
    深入node4 可写流的实现
    这个方法主要将缓存中的write任务一个一个拿出并执行,注意这里调用的是_write方法,是要真正写入文件的。然后清空缓存之后,需要出发drain事件,告诉用户已经清空,当前可以继续write。并且重置一些变量。深入node4 可写流的实现
    深入node4 可写流的实现
    深入node4 可写流的实现
    触发四次drain事件,成功写入四个。

总结:

通过缓存以及全局变量,和事件系统,只有第一次write才会真正执行fs.write,其他的write方法会放入缓存,只有当fs.write执行完毕之后,才会继续从缓存取出进行fs.wrtie。直到缓存为空,再触发drain事件,重置变量,让用户可以继续调用write方法写入。就好比人吃饭,喂一口大的,它总要先吃一点小的,剩下部分在口腔里嚼,并且返回false告诉你,先别喂,等全部吃完。当它慢慢嚼完并吞下当前口腔有的事物之后,再告诉你,可以继续喂了,依次类推,反复循环,直到这碗饭吃饭(全部写入)

优化 使用链表代替数组cache

深入node4 可写流的实现
链表的实现:在单链表这里。
实验:
深入node4 可写流的实现
深入node4 可写流的实现
成功删除了第一个3,链表此时是6=>10。
深入node4 可写流的实现

深入node4 可写流的实现
深入node4 可写流的实现
改造完成,深入node4 可写流的实现
期望是3,所以第一次会调用三次write,两个write存入queue中,如
深入node4 可写流的实现
首先queue中有两个,然后取出49之后,有剩一个50,再取出来之后就为空。完成。
全部代码:

// WriteStream
const LinkedList = require("./linklist");

//基于链表实现队列
class Queue {
  constructor() {
    this.link = new LinkedList();
  }
  //在最后一个中加入
  offer(element) {
    this.link.append(element);
  }
  //移除链表第一位并返回
  shift() {
    return this.link.removeAt(0);
  }
}

class WriteStream extends EventMitter {
  constructor(path, options) {
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.encoding = options.encoding || "utf8";
    this.autoClose = options.autoClose || true;
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.emitClose = options.emitClose || true;
    this.offset = this.start; // 每次写入文件的位移数

    this.fd = undefined;

    this.len = 0; //判断的缓存
    this.needDrain = false; //是否需要触发drain
    this.cache = []; //第二次开始的写入缓存
    this.writing = false; //标识是否正在写入
    this.queue = new Queue();
    this.open();
  }

  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  _write(chunk, encoding, cb) {
    if (typeof this.fd !== "number") {
      this.once("open", () => {
        this._write(chunk, encoding, cb);
      });
      return;
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => {
      this.offset += written; //维护偏移量
      this.len -= written; //把缓存的个数减少
      cb(); //回调
    });
  }

  //依次将缓存队列等待任务拿出来一个个写入。
  clearBuffer() {
    const a = JSON.stringify(this.queue);
    console.log(a);
    const data = this.queue.shift();
    if (data) {
      this._write(data.chunk, data.encoding, data.cb);
    } else {
      //缓存读取完了,需要触发drain事件
      this.writing = false; //当前写入已经完成了。
      if (this.needDrain) {
        //如果存入过多导致期望值过大
        this.needDrain = false;
        this.emit("drain");
      }
    }
  }
}

//模拟Stream的Writable的实例上的write方法
EventMitter.prototype.write = function (
  chunk,
  encoding = this.encoding,
  cb = () => {}
) {
  // 异步方法
  // 注意写入的中文跟英文比较。 将数据全部转换为Buffer
  chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
  this.len += chunk.length;

  //判断到达期望了吗
  const returnValue = this.len < this.highWaterMark ? true : false;

  //当数据写入后,需要触发drain并且将Len减减
  this.needDrain = !returnValue;

  //  清空缓存队列的逻辑
  const userCb = cb;
  cb = () => {
    userCb();
    this.clearBuffer();
  };

  //判断是否第一次给入的数据,因为后面的write都是直接放入缓存中
  if (!this.writing) {
    //第一次写入,真正执行写入操作
    this._write(chunk, encoding, cb);
    this.writing = true;
  } else {
    //第二次写入了,存入缓存期
    this.queue.offer({
      chunk,
      encoding,
      cb,
    });
  }
  return returnValue;
};

const ws = new WriteStream("b.txt", {
  flags: "w",
  encoding: null,
  autoClose: true,
  start: 0, //没有end属性,只有start
  highWaterMark: 3, //与可读流不一样,可写流的highWaterMark表示期望这个文件只接受3个内存
});

ws.on("open", () => {
  console.log("文件打开");
});

let i = 0;
function write() {
  let flag = true;
  while (i < 4 && flag) {
    flag = ws.write(i++ + "");
    console.log('flag', flag);
  }
}

ws.on("drain", () => {
  //只有当吸入的数据达到了预期,并且数据已经被写入文件之后才会触发drain事件。
  console.log("写完了");
  write();
});

write();

// 链表
class Node {
  constructor(data) {
    this.data = data;
    this.next = null;
  }
}
module.exports = class LinkedList {
  constructor() {
    //头指针
    this.head = null;
    //链表的长度
    this.length = 0;
  }

  append(data) {
    const element = new Node(data);
    if (!this.head) {
      this.head = element;
    } else {
      let current = this.head;

      //遍历找到最后的节点
      while (current.next) {
        current = current.next;
      }
      //插入
      current.next = element;
    }
    this.length++;
  }

  // //特定位置插入
  insert(position, data) {
    if (
      typeof position !== "number" ||
      position < 0 ||
      position > this.length
    ) {
      return false;
    }
    const element = new Node(data);
    //插入首位
    if (position === 0) {
      element.next = this.head;
      this.head = element;
    } else {
      //插入中间的一位
      let current = this.head;
      let index = 1;
      //采用插入位置前一位进行操作,比如插入到第四个,就将element.next指向第三个的下一个,再将第三个的下一个重新向element
      while (index++ < position) {
        // 如position = 4, index = 4的时候, current指向第三个,因为执行了两遍
        current = current.next;
      }
      // 让element变成第四个
      element.next = current.next;
      current.next = element;

      // for (let i = 1; i < position; i++) {
      //     if (i === position - 1) {
      //         element.next = current.next
      //         current.next = element
      //         break;
      //     }
      //     current = current.next
      // }
    }
    this.length++;
  }

  // //获取对应位置的元素
  get(position) {
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) {
      return undefined;
    }
    let current = this.head;
    let index = 0;
    while (index++ < position) {
      current = current.next;
    }
    return current.data;
  }

  // //返回元素在列表中的索引
  indexOf(data) {
    let current = this.head;
    let index = 0;
    while (current) {
      if (current.data === data) {
        return index;
      }
      current = current.next;
      index++;
    }
    return -1;
  }

  // //修改某个位置的元素
  update(position, data) {
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) {
      return false;
    }
    let current = this.head;
    let index = 0;
    while (index++ < position) {
      current = current.next;
    }
    current.data = data;
  }

  // //从列表的特定位置移除一项
  removeAt(position) {
    if (
      typeof position !== "number" ||
      position < 0 ||
      position >= this.length
    ) {
      return undefined;
    }
    let current = this.head;
    if (position === 0) {
      const headElData = this.head.data;
      this.head = this.head.next;
      this.length--;
      return headElData;
    }
    let index = 0;
    // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
    while (index++ < position - 1) {
      current = current.next;
    }
    const currentElement = current.next;
    let element = current.next.next; //保存第三个节点
    current.next.next = null; //让第二个节点与第三个节点断掉关系
    current.next = element; // 第一个节点指向第三个节点
    this.length--;
    return currentElement.data;
  }

  // 从列表移除一项
  remove(element) {
    let current = this.head;
    let nextElement;
    let isSuccess;

    //如果第一个就是
    if (current.data === element) {
      nextElement = this.head.next;
      this.head = element;
      element.next = nextElement;
      return true;
    }

    // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
    while (current) {
      if (current.next.data === element) {
        nextElement = current.next.next; //存粗下下个节点
        current.next.next = null; //断开下个节点与下下个节点的联系
        current.next = nextElement; //连接下下个节点
        isSuccess = true;
        break;
      } else {
        current = current.next;
      }
    }

    //是否删除成功
    if (isSuccess) {
      this.length--;
      return true;
    }

    return false;
  }

  isEmpty() {
    return !!this.length;
  }

  size() {
    return this.length;
  }

  toString() {
    let current = this.head;
    let str = "";
    while (current) {
      str += `,${current.data.toString()}`;
      current = current.next;
    }
    return str.slice(1);
  }
};

上一篇:Android 标签栏pagerslidingtabstrip用法实例(含Demo),android开发的基础知识


下一篇:Android-FragmentPagerAdapter刷新无效的解决方案,快来收藏