NodeJS stream 流 原理分析(附源码)

栏目: Node.js · 发布时间: 6年前

内容简介:在之前的博客中已经了解了流的基本用法(请看我之前的博客),这篇的重点在于根据可读流的用法对可读流的原理进行分析,实现简易版的在使用在类原型上的方法内部可能会创建一些事件,在 NodeJS 中,事件是依赖

前言

在之前的博客中已经了解了流的基本用法(请看我之前的博客),这篇的重点在于根据可读流的用法对可读流的原理进行分析,实现简易版的 ReadStream

可读流的实现(流动模式)

1、ReadStream 类创建

在使用 fscreateReadStream 创建可读流时,返回了 ReadStream 对象,上面存在着一些事件和方法,其实我们在创建这个可读流的时候创建了某一个类的实例,这个实例可以调用类原型上的方法,我们这里将这个类命名为 ReadStream

在类原型上的方法内部可能会创建一些事件,在 NodeJS 中,事件是依赖 events 模块的,即 EventEmitter 类,同时类的方法可能会操作文件,会用到 fs 模块,所以也提前引入 fs

创建 ReadStream 类

// 引入依赖模块
const EventEmitter = require("events");
const fs = require("fs");

// 创建 ReadStream 类
class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        // 创建可读流参数传入的属性
        this.path = path; // 读取文件的路径
        this.flags = options.flags || "r"; // 文件标识位
        this.encoding = options.encoding || null; // 字符编码
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 权限位
        this.autoClose = options.autoClose || true; // 是否自动关闭
        this.start = options.start || 0; // 读取文件的起始位置
        this.end = options.end || null; // 读取文件的结束位置(包含)
        this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数

        this.flowing = false; // 控制当前是否是流动状态,默认为暂停状态
        this.buffer = Buffer.alloc(this.highWaterMark); // 存储读取内容的 Buffer
        this.pos = this.start; // 下次读取文件的位置(变化的)

        // 创建可读流要打开文件
        this.open();

        // 如果监听了 data 事件,切换为流动状态
        this.on("newListener", type => {
            if (type === "data") {
                this.flowing = true;

                // 开始读取文件
                this.read();
            }
        });
    }
}

// 导出模块
module.exports = ReadStream;复制代码

使用 fs.createReadStream 时传入了两个参数,读取文件的路径和一个 options 选项, options 上有八个参数,我们在创建 ReadStream 类的时候将这些参数初始化到了 this 上。

创建可读流的时候有两种状态,流动状态和暂停状态,默认创建可读流是暂停状态,只有在触发 data 事件时才会变为流动状态,所以在 this 上挂载了 flowing 存储当前的状态是否为流动状态,值默认为 false

注意:这里说的暂停状态不是暂停模式,暂停模式是 readable , 是可读流的另一种模式,我们这节讨论的可读流为流动模式。

在读取文件时其实是操作 Buffer 进行读取的,需要有一个 Buffer 实例用来存储每次读取的数据,所以在 this 上挂载了一个新创建的 Buffer,长度等于 highWaterMark

当从 start 值的位置开始读取文件,下一次读取文件的位置会发生变化,所以在 this 上挂载了 pos 属性,用于存储下次读取文件的位置。

在创建 ReadStream 的实例(可读流)时,应该打开文件并进行其他操作,所以在 this 上挂载了 open 方法并执行。

创建实例的同时监听了 newListener 事件,回调在每次使用 on 监听事件时触发,回调内部逻辑是为了将默认的暂停状态切换为流动状态,因为在使用时,流动状态是通过监听 data 事件触发的,在 newListener 的回调中判断事件类型为 data 的时候将 flowing 标识的值更改为 true ,并调用读取文件的 read 方法。

在使用 ES6 的类编程时,原型上的方法都是写在 class 内部,我们下面为了把原型上的方法拆分出来成为单独的代码块,都使用 ReadStream.prototype.open = function... 直接给原型添加属性的方式,但这样的方式和直接写在 class 内有一点区别,就是 class 内部的书写的原型方法都是不可遍历的,添加属性的方式创建的方法都是可遍历的,但是这点区别对我们代码的执行没有任何影响。

2、打开文件方法 open 的实现

在使用可读流时,打开时默认是暂停状态,会触发 open 事件,如果打开文件出错会触发 error 事件。

open 方法

// 打开文件
ReadStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);

            // 如果文件打开了出错,并配置自动关闭,则关掉文件
            if (this.autoClose) {
                // 关闭文件(触发 close 事件)
                this.destroy();

                // 不再继续执行
                return;
            }
        }
        // 存储文件描述符
        this.fd = fd;

        // 成功打开文件后触发 open 事件
        this.emit("open");
    });
};复制代码

open 方法的逻辑就是在打开文件的时候,将文件描述符存储在实例上方便后面使用,并使用 EventEmitter 的原型方法 emit 触发 open 事件,如果出错就使用 emit 触发 error 事件,如果配置 autoClose 参数为 true ,就关闭文件并触发 close

我们将关闭文件的逻辑抽取出来封装在了 ReadStream 类的 destroy 方法中,下面来实现 destroy

3、关闭文件方法 destroy 的实现

文件出错分为两种,第一种文件打开出错,第二种是文件不存在出错(没打开),第二种系统是没有分配文件描述符的。

detroy 方法

// 关闭文件
ReadStream.prototype.detroy = function() {
    // 判断是否存在文件描述符
    if (typeof this.fd === "number") {
        // 存在则关闭文件并触发 close 事件
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }

    // 不存在文件描述符直接触发 close 事件
    this.emit("close");
};复制代码

如果是打开文件后出错需要关闭文件,并触发 close 事件,如果是没打开文件,则直接触发 close 事件,所以上面通过文件描述符来判断该如何处理。

4、读取文件方法 read 的实现

还记得在 ReadStream 类中,监听的 newListener 事件的回调中如果监听了 data 事件则会执行 read 读取文件,接下来就实现读取文件的核心逻辑。

read 方法

// 读取文件
ReadStream.prototype.read = function() {
    // 由于 open 异步执行,read 是在创建实例时同步执行
    // read 执行可能早于 open,此时不存在文件描述符
    if (typeof this.fd !== "number") {
        // 因为 open 用 emit 触发了 open 事件,所以在这是重新执行 read
        return this.once("open", () => this.read());
    }

    // 如过设置了结束位置,读到结束为止就不能再读了
    // 如果最后一次读取真实读取数应该小于 highWaterMark
    // 所以每次读取的字节数应该和 highWaterMark 取最小值
    let howMuchToRead = this.end
        ? Math.min(this.highWaterMark, this.end - this.pos + 1)
        : this.highWaterMark;

    // 读取文件
    fs.read(
        this.fd,
        this.buffer,
        0,
        howMuchToRead,
        this.pos,
        (err, bytesRead) => {
            // 如果读到内容执行下面代码,读不到则触发 end 事件并关闭文件
            if (bytesRead > 0) {
                // 维护下次读取文件位置
                this.pos += bytesRead;

                // 保留有效的 Buffer
                let realBuf = this.buffer.slice(0, bytesRead);

                // 根据编码处理 data 回调返回的数据
                realBuf = this.encoding
                    ? realBuf.toString(this.encoding)
                    : realBuf;

                // 触发 data 事件并传递数据
                this.emit("data", realBuf);

                // 递归读取
                if (this.flowing) {
                    this.read();
                }
            } else {
                this.isEnd = true;
                this.emit("end"); // 触发 end 事件
                this.detroy(); // 关闭文件
            }
        }
    );
};复制代码

创建 ReadStream 的实例时,执行的 open 方法内部是使用 fs.open 打开文件的,是异步操作,而读取文件方法 read 是在 newListener 回调中同步执行的,这样很可能触发 read 的时候文件还没有被打开(不存在文件描述符),所以在 read 方法中判断了文件描述符是否存在,并在不存在时候使用 once 添加了 open 事件,回调中重新执行了 read

由于在 open 方法中使用 emit 触发了 open 事件,所以 read 内用 once 添加的 open 事件的回调也会跟着执行一次,并在回调中重新调用了 read 方法,保证了 read 读取文件的逻辑在文件真正打开后才执行,为了文件打开前执行 read 而不执行读取文件的逻辑,用 once 添加 open 事件时别忘记 return

在使用 fs.read 读取文件的时候有一个参数为本次读取几个字符到 Buffer 中,如果在创建可读流的时候设置了读取文件的结束位置 end 参数,则读到 end 位置就不应该再继续读取了,所以在存在 end 参数的时候每次都计算一下读取个数和 highWaterMark 取最小值,保证读取内容小于 highWaterMark 的时候不会多读,因为读取时是包括 end 值作为 Buffer 的索引这一项的,所以计算时多减去的要 +1 加回来,再一次读取这个读取个数计算结果变成了 0 ,也就结束了读取。

因为 end 参数的情况,所以在内部读取逻辑前判断了 bytesRead (实际读取字节数)是否大于 0 ,如果不满足条件则在实例添加是否读取结束标识 isEnd (后面使用),触发 end 事件并关闭文件,如果满足条件,也是通过 bytesRead 对 Buffer 进行截取,保留了有用的 Buffer,并且通过 encoding 编码对 Buffer 进行处理后,触发 data 事件,并将处理后的数据传递给 data 事件的回调。

5、暂停、恢复读取 pause 和 resume

pause 的目的就是暂停读取,其实就是阻止 read 方法在读取时进行递归,所以只需要更改 flowing 的值即可。

pause 方法

// 暂停读取
ReadStream.prototype.pause = function() {
    this.flowing = false;
};复制代码

resume 的目的是恢复读取,在更改 flowing 值得基础上重新执行 read 方法,由于在 pause 调用时 read 内部还是执行得读取文件得分支,文件并没有关闭,读取文件位置的参数也是通过实例上的当前的属性值进行计算的,所以重新执行 read 会继续上一次的位置读取。

resume 方法

// 恢复读取
ReadStream.prototype.resume = function() {
    this.flowing = true;
    if (!this.isEnd) this.read();
};复制代码

上面在重新执行 read 之前使用 isEnd 标识做了判断,防止在 setInterval 中调用 resume 在读取完成后不断的触发 endclose 事件。

验证可读流(流动模式)ReadStream

接下来我们使用自己实现的 ReadStream 类来创建可读流,并按照 fs.createReadStream 的用法进行使用并验证。

验证 ReadStream

// 文件 1.txt 内容为 0123456789
const fs = require("fs");
const ReadStream = require("./ReadStream");

// 创建可读流
let rs = new ReadStream("1.txt", {
    encoding: "utf8",
    start: 0,
    end: 5,
    highWaterMark: 2
});

rs.on("open", () => console.log("open"));

rs.on("data", data => {
    console.log(data, new Date());
    rs.pause();
});

rs.on("end", () => console.log("end"));
rs.on("close", () => console.log("close"));
rs.on("error", err => console.log(err));

setInterval(() => rs.resume(), 1000);

// open
// 01 2018-07-04T10:44:20.384Z
// 23 2018-07-04T10:44:21.384Z
// 45 2018-07-04T10:44:22.384Z
// end
// close复制代码

执行上面的代码正常的执行逻辑是先触发 open 事件,然后触发 data 事件,读取一次后暂停,每隔一秒恢复读取一次,再读取完成后触发 endclose 事件,通过运行代码结果和我们希望的一样。

可读流的实现(暂停模式)

1、在 fs 中的暂停模式的真正用法

fs 模块中用 createReadStream 创建的可读流中通过监听 readable 事件触发暂停模式(监听 data 事件触发流动模式),通过下面例子感受暂停模式与流动模式的不同,现在读取文件 1.txt ,内容为 0~9 十个数字。

暂停模式的用法

// 读取的
const fs = require("fs");

// 创建可读流
let rs = fs.createReadStream("1.txt", {
    encoding: "utf8",
    start: 0,
    hithWaterMark: 3
});

rs.on("readable", () => {
    // read 参数为本次读取的个数
    let r = rs.read(3);
    // 打印读取的数据
    console.log(r);
    // 打印容器剩余空间
    console.log(rs._readableState.length);
});

// 012
// 0
// 345
// 0
// 678
// 0
// null
// 1
// 90
// 0复制代码

通俗的解释,暂停模式的 readable 事件默认会触发一次,监听 readable 事件后就像创建了一个 “容器”,容量为 hithWaterMark ,文件中的数据会自动把容器注满,调用可读流的 read 方法读取时,会从容器中取出数据,如果 read 方法读取的数据小于 hithWaterMark ,则直接暂停,不再继续读取,如果大于 hithWaterMark ,说明 “容器” 空了,则会触发 readable 事件,无论读取字节数与 hithWaterMark 关系如何,只要 “容器” 内容量剩余小于 hithWaterMark 就会进行 “续杯”,再次向 “容器” 中填入 hithWaterMark 个,所以有些时候真实的容量会大于 hithWaterMark

read 方法读取的内容会返回 null 是因为容器内真实的数据数小于了读取数,如果不是最后一次读取,会在多次读取后将值一并返回,如果是最后一次读取,会把剩余不足的数据返回。

1、 readable 事件的触发条件:“容器” 空了;

2、“续杯” 条件:读取后 “容器” 内剩余量小于 hithWaterMark

3、 read 返回 null :“容器” 容器内可悲读取数据无法满足一次读取字节数。

2、ReadableStream 类的实现

同为可读流,暂停模式与流动模式相同,都依赖 fs 模块和 events 模块的 EventEmitter 类,参数依然为读取文件的路径和 options

创建 ReadableStream 类

// 引入依赖
const EventEmitter = require("events");
const fs = require("fs");

class ReadableStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path; // 读取文件的路径
        this.flags = options.flags || "r"; // 文件标识位
        this.encoding = options.encoding || null; // 字符编码
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 权限位
        this.autoClose = options.autoClose || true; // 是否自动关闭
        this.start = options.start || 0; // 读取文件的起始位置
        this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次读取文件的字节数

        this.reading = false; // 如果正在读取,则不再读取
        this.emitReadable = false; // 当缓存区的长度等于 0 的时候, 触发 readable
        this.arr = []; // 缓存区
        this.len = 0; // 缓存区的长度
        this.pos = this.start; // 下次读取文件的位置(变化的)

        // 创建可读流要打开文件
        this.open();

        this.on("newListener", type => {
            if (type === "readable") {
                this.read(); // 监听readable就开始读取
            }
        });
    }
}

// 导出模块
module.exports = ReadableStream;复制代码

在类的添加了 newListener 事件,在回调中判断是否监听了 readable 事件,如果监听了开始从 “容器” 中读取。

3、打开、关闭文件 open 和 detroy

打开和关闭文件的方法和流动模式的套路基本相似。

open 方法

// 打开文件
ReadableStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);
            if (this.autoClose) {
                this.destroy();
                return;
            }
        }
        this.fd = fd;
        this.emit("open");
    });
};复制代码

detroy 方法

// 关闭文件
ReadableStream.prototype.detroy = function() {
    if (typeof this.fd === "number") {
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }
    this.emit("close");
};复制代码

4、从 “容器” 中读取 read 方法的实现

read 方法的参数不传时就相当于从 “容器” 读取 highWaterMart 个字节,如果传参表示读取参数数量的字节数。

read 方法

ReadableStream.prototype.read = function(n) {
    // 如果读取大于了 highWaterMark,重新计算 highWaterMark,并重新读取
    if (n > this.len) {
        // 计算新的 highWaterMark,方法摘自 NodeJS 源码
        this.highWaterMark = computeNewHighWaterMark(n);
        this.reading = true;
        this._read();
    }

    // 将要返回的数据
    let buffer;

    // 如果读取的字节数大于 0 小于等于当前缓存 Buffer 的总长度
    if (n > 0 && n <= this.len) {
        // 则从缓存中取出
        buffer = Buffer.alloc(n);

        let current; // 存储每次从缓存区读出的第一个 Buffer
        let index = 0; // 每次读取缓存 Buffer 的索引
        let flag = true; // 是否结束整个 while 循环的标识

        // 开始读取
        while ((current = this.arr.shift()) && flag) {
            for (let i = 0; i < current.length; i++) {
                // 将缓存中取到的 Buffer 的内容读到自己定义的 Buffer 中
                buffer[index++] = current[i];

                // 如果当前索引值已经等于了读取个数,结束 for 循环
                if (index === n) {
                    flag = false;

                    // 取出当前 Buffer 没有消耗的
                    let residue = current.slice(i + 1);

                    // 在读取后维护缓存的长度
                    this.len -= n;

                    // 如果 BUffer 真的有剩下的就给塞回到缓存中
                    if (residue.length) {
                        this.arr.unshift(residue);
                    }

                    break;
                }
            }
        }
    }

    // 如果当前 读取的 Buffer 为 0,将触发 readable 事件
    if (this.len === 0) {
        this.emitReadable = true;
    }

    // 如果当前的缓存区大小小于 highWaterMark,就要读取
    if (this.len < this.highWaterMark) {
        // 如果不是正在读取才开始读取
        if (!this.read) {
            this.reading = true;
            this._read(); // 正真读取的方法
        }
    }

    // 将 buffer 转回创建可读流设置成的编码格式
    if (buffer) {
        buffer = this.encoding ? buffer.toString(this.encoding) : buffer;
    }

    return buffer;
};复制代码

上面的 read 方法的参数大小对比缓存区中取出的 Buffer 长度有两种情况,一种是小于当前缓存区内取出 Buffer 的长度,一种是大于了真个缓存区的 len 的长度。

小于当前缓存区总长度通过循环取出需要的 Buffer 存储了我们要返回创建的 Buffer 中,剩余的 Buffer 会丢失,所以我们做了一个小小的处理,将剩下的 Buffer 作为第一个 Buffer 塞回到了缓存区中,在处理这个问题时与流动模式不相同,流动模式处理后直接跳出了,而暂停模式相当于从 “容器” 中读取,如果第一次读取后还有剩余还要接着从容器中继续读取。

大于 len 属性时,规定需要重新计算 highWaterMark ,遵循的原则是将当前 highWaterMark 设定为当前读取字节个数距离最接近的 2n 次方的数值,NodeJS 源码中方法名称为 computeNewHighWaterMark ,为了提高性能是使用位运算的方式进行计算的,源码如下。

重新计算 highWaterMark

function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}复制代码

在调用该方法重新计算 highWaterMark 后更改正在读取状态,重新读取,由于读取逻辑的重复,所以真正读取文件的逻辑抽取成一个 _read 方法来实现,下面呢就来看看 _read 内部都做了什么。

5、真正读取文件的 _read

对比可读流(流动模式)的 read 方法,在调用 _read 方法读取时,是在 newListener 中同步执行 _read ,所以为了保证 _read 的逻辑是在 open 方法打开文件以后执行,使用了与 read 相同的处理方式。

_read 方法

ReadableStream.prototype._read = function() {
    if (typeof this.fd !== "number") {
        return this.once("open", () => _read());
    }

    // 创建本次读取的 Buffer
    let buffer = Buffer.alloc(this.highWaterMark);

    // 读取文件
    fs.read(
        this.fd,
        buffer,
        0,
        this.highWaterMark,
        this.pos,
        (err, bytesRead) => {
            if (bytesRead > 0) {
                this.arr.push(buffer); // 缓存
                this.len += bytesRead; // 维护缓存区长度
                this.pos += bytesRead; // 维护下一次读取位置
                this.reading = false; // 读取完毕

                // 触发 readable 事件
                if (this.emitReadable) {
                    // 触发后更改触发状态为 false
                    this.emitReadable = false;
                    this.emit("readable");
                }
            } else {
                // 如果读完触发结束事件
                this.emit("end");
            }
        }
    );
};复制代码

由于缓存区是一个数组,存储的每一个 Buffer 是独立存在的,所以不能挂载在实例上共用,如果挂在实例上则引用相同,一动全动,这不是我们想要的,所以每一次执行 _read 方法时都创建新的 Buffer 实例存入读取的数据后存储在缓存区中,如果读取完成 bytesRead0 ,则触发 end 事件。

注意:在 NodeJS 源码中,可读流的两种模式代码都是混合在一起的,只是使用 fs.createReadStream 创建一个可读流,通过监听 datareadable 两种不同的事件来触发两种不同的模式,而我们为了模拟,把两种模式拆开成了两个类来实现的,在测试时需要创建不同类的实例。

验证可读流(暂停模式)ReadableStream

为了统一我们依然读取真正用法中 1.txt 文件,内容为 0~9 十个数字。

验证 ReadableStream

// 引入依赖
const fs = require("fs");
const ReadableStream = require("./ReadableStream");

let rs = new ReadableStream("1.txt", {
    encoding: "utf8",
    start: 0,
    highWaterMark: 3
});

rs.on("readable", () => {
    let r = rs.read(3);
    console.log(r);
    console.log(rs.len);
});复制代码

在打印 “容器” 剩余容量时,我们使用在 ReadableStream 上构造的 len 属性。

流动模式和暂停模式分别有不同的应用场景,如果只是希望读取一个文件,并最快的获得结果使用流动模式是很好的选择,如果希望了解读取文件的具体内容,并进行精细的处理,使用暂停模式更好一些。

可写流的实现

1、WriteStream 类创建

在使用 fscreateWriteStream 创建可写流时,返回了 WriteStream 对象,上面也存在事件和方法,创建可写流的时也是创建类的实例,我们将这个类命名为 WriteStream 。事件同样依赖 events 模块的 EventEmitter 类,文件操作同样依赖 fs 模块,所以需提前引入。

创建 WriteStream 类

// 引入依赖模块
const EventEmitter = require("events");
const fs = require("fs");

// 创建 WriteStream 类
class WriteStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        // 创建可写流参数传入的属性
        this.path = path; // 写入文件的路径
        this.flags = options.flags || "w"; // 文件标识位
        this.encoding = options.encoding || "utf8"; // 字符编码
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 权限位
        this.autoClose = options.autoClose || true; // 是否自动关闭
        this.start = options.start || 0; // 写入文件的起始位置
        this.highWaterMark = options.highWaterMark || 16 * 1024; // 对比写入字节数的标识

        this.writing = false; // 是否正在写入
        this.needDrain = false; // 是否需要触发 drain 事件
        this.buffer = []; // 缓存,正在写入就存入缓存中
        this.len = 0; // 当前缓存的个数
        this.pos = this.start; // 下次写入文件的位置(变化的)

        // 创建可写流要打开文件
        this.open();
    }
}

// 导出模块 复制代码

module.exports = WriteStream; 使用 fs.createWriteStream 创建可写流时传入了两个参数,写入的文件路径和一个 options 选项, options 上有七个参数,我们在创建 ReadStream 类的时候将这些参数初始化到了 this 上。

创建可写流后需要使用 write 方法进行写入,写入时第一次会真的通过内存写入到文件中,而再次写入则会将内容写到缓存中,注意这里的 “内存” 和 “缓存”,内存是写入文件是的系统内存,缓存是我们自己创建的数组,第一次写入以后要写入文件的 Buffer 都会先存入这个数组中,这个数组名为 buffer ,挂载在实例上,实例上同时挂载了 len 属性用来存储当前缓存中 Buffer 总共的字节数(长度)。

我们在可读流上挂载了是否正在写入的状态 writing 属性,只要缓存区中存在未写入的 Buffer, writing 的状态就是正在写入,当写入的字节数大于了 highWaterMark 需要触发 drain 事件,所以又挂载了是否需要触发 drain 事件的标识 needDrain 属性。

当从文件的 start 值对应的位置开始写入,下一次写入文件的位置会发生变化,所以在 this 上挂载了 pos 属性,用于存储下次写入文件的位置。

在 NodeJS 流的源码中缓存是用链表实现的,通过指针来操作缓存中的 Buffer,而我们为了简化逻辑就使用数组来作为缓存,虽然性能相对链表要差。

2、打开、关闭文件 open 和 detroy

WriteStream 中,写入文件之前也应该打开文件,在打开文件过程中出错时也应该触发 error 事件并关闭文件,打开和关闭文件的方法 opendetroyReadStreamopendetroy 方法的逻辑如出一辙,所以这里直接拿过来用了。

open 方法

// 打开文件
WriteStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);
            if (this.autoClose) {
                this.destroy();
                return;
            }
        }
        this.fd = fd;
        this.emit("open");
    });
};复制代码

detroy 方法

// 关闭文件
WriteStream.prototype.detroy = function() {
    if (typeof this.fd === "number") {
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }
    this.emit("close");
};复制代码

3、写入文件方法 write 的实现

write 方法默认支持传入三个参数:

  • chunk:写入文件的内容;
  • encoding:写入文件的编码格式;
  • callback:写入成功后执行的回调。

write 方法

// 写入文件的方法,只要逻辑为写入前的处理
WriteStream.prototype.write = function(
    chunk,
    encoding = this.encoding,
    callback
) {
    // 为了方便操作将要写入的数据转换成 Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);

    // 维护缓存的长度
    this.len += chunk.lenth;

    // 维护是否触发 drain 事件的标识
    this.needDrain = this.highWaterMark <= this.len;

    // 如果正在写入
    if (this.writing) {
        this.buffer.push({
            chunk,
            encoding,
            callback
        });
    } else {
        // 更改标识为正在写入,再次写入的时候走缓存
        this.writing = true;
        // 如果已经写入清空缓存区的内容
        this._write(chunk, encoding, () => this.clearBuffer());
    }

    return !this.needDrain;
};复制代码

与可写流的 read 一样,我们在使用 write 方法将数据写入文件时,也是操作 Buffer,在 write 方法中,首先将接收到的要写入的数据转换成了 Buffer,因为是多次写入,要知道缓存中 Buffer 字节数的总长度,所以维护了 len 变量。

我们的 WriteStream 构造函数中, this 挂载了 needDrain 属性,在使用 fs.createWriteStream 创建的可读流时,是写入的字节长度超过 highWaterMark 才会触发 drain 事件,而 needDrainwrite 的返回值正好相反,所以我们用 needDrain 取反来作为 write 方法的返回值。

在写入的逻辑中第一次是直接通过内存写入到文件,但是再次写入就需要将数据存入缓存,将数据写入到文件中写入状态 writing 默认为 false ,通过缓存再写入证明应该正在写入中,所以在第一次写入后应更改 writing 的状态为 true ,写入缓存其实就是把转换的 Buffer、编码以及写入成功后要执行的回调挂在一个对象上存入缓存的数组 buffer 中。

我们把真正写入文件的逻辑抽取成一个单独的方法 _write ,并传入 chunk (要写入的内容,已经处理成 Buffer)、 encoding (字符编码)、回调函数,在回调函数中执行了原型方法 clearBuffer ,接下来就来实现 _writeclearBuffer

注意:方法使用 `

` 开头代表私有方法,轻易不要在外部调用或修改,这是一个开发者之间约定俗成的不成文规定。_

4、真正的文件操作 _write

对比可读流(流动模式)的 read 方法,在调用 _write 方法写入时,是在创建可写流之后的同步代码中执行的,与可读流在 newListener 中同步执行 read 的情况类似,所以为了保证 _write 的逻辑是在 open 方法打开文件以后执行,使用了与 read 相同的处理方式。

_write 方法

// 真正的写入文件操作的方法
WriteStream.prototype._write = function(chunk, encoding, callback) {
    // 由于 open 异步执行,write 是在创建实例时同步执行
    // write 执行可能早于 open,此时不存在文件描述符
    if (typeof this.fd !== "number") {
        // 因为 open 用 emit 触发了 open 事件,所以在这是重新执行 write
        return this.once("open", () => this._write(chunk, encoding, callback));
    }

    // 读取文件
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
        // 维护下次写入的位置和缓存区 Buffer 的总字节数
        this.pos += bytesWritten;
        this.len -= bytesWritten;
        callback();
    });
};复制代码

在打开文件并写入的时候需要维护两个变量,下次写入的位置 pos 和当前缓存区内 Buffer 所占总字节数 len ,本次写入了多少个字节,下次写入需要在写入位置的基础上加多少个字节,而 len 恰恰相反,本次写入了多少个字节,缓存区中的总长度应该对应的减少多少个字节。

在维护两个变量的值以后调用 callback ,其实 callback 内执行的是 clearBuffer 方法,就如方法名,译为 “清空缓存”,其实就是一次一次的将数据写入文件并从缓存中移除,很明显需要递归调用 _write 方法,我们将这个递归的逻辑统一放在 clearBuffer 方法中实现。

5、清空缓存操作 clearBuffer

clearBuffer 方法

// 清空缓存方法
WriteStream.prototype.clearBuffer = function() {
    // 先写入的在数组前面,从前面取出缓存中的 Buffer
    let buf = this.buffer.shift();

    // 如果存在 buf,证明缓存还有 Buffer 需要写入
    if (buf) {
        // 递归 _write 按照编码将数据写入文件
        this._write(buf.chunk, buf.encoding, () => this.clearBuffer);
    } else {
        // 如果没有 buf,说明缓存内的内容已经完全写入文件并清空,需要触发 drain 事件
        this.emit("drain");

        // 更改正在写入状态
        this.writing = false;

        // 更改是否需要触发 drain 事件状态
        this.needDrain = false;
    }
};复制代码

clearBuffer 方法中获取了缓存区数组的最前面的 Buffer(最前面的是先写入缓存的,也应该先取出来写入文件),存在这个 Buffer 时,递归 _write 方法按照编码将数据写入文件,如果不存在说明缓存区已经清空了,代表内容完全写入文件中,所以触发 drain 事件,最后更改了 writingneedDrain 的状态。

更正 writing 是为了 WriteStream 创建的可读流在下次调用 write 方法时默认第一次真正写入文件,而更正 needDrain 的状态是在缓存区要清空的最后一个 Buffer 的长度小于了 highWaterMark 时,保证 write 方法的返回值是正确的。

第一次是真正写入,其他的都写入缓存,再一个一个的将缓存中存储的 Buffer 写入并从缓存清空,之所以这样设计是为了把写入的内容排成一个队列,假如有 3 个人同时操作一个文件写入内容,只有第一个人是真的写入,其他的人都写在缓存中,再按照写入缓存的顺序依次写入文件,避免冲突和写入顺序出错。

验证可写流 WriteStream

接下来我们使用自己实现的 WriteStream 类来创建可写流,并按照 fs.createWriteStream 的用法进行使用并验证。

验证 WriteStream

// 向 1.txt 文件中写入 012345
const fs = require("fs");
const WriteStream = require("./WriteStream");

// 创建可写流
let ws = new WriteStream("2.txt", {
    highWaterMark: 3
});

let i = 0;

function write() {
    let flag = true;
    while (i <= 6 && flag) {
        i++;
        flag = ws.write(i + "", "utf8");
    }
}

ws.on("drain", function() {
    console.log("写入成功");
    write();
});
write();

// true
// true
// false
// 写入成功
// true
// true
// false
// 写入成功复制代码

可以使用 fs.createWriteStreamWriteStream 类分别执行上面的代码,对比结果,看看是否相同。

可读流和可写流的桥梁 pipe

可写流和可读流一般是通过 pipe 配合来使用的, pipe 方法是可读流 ReadStream 的原型方法,参数为一个可写流。

pipe 方法

// 连接可读流和可写流的方法 pipe
ReadStream.prototype.pipe = function(dest) {
    // 开始读取
    this.on("data", data => {
        // 如果超出可写流的 highWaterMark,暂停读取
        let flag = dest.write(data);
        if (!flag) this.pause();
    });

    dest.on("drain", () => {
        // 当可写流清空内存时恢复读取
        this.resume();
    });

    this.on("end", () => {
        // 在读取完毕后关闭文件
        this.destroy();
    });
};复制代码

pipe 方法其实就是通过可读流的 data 事件触发流动状态,并用可写流接收读出的数据进行写入,当写入数据超出 highWaterMark ,则暂停可读流的读取,直到可写流的缓存被清空并把内容写进文件后,恢复可读流的读取,当读取结束后关闭文件。

下面我们实现一个将 1.txt 的内容拷贝 2.txt 中的例子。

验证 pipe

// pipe 的使用
const fs = require("fs");

// 引入自己的 ReadStream 类和 WriteStream 类
const ReadStream = rquire("./ReadStream");
const WriteStream = rquire("./WriteStream");

// 创建可读流和可写流
let rs = new ReadStream("1.txt", {
    highWaterMark: 3
});
let ws = new WriteStream("2.txt", {
    highWaterMark: 2
});

// 使用 pipe 实现文件内容复制
rs.pipe(ws);复制代码

总结

在 NodeJS 源码中,可读流和可写流的内容要比本篇内容多很多,本篇是将源码精简,抽出核心逻辑并针对流的使用方式进行实现,主要目的是帮助理解流的原理和使用,争取做到 “知其然知其所以然”,了解了一些底层再对流使用时,也能游刃有余。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C语言基础

C语言基础

安博教育集团 / 2012-2 / 37.00元

《C语言基础》深入浅出地介绍了C语言程序设计的基础知识,内容涉及C语言基础、算法基础、变量、数据类型、运算符、输入/输出相关函数、选择结构、循环结构、各种表达式、数组、字符串、指针、函数、结构体、ISO C99的扩展语法等。全书内容丰富,结构严谨,层次清晰,语言生动,论述精准而深刻,实例丰富而实用。 《C语言基础》不仅适合用做计算机职业培训的首选教材,也可作为普通高校教材使用,更是C语言初学......一起来看看 《C语言基础》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码