可写流的使用
可写流的highWaterMark表示期望这个文件接受多少个值。
end不仅会写入,而且会触发close事件。
一个true,一个false,是因为我们的highWaterMark设置了3,希望只用3个内存来写,但是返回的值与我们是否写入无关,返回false也会写入。
-
但是有个问题,我们写多个wirte的时候,是并发异步操作,所以不能确定哪个快哪个慢。
-
可以将并发异步操作变为串行异步。
-
除了第一次的write,接下来的write排队,第一个完成后,将队列中的每一个write拿出来执行。有点像eventsloop,直到清空队列,但是队列缓存可能会过大,所以需要一个预期,也就是highWaterMark来控制,达到预期后,就不要调用write方法。虽然再调用也会写入进去。
-
结合fs.createStream。
当我们第一次写入的时候,当文件吃不下了,也就是到highWaterMark的值了,就应该停止写入了,等文件吃完了,触发了drain方法,这时候再去恢复rs.resume(),此时后面的写入就是往真实的文件中去写了,而不是一直在排队了。读取默认的highWaterMark是64k,而写入的默认highWaterMark是16k。 -
这跟第一种并发异步写入有很大区别。
可写流的实现
可写流是基于链表实现的,因为可写流涉及队列排序的问题,比较多的使用了头部的增删。而链表在头部和尾部的增删的时候效率比较高。
链表的实现可以观看:单链表,双链表
- 可读流内部是基于steam和stream的Readable(跟我们实现的MyReadSteream类相似,内部也是调用fs.read)以及events来实现的。fs自己实现了ReadStream继承stream的Readable,并且实现自己的_read方法。
- 可写流内部是基于stream和Writable以及events来实现的,fs实现的WrtieStream继承了stream的Writable。自己实现的_wirte方法供父类的wirte方法调用。
实现十个数,希望使用三个内存来处理
原生的。
实现自己的WriteStream
思路:跟可读流一样,将open和wirte操作分离,通过事件发布的模式。然后通过变量判断当前write是否是第一次调用,保证每次只有一个write在执行,其他的全部扔进缓存,当write执行完毕之后,再从缓存中一个一个拿出来执行write。直到缓存清空完毕,触发drain事件,通知用户缓存清完了,变量置为初始化。
这样就能使并发异步操作变成串行异步操作。
初始化变量,这里跟ReadStream有点区别,比如没有end,encoding默认写入是utf8等等。
len用来判断当前缓存的值的长度,needDrain用来判断缓存是否过多是否达到期望值。cahce是缓存队列。writing用来标识是否第一次写入。
- open方法
- 执行write方法
因为数据可能有中文,有英文,所以第一步默认转为buffer。然后判断当前write写入的buffer的长度+this.len(缓存队列的长度)与期望值highWaterMark相比较,是否超过期望值。
再重写cb函数,这样每次写入成功调用cb的时候,就会执行clearBuffer方法。
然后通过writing判断是不是第一次写入,第一次写入就调用_write方法,真正的执行fs.write,否则就放入缓存之中。 - _write方法
write方法可能用户执行的时候open还没打开,因为是异步,所以需要做处理,然后执行write方法,写入内容,每次写完之后都要维护偏移量,并且减少len。执行cb()。cb是重写过的,会调用clearBuffer方法。 - clearBuffer方法
这个方法主要将缓存中的write任务一个一个拿出并执行,注意这里调用的是_write方法,是要真正写入文件的。然后清空缓存之后,需要出发drain事件,告诉用户已经清空,当前可以继续write。并且重置一些变量。
触发四次drain事件,成功写入四个。
总结:
通过缓存以及全局变量,和事件系统,只有第一次write才会真正执行fs.write,其他的write方法会放入缓存,只有当fs.write执行完毕之后,才会继续从缓存取出进行fs.wrtie。直到缓存为空,再触发drain事件,重置变量,让用户可以继续调用write方法写入。就好比人吃饭,喂一口大的,它总要先吃一点小的,剩下部分在口腔里嚼,并且返回false告诉你,先别喂,等全部吃完。当它慢慢嚼完并吞下当前口腔有的事物之后,再告诉你,可以继续喂了,依次类推,反复循环,直到这碗饭吃饭(全部写入)
优化 使用链表代替数组cache
链表的实现:在单链表这里。
实验:
成功删除了第一个3,链表此时是6=>10。
改造完成,
期望是3,所以第一次会调用三次write,两个write存入queue中,如
首先queue中有两个,然后取出49之后,有剩一个50,再取出来之后就为空。完成。
全部代码:
// WriteStream
const LinkedList = require("./linklist");
//基于链表实现队列
class Queue {
constructor() {
this.link = new LinkedList();
}
//在最后一个中加入
offer(element) {
this.link.append(element);
}
//移除链表第一位并返回
shift() {
return this.link.removeAt(0);
}
}
class WriteStream extends EventMitter {
constructor(path, options) {
super();
this.path = path;
this.flags = options.flags || "w";
this.encoding = options.encoding || "utf8";
this.autoClose = options.autoClose || true;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.emitClose = options.emitClose || true;
this.offset = this.start; // 每次写入文件的位移数
this.fd = undefined;
this.len = 0; //判断的缓存
this.needDrain = false; //是否需要触发drain
this.cache = []; //第二次开始的写入缓存
this.writing = false; //标识是否正在写入
this.queue = new Queue();
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
this.fd = fd;
this.emit("open", fd);
});
}
_write(chunk, encoding, cb) {
if (typeof this.fd !== "number") {
this.once("open", () => {
this._write(chunk, encoding, cb);
});
return;
}
fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => {
this.offset += written; //维护偏移量
this.len -= written; //把缓存的个数减少
cb(); //回调
});
}
//依次将缓存队列等待任务拿出来一个个写入。
clearBuffer() {
const a = JSON.stringify(this.queue);
console.log(a);
const data = this.queue.shift();
if (data) {
this._write(data.chunk, data.encoding, data.cb);
} else {
//缓存读取完了,需要触发drain事件
this.writing = false; //当前写入已经完成了。
if (this.needDrain) {
//如果存入过多导致期望值过大
this.needDrain = false;
this.emit("drain");
}
}
}
}
//模拟Stream的Writable的实例上的write方法
EventMitter.prototype.write = function (
chunk,
encoding = this.encoding,
cb = () => {}
) {
// 异步方法
// 注意写入的中文跟英文比较。 将数据全部转换为Buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
this.len += chunk.length;
//判断到达期望了吗
const returnValue = this.len < this.highWaterMark ? true : false;
//当数据写入后,需要触发drain并且将Len减减
this.needDrain = !returnValue;
// 清空缓存队列的逻辑
const userCb = cb;
cb = () => {
userCb();
this.clearBuffer();
};
//判断是否第一次给入的数据,因为后面的write都是直接放入缓存中
if (!this.writing) {
//第一次写入,真正执行写入操作
this._write(chunk, encoding, cb);
this.writing = true;
} else {
//第二次写入了,存入缓存期
this.queue.offer({
chunk,
encoding,
cb,
});
}
return returnValue;
};
const ws = new WriteStream("b.txt", {
flags: "w",
encoding: null,
autoClose: true,
start: 0, //没有end属性,只有start
highWaterMark: 3, //与可读流不一样,可写流的highWaterMark表示期望这个文件只接受3个内存
});
ws.on("open", () => {
console.log("文件打开");
});
let i = 0;
function write() {
let flag = true;
while (i < 4 && flag) {
flag = ws.write(i++ + "");
console.log('flag', flag);
}
}
ws.on("drain", () => {
//只有当吸入的数据达到了预期,并且数据已经被写入文件之后才会触发drain事件。
console.log("写完了");
write();
});
write();
// 链表
class Node {
constructor(data) {
this.data = data;
this.next = null;
}
}
module.exports = class LinkedList {
constructor() {
//头指针
this.head = null;
//链表的长度
this.length = 0;
}
append(data) {
const element = new Node(data);
if (!this.head) {
this.head = element;
} else {
let current = this.head;
//遍历找到最后的节点
while (current.next) {
current = current.next;
}
//插入
current.next = element;
}
this.length++;
}
// //特定位置插入
insert(position, data) {
if (
typeof position !== "number" ||
position < 0 ||
position > this.length
) {
return false;
}
const element = new Node(data);
//插入首位
if (position === 0) {
element.next = this.head;
this.head = element;
} else {
//插入中间的一位
let current = this.head;
let index = 1;
//采用插入位置前一位进行操作,比如插入到第四个,就将element.next指向第三个的下一个,再将第三个的下一个重新向element
while (index++ < position) {
// 如position = 4, index = 4的时候, current指向第三个,因为执行了两遍
current = current.next;
}
// 让element变成第四个
element.next = current.next;
current.next = element;
// for (let i = 1; i < position; i++) {
// if (i === position - 1) {
// element.next = current.next
// current.next = element
// break;
// }
// current = current.next
// }
}
this.length++;
}
// //获取对应位置的元素
get(position) {
if (
typeof position !== "number" ||
position < 0 ||
position >= this.length
) {
return undefined;
}
let current = this.head;
let index = 0;
while (index++ < position) {
current = current.next;
}
return current.data;
}
// //返回元素在列表中的索引
indexOf(data) {
let current = this.head;
let index = 0;
while (current) {
if (current.data === data) {
return index;
}
current = current.next;
index++;
}
return -1;
}
// //修改某个位置的元素
update(position, data) {
if (
typeof position !== "number" ||
position < 0 ||
position >= this.length
) {
return false;
}
let current = this.head;
let index = 0;
while (index++ < position) {
current = current.next;
}
current.data = data;
}
// //从列表的特定位置移除一项
removeAt(position) {
if (
typeof position !== "number" ||
position < 0 ||
position >= this.length
) {
return undefined;
}
let current = this.head;
if (position === 0) {
const headElData = this.head.data;
this.head = this.head.next;
this.length--;
return headElData;
}
let index = 0;
// 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
while (index++ < position - 1) {
current = current.next;
}
const currentElement = current.next;
let element = current.next.next; //保存第三个节点
current.next.next = null; //让第二个节点与第三个节点断掉关系
current.next = element; // 第一个节点指向第三个节点
this.length--;
return currentElement.data;
}
// 从列表移除一项
remove(element) {
let current = this.head;
let nextElement;
let isSuccess;
//如果第一个就是
if (current.data === element) {
nextElement = this.head.next;
this.head = element;
element.next = nextElement;
return true;
}
// 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。
while (current) {
if (current.next.data === element) {
nextElement = current.next.next; //存粗下下个节点
current.next.next = null; //断开下个节点与下下个节点的联系
current.next = nextElement; //连接下下个节点
isSuccess = true;
break;
} else {
current = current.next;
}
}
//是否删除成功
if (isSuccess) {
this.length--;
return true;
}
return false;
}
isEmpty() {
return !!this.length;
}
size() {
return this.length;
}
toString() {
let current = this.head;
let str = "";
while (current) {
str += `,${current.data.toString()}`;
current = current.next;
}
return str.slice(1);
}
};