详解NodeJs流之一

栏目: Node.js · 发布时间: 6年前

内容简介:如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或流从在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的

如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或 github NodeJs系列文章

流从 早先的unix 初出茅庐,在过去的几十年的时间里,它被证明是一种可依赖的编程方式,它可以将一个大型的系统拆成一些很小的部分,并且让这些部分之间完美地进行合作。

在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的 console ,都极有可能涉及到流。

Node.js 中有四种基本的流类型:

  • Readable - 可读取数据的流(例如 fs.createReadStream())。
  • Writable - 可写入数据的流(例如 fs.createWriteStream())。
  • Duplex - 可读又可写的流(例如 net.Socket)。
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())

为什么使用流

假设我们需要使用node来实现一个简单的静态文件服务器:

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.readFile('./test.html',function(err,data){
        if(err){
            res.statusCode = 500;
            res.end();
        }else{
            res.end(data);
        }
    })
}).listen(3000)

上述代码简单实现了静态文件的读取和发送,逻辑上是完全可行的。但是由于 readFile 是一次性将读取的文件存放在内存中的,假设 test.html 文件非常大或者访问量增多的情况下,服务器内存很有可能耗尽。这时我们就需要使用流的方式进行改进:

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.createReadStream('./test.html').pipe(res);
}).listen(3000);

fs.createReadStream 创建一个可读流,逐次读取文件内容供给下游消费,这种逐步读取和消费的方式,有效减缓了内存的消耗。

可读流(Readable Stream)

详解NodeJs流之一

我们可以把 Readable Stream拆分成两个阶段:push阶段和pull阶段,在push阶段,通过实现 _read 方法将数据从底层数据资源池中推送到缓存池中,这是数据的生产阶段,而pull阶段,则是将缓存池的数据拉出,供下游使用,这是数据的消费阶段。

在开始进一步讲解之前,我们先来介绍几个字段,这些字段来源于node源码:

  • state.buffer : Array 缓存池,每个元素对应push(data)中的data
  • state.length : Number 缓存池中的数据量,在 objectMode 模式下, state.length === state.buffer.length ,否则,其值是 state.buffer 中数据字节数的总和
  • state.ended : Boolean 表示底层数据池没有可读数据了( this.pull(null) )
  • state.flowing : Null|Boolean 表示当前流的模式,其值有三种情况: null (初始状态)、 true (流动模式)、 false (暂停模式)
  • state.needReadable : Boolean 是否需要触发 readable 事件
  • state.reading : Boolean 是否正在读取底层数据
  • state.sync : Boolean 是否立即触发 data / readable 事件, false 为立即触发、 true 下一个tick再触发( process.nextTick )

两种模式

可读流存在两种模式:流动模式(flowing)和暂停模式(paused),在源码中使用 state.flowing 来标识。

两种模式其基本流程都遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流动模式而言,只要缓存池还有未消耗的数据,那么数据便会不断的被提取,我们可以把它想象成一个自动的水泵,只要通电了,不抽干水池的水它是不会停下来的。而对于暂停模式,它更像是打水桶,需要的时候再从水池里面打点水出来。

所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式:

  • 添加 data 事件句柄(前提是 state.flowing === null
  • 调用 stream.resume()
  • 调用 stream.pipe()

可读流也可以通过以下方式切换回暂停模式:

readable
stream.pause()
stream.unpipe()

一切从 read 开始

对于可读流而言,消费驱动生产,只有通过调用pull阶段的 read 函数,才能唤醒push阶段的数据产生,从而带动整个流的运动。所以对于可读流而言 read 是一切的起点。

这是根据源码整理的一个简单的流程图,后面将对一些环节加以说明。

详解NodeJs流之一

howMuchToRead

调用 read(n) 过程中,node会根据实际情况调整读取的数量,实际值由 howMuchRead 决定

function howMuchToRead(n,state){
  // 如果size <= 0或者不存在可读数据 
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
    
  // objectMode模式下 每次制度一个单位长度的数据
  if (state.objectMode)
    return 1;
    
  // 如果size没有指定
  if (Number.isNaN(n)) {
    // 执行read()时,由于流动模式下数据会不断输出,
    // 所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  
  if (n > state.highWaterMark)
    // 更新highWaterMark
    state.highWaterMark = computeNewHighWaterMark(n);

  // 如果缓存中的数据量够用
  if (n <= state.length)
    return n;
    
  // 如果缓存中的数据不够用,
  // 且资源池还有可读取的数据,那么这一次先不读取缓存数据
  // 留着下一次数据量足够的时候再读取
  // 否则读空缓存池
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

end 事件

read 函数调用过程中,node会择机判定是否触发 end 事件,判定标准主要是以下两个条件:

if (state.length === 0 && state.ended) endReadable(this);
  1. 底层数据(资源)没有可读数据,此时 state.endedtrue

通过调用 pull(null) 表示底层数据当前已经没有可读数据了

  1. 缓存池中没有可读数据 state.length === 0

本事件在调用 read([size]) 时触发(满足上述条件时)

doRead

doRead 用于判断是否读取底层数据

// 如果当前是暂停模式`state.needReadable`
  var doRead = state.needReadable;
  
  // 如果当前缓存池是空的或者没有足够的缓存
  if (state.length === 0 || state.length - n < state.highWaterMark){
    doRead = true;
  }

  if (state.ended || state.reading) {
    doRead = false;
  } else if (doRead) {
    // ...
    this._read(state.highWaterMark);
    // ...
  }

state.reading 标志上次从底层取数据的操作是否已完成,一旦 push 方法被调用,就会设置为 false ,表示此次 _read() 结束

data 事件

官方文档 中提到:添加 data 事件句柄,可以使Readable Stream的模式切换到流动模式,但官方没有提到的是这一结果成立的条件- state.flowing 的值不为 null ,即只有在初始状态下,监听data事件,会使流进入流动模式。举个例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this._time = 0;
  }
  _read(){
    this.push(String(++this._time));
  }
}

const exampleReadable = new ExampleReadable();
// 暂停 state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
  console.log(`Received ${chunk.length} bytes of data.`);
});

运行这个例子,我们发现终端没有任何输出,为什么会这样呢?原因我们可以从源码中看出端倪

if (state.flowing !== false)
      this.resume();

由此我们可以把官方表述再完善一些:在可读流初始化状态下( state.flowing === null ),添加 data 事件句柄会使流进入流动模式。

push

只能被可读流的实现调用,且只能在 readable._read() 方法中调用。

push是数据生产的核心,消费方通过调用 read(n) 促使流输出数据,而流通过_read()使底层调用push方法将数据传给流。

在这个过程中,push方法有可能将数据存放在缓存池内,也有可能直接通过 data 事件输出。下面我们一一分析。

如果当前流是流动的( state.flowing === true ),且缓存池内没有可读数据,

那么数据将直接由事件 data 输出

// node 源码
if (state.flowing && state.length === 0 && !state.sync){
    state.awaitDrain = 0;
    stream.emit('data', chunk);
}

我们举个例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this.max = 100;
    this.time = 0;
  }
  _read(){
    const seed = setTimeout(()=>{
      if(this.time > 100){
        this.push(null);
      }else{
        this.push(String(++this.time));
      }
      clearTimeout(seed);
    },0)
  }
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
  console.log('from data',data);
});

readable 事件

exampleReadable.on('readable',()=>{
    ....
});

当我们注册一个 readable 事件后,node就会做以下处理:

  1. 将流切换到暂停模式
state.flowing = false; 
state.needReadable = true;
  1. 如果缓存池未消耗的数据,触发 readable
stream.emit('readable');
  1. 否则,判断当前是否正在读取底层数据,如果不是,开始(nextTick)读取底层数据 self.read(0);

触发条件

state.flow === false
state.length || state.ended
return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);

参考

详解NodeJs流之一


以上所述就是小编给大家介绍的《详解NodeJs流之一》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Definitive Guide to HTML5 WebSocket

The Definitive Guide to HTML5 WebSocket

Vanessa Wang、Frank Salim、Peter Moskovits / Apress / 2013-3 / USD 26.30

The browser is, hands down, the most popular and ubiquitous deployment platform available to us today: virtually every computer, smartphone, tablet, and just about every other form factor imaginable c......一起来看看 《The Definitive Guide to HTML5 WebSocket》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具