Node 深入Stream(1)

栏目: JavaScript · 发布时间: 6年前

内容简介:实现了如果指定utf8编码highWaterMark要大于3个字节流切换到流动模式,数据会被尽可能快的读出

1. 流的概念

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

2.可读流createReadStream

实现了 stream.Readable 接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);
复制代码

2.1 创建可读流

var rs = fs.createReadStream(path,[options]);
复制代码
  1. path读取文件的路径
  2. options
    • flags打开文件要做的操作,默认为'r'
    • encoding默认为null
    • start开始读取的索引位置
    • end结束读取的索引位置(包括结束位置)
    • highWaterMark读取缓存区默认的大小64kb

如果指定utf8编码highWaterMark要大于3个字节

2.2 监听data事件

流切换到流动模式,数据会被尽可能快的读出

rs.on('data', function (data) {
    console.log(data);
});
复制代码

2.3 监听end事件

该事件会在读完数据后被触发

rs.on('end', function () {
    console.log('读取完成');
});
复制代码

2.4 监听error事件

rs.on('error', function (err) {
    console.log(err);
});
复制代码

2.5 监听open事件

rs.on('open', function () {
    console.log(err);
});
复制代码

2.6 监听close事件

rs.on('close', function () {
    console.log(err);
});
复制代码

2.7 设置编码

与指定{encoding:'utf8'}效果相同,设置编码

rs.setEncoding('utf8');
复制代码

2.8 暂停和恢复触发data

通过pause()方法和resume()方法

rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
},2000);
复制代码

3.可写流createWriteStream

实现了stream.Writable接口的对象来将流数据写入到对象中

fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};

util.inherits(WriteStream, Writable);
复制代码

3.1 创建可写流

var ws = fs.createWriteStream(path,[options]);
复制代码
  1. path写入的文件路径
  2. options
    • flags打开文件要做的操作,默认为'w'
    • encoding默认为utf8
    • highWaterMark写入缓存区的默认大小16kb

3.2 write方法

ws.write(chunk,[encoding],[callback]);
复制代码
  1. chunk写入的数据buffer/string
  2. encoding编码格式chunk为字符串时有用,可选
  3. callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为false,未满时为true

3.3 end方法

ws.end(chunk,[encoding],[callback]);
复制代码

表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数

3.4 drain方法

  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发
  • 建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
    let fs = require('fs');
    let ws = fs.createWriteStream('./2.txt',{
      flags:'w',
      encoding:'utf8',
      highWaterMark:3
    });
    let i = 10;
    function write(){
     let  flag = true;
     while(i&&flag){
          flag = ws.write("1");
          i--;
         console.log(flag);
     }
    }
    write();
    ws.on('drain',()=>{
      console.log("drain");
      write();
    });
    复制代码

3.5 finish方法

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发。

var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
  writer.write(`hello, ${i}!\n`);
}
writer.end('结束\n');
writer.on('finish', () => {
  console.error('所有的写入已经完成!');
});
复制代码

4.pipe方法

4.1 pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
复制代码

4.2 pipe用法

readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
复制代码

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

4.3 unpipe用法

  • readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离
  • 如果 destination 没有传入, 则所有绑定的流都会被分离.
    let fs = require('fs');
    var from = fs.createReadStream('./1.txt');
    var to = fs.createWriteStream('./2.txt');
    from.pipe(to);
    setTimeout(() => {
    console.log('关闭向2.txt的写入');
    from.unpipe(writable);
    console.log('手工关闭文件流');
    to.end();
    }, 1000);
    复制代码

4.4 cork

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。

4.5 uncork

writable.uncork()将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
复制代码

5. 简单实现

5.1 可读流的简单实现

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
复制代码
let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        this.end = options.end;
        this.flowing = false;
        this.autoClose = true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.length = 0;
        this.on('newListener', (type, listener) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
        });
        this.on('end', () => {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.open();
    }

    read() {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        let n = this.end ? Math.min(this.end - this.pos, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd,this.buffer,0,n,this.pos,(err,bytesRead)=>{
            if(err){
             return;
            }
            if(bytesRead){
                let data = this.buffer.slice(0,bytesRead);
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                this.pos += bytesRead;
                if(this.end && this.pos > this.end){
                  return this.emit('end');
                }
                if(this.flowing)
                    this.read();
            }else{
                this.emit('end');
            }
        })
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        })
    }


    end() {
        if (this.autoClose) {
            this.destroy();
        }
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }

}

module.exports = WriteStream;
复制代码

5.2 可写流的简单实现

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
复制代码
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends  EventEmitter{
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        this.writing = false;
        this.autoClose = true;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = [];
        this.length = 0;
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        })
    }

    write(chunk, encoding, cb) {
        if (typeof encoding == 'function') {
            cb = encoding;
            encoding = null;
        }

        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding || 'utf8');
        let len = chunk.length;
        this.length += len;
        let ret = this.length < this.highWaterMark;
        if (this.writing) {
            this.buffers.push({
                chunk,
                encoding,
                cb,
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding,this.clearBuffer.bind(this));
        }
        return ret;
    }

    _write(chunk, encoding, cb) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this._write(chunk, encoding, cb));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.length -= written;
            this.pos += written;
            cb && cb();
        });
    }

    clearBuffer() {
        let data = this.buffers.shift();
        if (data) {
            this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
        } else {
            this.writing = false;
            this.emit('drain');
        }
    }

    end() {
        if (this.autoClose) {
            this.emit('end');
            this.destroy();
        }
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }

}

module.exports = WriteStream;
复制代码

5.3 pipe

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
复制代码
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
复制代码

5.4 暂停模式

let fs =require('fs');
let ReadStream2 = require('./ReadStream2');
let rs = new ReadStream2('./1.txt',{
    start:3,
    end:8,
    encoding:'utf8',
    highWaterMark:3
});
rs.on('readable',function () {
    console.log('readable');
    console.log('rs.buffer.length',rs.length);
    let d = rs.read(1);
    console.log(d);
    console.log('rs.buffer.length',rs.length);

    setTimeout(()=>{
        console.log('rs.buffer.length',rs.length);
    },500)
});
复制代码

` let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { constructor(path, options) { super(path, options); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.buffer = Buffer.alloc(this.highWaterMark); this.flags = options.flags || 'r'; this.encoding = options.encoding; this.mode = options.mode || 0o666; this.start = options.start || 0; this.end = options.end; this.pos = this.start; this.autoClose = options.autoClose || true; this.bytesRead = 0; this.closed = false; this.flowing; this.needReadable = false; this.length = 0; this.buffers = []; this.on('end', function () { if (this.autoClose) { this.destroy(); } }); this.on('newListener', (type) => { if (type == 'data') { this.flowing = true; this.read(); } if (type == 'readable') { this.read(0); } }); this.open(); }

open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            if (this.autoClose) {
                this.destroy();
                return this.emit('error', err);
            }
        }
        this.fd = fd;
        this.emit('open');
    });
}

read(n) {
    if (typeof this.fd != 'number') {
        return this.once('open', () => this.read());
    }
    n = parseInt(n,10);
    if(n != n){
        n = this.length;
    }
    if(this.length ==0)
        this.needReadable = true;
    let ret;
    if (0<n < this.length) {
        ret = Buffer.alloc(n);
        let b ;
        let index = 0;
        while(null != (b = this.buffers.shift())){
            for(let i=0;i<b.length;i++){
                ret[index++] = b[i];
                if(index == ret.length){
                    this.length -= n;
                    b = b.slice(i+1);
                    this.buffers.unshift(b);
                    break;
                }
            }
        }
        if (this.encoding) ret = ret.toString(this.encoding);
    }

    let _read = () => {
        let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
            if (err) {
                return
            }
            let data;
            if (bytesRead > 0) {
                data = this.buffer.slice(0, bytesRead);
                this.pos += bytesRead;
                this.length += bytesRead;
                if (this.end && this.pos > this.end) {
                    if(this.needReadable){
                        this.emit('readable');
                    }

                    this.emit('end');
                } else {
                    this.buffers.push(data);
                    if(this.needReadable){
                        this.emit('readable');
                        this.needReadable = false;
                    }

                }
            } else {
                if(this.needReadable) {
                    this.emit('readable');
                }
                return this.emit('end');
            }
        })
    }
    if (this.length == 0 || (this.length < this.highWaterMark)) {
        _read(0);
    }
    return ret;
}

destroy() {
    fs.close(this.fd, (err) => {
        this.emit('close');
    });
}

pause() {
    this.flowing = false;
}

resume() {
    this.flowing = true;
    this.read();
}

pipe(dest) {
    this.on('data', (data) => {
        let flag = dest.write(data);
        if (!flag) this.pause();
    });
    dest.on('drain', () => {
        this.resume();
    });
    this.on('end', () => {
        dest.end();
    });
}复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Coming of Age in Second Life

Coming of Age in Second Life

Tom Boellstorff / Princeton University Press / 2008-04-21 / USD 29.95

The gap between the virtual and the physical, and its effect on the ideas of personhood and relationships, is the most interesting aspect of Boellstorff's analysis... Boellstorff's portrayal of a virt......一起来看看 《Coming of Age in Second Life》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具