内容简介:在之前的博客中已经了解了流的基本用法(请看我之前的博客),这篇的重点在于根据可读流的用法对可读流的原理进行分析,实现简易版的在使用在类原型上的方法内部可能会创建一些事件,在 NodeJS 中,事件是依赖
前言
在之前的博客中已经了解了流的基本用法(请看我之前的博客),这篇的重点在于根据可读流的用法对可读流的原理进行分析,实现简易版的 ReadStream
可读流的实现(流动模式)
1、ReadStream 类创建
在使用 fs
的 createReadStream
创建可读流时,返回了 ReadStream
对象,上面存在着一些事件和方法,其实我们在创建这个可读流的时候创建了某一个类的实例,这个实例可以调用类原型上的方法,我们这里将这个类命名为 ReadStream
。
在类原型上的方法内部可能会创建一些事件,在 NodeJS 中,事件是依赖 events
模块的,即 EventEmitter
类,同时类的方法可能会操作文件,会用到 fs
模块,所以也提前引入 fs
。
创建 ReadStream 类
// 引入依赖模块 const EventEmitter = require("events"); const fs = require("fs"); // 创建 ReadStream 类 class ReadStream extends EventEmitter { constructor(path, options = {}) { super(); // 创建可读流参数传入的属性 this.path = path; // 读取文件的路径 this.flags = options.flags || "r"; // 文件标识位 this.encoding = options.encoding || null; // 字符编码 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 权限位 this.autoClose = options.autoClose || true; // 是否自动关闭 this.start = options.start || 0; // 读取文件的起始位置 this.end = options.end || null; // 读取文件的结束位置(包含) this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数 this.flowing = false; // 控制当前是否是流动状态,默认为暂停状态 this.buffer = Buffer.alloc(this.highWaterMark); // 存储读取内容的 Buffer this.pos = this.start; // 下次读取文件的位置(变化的) // 创建可读流要打开文件 this.open(); // 如果监听了 data 事件,切换为流动状态 this.on("newListener", type => { if (type === "data") { this.flowing = true; // 开始读取文件 this.read(); } }); } } // 导出模块 module.exports = ReadStream;复制代码
使用 fs.createReadStream
时传入了两个参数,读取文件的路径和一个 options
选项, options
上有八个参数,我们在创建 ReadStream
类的时候将这些参数初始化到了 this
上。
创建可读流的时候有两种状态,流动状态和暂停状态,默认创建可读流是暂停状态,只有在触发 data
事件时才会变为流动状态,所以在 this
上挂载了 flowing
存储当前的状态是否为流动状态,值默认为 false
。
注意:这里说的暂停状态不是暂停模式,暂停模式是 readable
, 是可读流的另一种模式,我们这节讨论的可读流为流动模式。
在读取文件时其实是操作 Buffer 进行读取的,需要有一个 Buffer 实例用来存储每次读取的数据,所以在 this
上挂载了一个新创建的 Buffer,长度等于 highWaterMark
。
当从 start
值的位置开始读取文件,下一次读取文件的位置会发生变化,所以在 this
上挂载了 pos
属性,用于存储下次读取文件的位置。
在创建 ReadStream
的实例(可读流)时,应该打开文件并进行其他操作,所以在 this
上挂载了 open
方法并执行。
创建实例的同时监听了 newListener
事件,回调在每次使用 on
监听事件时触发,回调内部逻辑是为了将默认的暂停状态切换为流动状态,因为在使用时,流动状态是通过监听 data
事件触发的,在 newListener
的回调中判断事件类型为 data
的时候将 flowing
标识的值更改为 true
,并调用读取文件的 read
方法。
在使用 ES6 的类编程时,原型上的方法都是写在 class
内部,我们下面为了把原型上的方法拆分出来成为单独的代码块,都使用 ReadStream.prototype.open = function...
直接给原型添加属性的方式,但这样的方式和直接写在 class
内有一点区别,就是 class
内部的书写的原型方法都是不可遍历的,添加属性的方式创建的方法都是可遍历的,但是这点区别对我们代码的执行没有任何影响。
2、打开文件方法 open 的实现
在使用可读流时,打开时默认是暂停状态,会触发 open
事件,如果打开文件出错会触发 error
事件。
open 方法
// 打开文件 ReadStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); // 如果文件打开了出错,并配置自动关闭,则关掉文件 if (this.autoClose) { // 关闭文件(触发 close 事件) this.destroy(); // 不再继续执行 return; } } // 存储文件描述符 this.fd = fd; // 成功打开文件后触发 open 事件 this.emit("open"); }); };复制代码
open
方法的逻辑就是在打开文件的时候,将文件描述符存储在实例上方便后面使用,并使用 EventEmitter
的原型方法 emit
触发 open
事件,如果出错就使用 emit
触发 error
事件,如果配置 autoClose
参数为 true
,就关闭文件并触发 close
。
我们将关闭文件的逻辑抽取出来封装在了 ReadStream
类的 destroy
方法中,下面来实现 destroy
。
3、关闭文件方法 destroy 的实现
文件出错分为两种,第一种文件打开出错,第二种是文件不存在出错(没打开),第二种系统是没有分配文件描述符的。
detroy 方法
// 关闭文件 ReadStream.prototype.detroy = function() { // 判断是否存在文件描述符 if (typeof this.fd === "number") { // 存在则关闭文件并触发 close 事件 fs.close(fd, () => { this.emit("close"); }); return; } // 不存在文件描述符直接触发 close 事件 this.emit("close"); };复制代码
如果是打开文件后出错需要关闭文件,并触发 close
事件,如果是没打开文件,则直接触发 close
事件,所以上面通过文件描述符来判断该如何处理。
4、读取文件方法 read 的实现
还记得在 ReadStream
类中,监听的 newListener
事件的回调中如果监听了 data
事件则会执行 read
读取文件,接下来就实现读取文件的核心逻辑。
read 方法
// 读取文件 ReadStream.prototype.read = function() { // 由于 open 异步执行,read 是在创建实例时同步执行 // read 执行可能早于 open,此时不存在文件描述符 if (typeof this.fd !== "number") { // 因为 open 用 emit 触发了 open 事件,所以在这是重新执行 read return this.once("open", () => this.read()); } // 如过设置了结束位置,读到结束为止就不能再读了 // 如果最后一次读取真实读取数应该小于 highWaterMark // 所以每次读取的字节数应该和 highWaterMark 取最小值 let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark; // 读取文件 fs.read( this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => { // 如果读到内容执行下面代码,读不到则触发 end 事件并关闭文件 if (bytesRead > 0) { // 维护下次读取文件位置 this.pos += bytesRead; // 保留有效的 Buffer let realBuf = this.buffer.slice(0, bytesRead); // 根据编码处理 data 回调返回的数据 realBuf = this.encoding ? realBuf.toString(this.encoding) : realBuf; // 触发 data 事件并传递数据 this.emit("data", realBuf); // 递归读取 if (this.flowing) { this.read(); } } else { this.isEnd = true; this.emit("end"); // 触发 end 事件 this.detroy(); // 关闭文件 } } ); };复制代码
创建 ReadStream
的实例时,执行的 open
方法内部是使用 fs.open
打开文件的,是异步操作,而读取文件方法 read
是在 newListener
回调中同步执行的,这样很可能触发 read
的时候文件还没有被打开(不存在文件描述符),所以在 read
方法中判断了文件描述符是否存在,并在不存在时候使用 once
添加了 open
事件,回调中重新执行了 read
。
由于在 open
方法中使用 emit
触发了 open
事件,所以 read
内用 once
添加的 open
事件的回调也会跟着执行一次,并在回调中重新调用了 read
方法,保证了 read
读取文件的逻辑在文件真正打开后才执行,为了文件打开前执行 read
而不执行读取文件的逻辑,用 once
添加 open
事件时别忘记 return
。
在使用 fs.read
读取文件的时候有一个参数为本次读取几个字符到 Buffer 中,如果在创建可读流的时候设置了读取文件的结束位置 end
参数,则读到 end
位置就不应该再继续读取了,所以在存在 end
参数的时候每次都计算一下读取个数和 highWaterMark
取最小值,保证读取内容小于 highWaterMark
的时候不会多读,因为读取时是包括 end
值作为 Buffer 的索引这一项的,所以计算时多减去的要 +1
加回来,再一次读取这个读取个数计算结果变成了 0
,也就结束了读取。
因为 end
参数的情况,所以在内部读取逻辑前判断了 bytesRead
(实际读取字节数)是否大于 0
,如果不满足条件则在实例添加是否读取结束标识 isEnd
(后面使用),触发 end
事件并关闭文件,如果满足条件,也是通过 bytesRead
对 Buffer 进行截取,保留了有用的 Buffer,并且通过 encoding
编码对 Buffer 进行处理后,触发 data
事件,并将处理后的数据传递给 data
事件的回调。
5、暂停、恢复读取 pause 和 resume
pause
的目的就是暂停读取,其实就是阻止 read
方法在读取时进行递归,所以只需要更改 flowing
的值即可。
pause 方法
// 暂停读取 ReadStream.prototype.pause = function() { this.flowing = false; };复制代码
resume
的目的是恢复读取,在更改 flowing
值得基础上重新执行 read
方法,由于在 pause
调用时 read
内部还是执行得读取文件得分支,文件并没有关闭,读取文件位置的参数也是通过实例上的当前的属性值进行计算的,所以重新执行 read
会继续上一次的位置读取。
resume 方法
// 恢复读取 ReadStream.prototype.resume = function() { this.flowing = true; if (!this.isEnd) this.read(); };复制代码
上面在重新执行 read
之前使用 isEnd
标识做了判断,防止在 setInterval
中调用 resume
在读取完成后不断的触发 end
和 close
事件。
验证可读流(流动模式)ReadStream
接下来我们使用自己实现的 ReadStream
类来创建可读流,并按照 fs.createReadStream
的用法进行使用并验证。
验证 ReadStream
// 文件 1.txt 内容为 0123456789 const fs = require("fs"); const ReadStream = require("./ReadStream"); // 创建可读流 let rs = new ReadStream("1.txt", { encoding: "utf8", start: 0, end: 5, highWaterMark: 2 }); rs.on("open", () => console.log("open")); rs.on("data", data => { console.log(data, new Date()); rs.pause(); }); rs.on("end", () => console.log("end")); rs.on("close", () => console.log("close")); rs.on("error", err => console.log(err)); setInterval(() => rs.resume(), 1000); // open // 01 2018-07-04T10:44:20.384Z // 23 2018-07-04T10:44:21.384Z // 45 2018-07-04T10:44:22.384Z // end // close复制代码
执行上面的代码正常的执行逻辑是先触发 open
事件,然后触发 data
事件,读取一次后暂停,每隔一秒恢复读取一次,再读取完成后触发 end
和 close
事件,通过运行代码结果和我们希望的一样。
可读流的实现(暂停模式)
1、在 fs 中的暂停模式的真正用法
在 fs
模块中用 createReadStream
创建的可读流中通过监听 readable
事件触发暂停模式(监听 data 事件触发流动模式),通过下面例子感受暂停模式与流动模式的不同,现在读取文件 1.txt
,内容为 0~9
十个数字。
暂停模式的用法
// 读取的 const fs = require("fs"); // 创建可读流 let rs = fs.createReadStream("1.txt", { encoding: "utf8", start: 0, hithWaterMark: 3 }); rs.on("readable", () => { // read 参数为本次读取的个数 let r = rs.read(3); // 打印读取的数据 console.log(r); // 打印容器剩余空间 console.log(rs._readableState.length); }); // 012 // 0 // 345 // 0 // 678 // 0 // null // 1 // 90 // 0复制代码
通俗的解释,暂停模式的 readable
事件默认会触发一次,监听 readable
事件后就像创建了一个 “容器”,容量为 hithWaterMark
,文件中的数据会自动把容器注满,调用可读流的 read
方法读取时,会从容器中取出数据,如果 read
方法读取的数据小于 hithWaterMark
,则直接暂停,不再继续读取,如果大于 hithWaterMark
,说明 “容器” 空了,则会触发 readable
事件,无论读取字节数与 hithWaterMark
关系如何,只要 “容器” 内容量剩余小于 hithWaterMark
就会进行 “续杯”,再次向 “容器” 中填入 hithWaterMark
个,所以有些时候真实的容量会大于 hithWaterMark
。
read
方法读取的内容会返回 null
是因为容器内真实的数据数小于了读取数,如果不是最后一次读取,会在多次读取后将值一并返回,如果是最后一次读取,会把剩余不足的数据返回。
1、 readable
事件的触发条件:“容器” 空了;
2、“续杯” 条件:读取后 “容器” 内剩余量小于 hithWaterMark
。
3、 read
返回 null
:“容器” 容器内可悲读取数据无法满足一次读取字节数。
2、ReadableStream 类的实现
同为可读流,暂停模式与流动模式相同,都依赖 fs
模块和 events
模块的 EventEmitter
类,参数依然为读取文件的路径和 options
。
创建 ReadableStream 类
// 引入依赖 const EventEmitter = require("events"); const fs = require("fs"); class ReadableStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; // 读取文件的路径 this.flags = options.flags || "r"; // 文件标识位 this.encoding = options.encoding || null; // 字符编码 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 权限位 this.autoClose = options.autoClose || true; // 是否自动关闭 this.start = options.start || 0; // 读取文件的起始位置 this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数 this.reading = false; // 如果正在读取,则不再读取 this.emitReadable = false; // 当缓存区的长度等于 0 的时候, 触发 readable this.arr = []; // 缓存区 this.len = 0; // 缓存区的长度 this.pos = this.start; // 下次读取文件的位置(变化的) // 创建可读流要打开文件 this.open(); this.on("newListener", type => { if (type === "readable") { this.read(); // 监听readable就开始读取 } }); } } // 导出模块 module.exports = ReadableStream;复制代码
在类的添加了 newListener
事件,在回调中判断是否监听了 readable
事件,如果监听了开始从 “容器” 中读取。
3、打开、关闭文件 open 和 detroy
打开和关闭文件的方法和流动模式的套路基本相似。
open 方法
// 打开文件 ReadableStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); return; } } this.fd = fd; this.emit("open"); }); };复制代码
detroy 方法
// 关闭文件 ReadableStream.prototype.detroy = function() { if (typeof this.fd === "number") { fs.close(fd, () => { this.emit("close"); }); return; } this.emit("close"); };复制代码
4、从 “容器” 中读取 read 方法的实现
read
方法的参数不传时就相当于从 “容器” 读取 highWaterMart
个字节,如果传参表示读取参数数量的字节数。
read 方法
ReadableStream.prototype.read = function(n) { // 如果读取大于了 highWaterMark,重新计算 highWaterMark,并重新读取 if (n > this.len) { // 计算新的 highWaterMark,方法摘自 NodeJS 源码 this.highWaterMark = computeNewHighWaterMark(n); this.reading = true; this._read(); } // 将要返回的数据 let buffer; // 如果读取的字节数大于 0 小于等于当前缓存 Buffer 的总长度 if (n > 0 && n <= this.len) { // 则从缓存中取出 buffer = Buffer.alloc(n); let current; // 存储每次从缓存区读出的第一个 Buffer let index = 0; // 每次读取缓存 Buffer 的索引 let flag = true; // 是否结束整个 while 循环的标识 // 开始读取 while ((current = this.arr.shift()) && flag) { for (let i = 0; i < current.length; i++) { // 将缓存中取到的 Buffer 的内容读到自己定义的 Buffer 中 buffer[index++] = current[i]; // 如果当前索引值已经等于了读取个数,结束 for 循环 if (index === n) { flag = false; // 取出当前 Buffer 没有消耗的 let residue = current.slice(i + 1); // 在读取后维护缓存的长度 this.len -= n; // 如果 BUffer 真的有剩下的就给塞回到缓存中 if (residue.length) { this.arr.unshift(residue); } break; } } } } // 如果当前 读取的 Buffer 为 0,将触发 readable 事件 if (this.len === 0) { this.emitReadable = true; } // 如果当前的缓存区大小小于 highWaterMark,就要读取 if (this.len < this.highWaterMark) { // 如果不是正在读取才开始读取 if (!this.read) { this.reading = true; this._read(); // 正真读取的方法 } } // 将 buffer 转回创建可读流设置成的编码格式 if (buffer) { buffer = this.encoding ? buffer.toString(this.encoding) : buffer; } return buffer; };复制代码
上面的 read
方法的参数大小对比缓存区中取出的 Buffer 长度有两种情况,一种是小于当前缓存区内取出 Buffer 的长度,一种是大于了真个缓存区的 len
的长度。
小于当前缓存区总长度通过循环取出需要的 Buffer 存储了我们要返回创建的 Buffer 中,剩余的 Buffer 会丢失,所以我们做了一个小小的处理,将剩下的 Buffer 作为第一个 Buffer 塞回到了缓存区中,在处理这个问题时与流动模式不相同,流动模式处理后直接跳出了,而暂停模式相当于从 “容器” 中读取,如果第一次读取后还有剩余还要接着从容器中继续读取。
大于 len
属性时,规定需要重新计算 highWaterMark
,遵循的原则是将当前 highWaterMark
设定为当前读取字节个数距离最接近的 2
的 n
次方的数值,NodeJS 源码中方法名称为 computeNewHighWaterMark
,为了提高性能是使用位运算的方式进行计算的,源码如下。
重新计算 highWaterMark
function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; }复制代码
在调用该方法重新计算 highWaterMark
后更改正在读取状态,重新读取,由于读取逻辑的重复,所以真正读取文件的逻辑抽取成一个 _read
方法来实现,下面呢就来看看 _read
内部都做了什么。
5、真正读取文件的 _read
对比可读流(流动模式)的 read
方法,在调用 _read
方法读取时,是在 newListener
中同步执行 _read
,所以为了保证 _read
的逻辑是在 open
方法打开文件以后执行,使用了与 read
相同的处理方式。
_read 方法
ReadableStream.prototype._read = function() { if (typeof this.fd !== "number") { return this.once("open", () => _read()); } // 创建本次读取的 Buffer let buffer = Buffer.alloc(this.highWaterMark); // 读取文件 fs.read( this.fd, buffer, 0, this.highWaterMark, this.pos, (err, bytesRead) => { if (bytesRead > 0) { this.arr.push(buffer); // 缓存 this.len += bytesRead; // 维护缓存区长度 this.pos += bytesRead; // 维护下一次读取位置 this.reading = false; // 读取完毕 // 触发 readable 事件 if (this.emitReadable) { // 触发后更改触发状态为 false this.emitReadable = false; this.emit("readable"); } } else { // 如果读完触发结束事件 this.emit("end"); } } ); };复制代码
由于缓存区是一个数组,存储的每一个 Buffer 是独立存在的,所以不能挂载在实例上共用,如果挂在实例上则引用相同,一动全动,这不是我们想要的,所以每一次执行 _read
方法时都创建新的 Buffer 实例存入读取的数据后存储在缓存区中,如果读取完成 bytesRead
为 0
,则触发 end
事件。
注意:在 NodeJS 源码中,可读流的两种模式代码都是混合在一起的,只是使用 fs.createReadStream
创建一个可读流,通过监听 data
和 readable
两种不同的事件来触发两种不同的模式,而我们为了模拟,把两种模式拆开成了两个类来实现的,在测试时需要创建不同类的实例。
验证可读流(暂停模式)ReadableStream
为了统一我们依然读取真正用法中 1.txt
文件,内容为 0~9 十个数字。
验证 ReadableStream
// 引入依赖 const fs = require("fs"); const ReadableStream = require("./ReadableStream"); let rs = new ReadableStream("1.txt", { encoding: "utf8", start: 0, highWaterMark: 3 }); rs.on("readable", () => { let r = rs.read(3); console.log(r); console.log(rs.len); });复制代码
在打印 “容器” 剩余容量时,我们使用在 ReadableStream
上构造的 len
属性。
流动模式和暂停模式分别有不同的应用场景,如果只是希望读取一个文件,并最快的获得结果使用流动模式是很好的选择,如果希望了解读取文件的具体内容,并进行精细的处理,使用暂停模式更好一些。
可写流的实现
1、WriteStream 类创建
在使用 fs
的 createWriteStream
创建可写流时,返回了 WriteStream
对象,上面也存在事件和方法,创建可写流的时也是创建类的实例,我们将这个类命名为 WriteStream
。事件同样依赖 events
模块的 EventEmitter
类,文件操作同样依赖 fs
模块,所以需提前引入。
创建 WriteStream 类
// 引入依赖模块 const EventEmitter = require("events"); const fs = require("fs"); // 创建 WriteStream 类 class WriteStream extends EventEmitter { constructor(path, options = {}) { super(); // 创建可写流参数传入的属性 this.path = path; // 写入文件的路径 this.flags = options.flags || "w"; // 文件标识位 this.encoding = options.encoding || "utf8"; // 字符编码 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 权限位 this.autoClose = options.autoClose || true; // 是否自动关闭 this.start = options.start || 0; // 写入文件的起始位置 this.highWaterMark = options.highWaterMark || 16 * 1024; // 对比写入字节数的标识 this.writing = false; // 是否正在写入 this.needDrain = false; // 是否需要触发 drain 事件 this.buffer = []; // 缓存,正在写入就存入缓存中 this.len = 0; // 当前缓存的个数 this.pos = this.start; // 下次写入文件的位置(变化的) // 创建可写流要打开文件 this.open(); } } // 导出模块 复制代码
module.exports = WriteStream;
使用 fs.createWriteStream
创建可写流时传入了两个参数,写入的文件路径和一个 options
选项, options
上有七个参数,我们在创建 ReadStream
类的时候将这些参数初始化到了 this
上。
创建可写流后需要使用 write
方法进行写入,写入时第一次会真的通过内存写入到文件中,而再次写入则会将内容写到缓存中,注意这里的 “内存” 和 “缓存”,内存是写入文件是的系统内存,缓存是我们自己创建的数组,第一次写入以后要写入文件的 Buffer 都会先存入这个数组中,这个数组名为 buffer
,挂载在实例上,实例上同时挂载了 len
属性用来存储当前缓存中 Buffer 总共的字节数(长度)。
我们在可读流上挂载了是否正在写入的状态 writing
属性,只要缓存区中存在未写入的 Buffer, writing
的状态就是正在写入,当写入的字节数大于了 highWaterMark
需要触发 drain
事件,所以又挂载了是否需要触发 drain
事件的标识 needDrain
属性。
当从文件的 start
值对应的位置开始写入,下一次写入文件的位置会发生变化,所以在 this
上挂载了 pos
属性,用于存储下次写入文件的位置。
在 NodeJS 流的源码中缓存是用链表实现的,通过指针来操作缓存中的 Buffer,而我们为了简化逻辑就使用数组来作为缓存,虽然性能相对链表要差。
2、打开、关闭文件 open 和 detroy
在 WriteStream
中,写入文件之前也应该打开文件,在打开文件过程中出错时也应该触发 error
事件并关闭文件,打开和关闭文件的方法 open
和 detroy
与 ReadStream
的 open
和 detroy
方法的逻辑如出一辙,所以这里直接拿过来用了。
open 方法
// 打开文件 WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); return; } } this.fd = fd; this.emit("open"); }); };复制代码
detroy 方法
// 关闭文件 WriteStream.prototype.detroy = function() { if (typeof this.fd === "number") { fs.close(fd, () => { this.emit("close"); }); return; } this.emit("close"); };复制代码
3、写入文件方法 write 的实现
write
方法默认支持传入三个参数:
- chunk:写入文件的内容;
- encoding:写入文件的编码格式;
- callback:写入成功后执行的回调。
write 方法
// 写入文件的方法,只要逻辑为写入前的处理 WriteStream.prototype.write = function( chunk, encoding = this.encoding, callback ) { // 为了方便操作将要写入的数据转换成 Buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); // 维护缓存的长度 this.len += chunk.lenth; // 维护是否触发 drain 事件的标识 this.needDrain = this.highWaterMark <= this.len; // 如果正在写入 if (this.writing) { this.buffer.push({ chunk, encoding, callback }); } else { // 更改标识为正在写入,再次写入的时候走缓存 this.writing = true; // 如果已经写入清空缓存区的内容 this._write(chunk, encoding, () => this.clearBuffer()); } return !this.needDrain; };复制代码
与可写流的 read
一样,我们在使用 write
方法将数据写入文件时,也是操作 Buffer,在 write
方法中,首先将接收到的要写入的数据转换成了 Buffer,因为是多次写入,要知道缓存中 Buffer 字节数的总长度,所以维护了 len
变量。
我们的 WriteStream
构造函数中, this
挂载了 needDrain
属性,在使用 fs.createWriteStream
创建的可读流时,是写入的字节长度超过 highWaterMark
才会触发 drain
事件,而 needDrain
与 write
的返回值正好相反,所以我们用 needDrain
取反来作为 write
方法的返回值。
在写入的逻辑中第一次是直接通过内存写入到文件,但是再次写入就需要将数据存入缓存,将数据写入到文件中写入状态 writing
默认为 false
,通过缓存再写入证明应该正在写入中,所以在第一次写入后应更改 writing
的状态为 true
,写入缓存其实就是把转换的 Buffer、编码以及写入成功后要执行的回调挂在一个对象上存入缓存的数组 buffer
中。
我们把真正写入文件的逻辑抽取成一个单独的方法 _write
,并传入 chunk
(要写入的内容,已经处理成 Buffer)、 encoding
(字符编码)、回调函数,在回调函数中执行了原型方法 clearBuffer
,接下来就来实现 _write
和 clearBuffer
。
注意:方法使用 `
` 开头代表私有方法,轻易不要在外部调用或修改,这是一个开发者之间约定俗成的不成文规定。_4、真正的文件操作 _write
对比可读流(流动模式)的 read
方法,在调用 _write
方法写入时,是在创建可写流之后的同步代码中执行的,与可读流在 newListener
中同步执行 read
的情况类似,所以为了保证 _write
的逻辑是在 open
方法打开文件以后执行,使用了与 read
相同的处理方式。
_write 方法
// 真正的写入文件操作的方法 WriteStream.prototype._write = function(chunk, encoding, callback) { // 由于 open 异步执行,write 是在创建实例时同步执行 // write 执行可能早于 open,此时不存在文件描述符 if (typeof this.fd !== "number") { // 因为 open 用 emit 触发了 open 事件,所以在这是重新执行 write return this.once("open", () => this._write(chunk, encoding, callback)); } // 读取文件 fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => { // 维护下次写入的位置和缓存区 Buffer 的总字节数 this.pos += bytesWritten; this.len -= bytesWritten; callback(); }); };复制代码
在打开文件并写入的时候需要维护两个变量,下次写入的位置 pos
和当前缓存区内 Buffer 所占总字节数 len
,本次写入了多少个字节,下次写入需要在写入位置的基础上加多少个字节,而 len
恰恰相反,本次写入了多少个字节,缓存区中的总长度应该对应的减少多少个字节。
在维护两个变量的值以后调用 callback
,其实 callback
内执行的是 clearBuffer
方法,就如方法名,译为 “清空缓存”,其实就是一次一次的将数据写入文件并从缓存中移除,很明显需要递归调用 _write
方法,我们将这个递归的逻辑统一放在 clearBuffer
方法中实现。
5、清空缓存操作 clearBuffer
clearBuffer 方法
// 清空缓存方法 WriteStream.prototype.clearBuffer = function() { // 先写入的在数组前面,从前面取出缓存中的 Buffer let buf = this.buffer.shift(); // 如果存在 buf,证明缓存还有 Buffer 需要写入 if (buf) { // 递归 _write 按照编码将数据写入文件 this._write(buf.chunk, buf.encoding, () => this.clearBuffer); } else { // 如果没有 buf,说明缓存内的内容已经完全写入文件并清空,需要触发 drain 事件 this.emit("drain"); // 更改正在写入状态 this.writing = false; // 更改是否需要触发 drain 事件状态 this.needDrain = false; } };复制代码
clearBuffer
方法中获取了缓存区数组的最前面的 Buffer(最前面的是先写入缓存的,也应该先取出来写入文件),存在这个 Buffer 时,递归 _write
方法按照编码将数据写入文件,如果不存在说明缓存区已经清空了,代表内容完全写入文件中,所以触发 drain
事件,最后更改了 writing
和 needDrain
的状态。
更正 writing
是为了 WriteStream
创建的可读流在下次调用 write
方法时默认第一次真正写入文件,而更正 needDrain
的状态是在缓存区要清空的最后一个 Buffer 的长度小于了 highWaterMark
时,保证 write
方法的返回值是正确的。
第一次是真正写入,其他的都写入缓存,再一个一个的将缓存中存储的 Buffer 写入并从缓存清空,之所以这样设计是为了把写入的内容排成一个队列,假如有 3
个人同时操作一个文件写入内容,只有第一个人是真的写入,其他的人都写在缓存中,再按照写入缓存的顺序依次写入文件,避免冲突和写入顺序出错。
验证可写流 WriteStream
接下来我们使用自己实现的 WriteStream 类来创建可写流,并按照 fs.createWriteStream
的用法进行使用并验证。
验证 WriteStream
// 向 1.txt 文件中写入 012345 const fs = require("fs"); const WriteStream = require("./WriteStream"); // 创建可写流 let ws = new WriteStream("2.txt", { highWaterMark: 3 }); let i = 0; function write() { let flag = true; while (i <= 6 && flag) { i++; flag = ws.write(i + "", "utf8"); } } ws.on("drain", function() { console.log("写入成功"); write(); }); write(); // true // true // false // 写入成功 // true // true // false // 写入成功复制代码
可以使用 fs.createWriteStream
和 WriteStream
类分别执行上面的代码,对比结果,看看是否相同。
可读流和可写流的桥梁 pipe
可写流和可读流一般是通过 pipe
配合来使用的, pipe
方法是可读流 ReadStream
的原型方法,参数为一个可写流。
pipe 方法
// 连接可读流和可写流的方法 pipe ReadStream.prototype.pipe = function(dest) { // 开始读取 this.on("data", data => { // 如果超出可写流的 highWaterMark,暂停读取 let flag = dest.write(data); if (!flag) this.pause(); }); dest.on("drain", () => { // 当可写流清空内存时恢复读取 this.resume(); }); this.on("end", () => { // 在读取完毕后关闭文件 this.destroy(); }); };复制代码
pipe
方法其实就是通过可读流的 data
事件触发流动状态,并用可写流接收读出的数据进行写入,当写入数据超出 highWaterMark
,则暂停可读流的读取,直到可写流的缓存被清空并把内容写进文件后,恢复可读流的读取,当读取结束后关闭文件。
下面我们实现一个将 1.txt
的内容拷贝 2.txt
中的例子。
验证 pipe
// pipe 的使用 const fs = require("fs"); // 引入自己的 ReadStream 类和 WriteStream 类 const ReadStream = rquire("./ReadStream"); const WriteStream = rquire("./WriteStream"); // 创建可读流和可写流 let rs = new ReadStream("1.txt", { highWaterMark: 3 }); let ws = new WriteStream("2.txt", { highWaterMark: 2 }); // 使用 pipe 实现文件内容复制 rs.pipe(ws);复制代码
总结
在 NodeJS 源码中,可读流和可写流的内容要比本篇内容多很多,本篇是将源码精简,抽出核心逻辑并针对流的使用方式进行实现,主要目的是帮助理解流的原理和使用,争取做到 “知其然知其所以然”,了解了一些底层再对流使用时,也能游刃有余。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。