详解NodeJs流之一

栏目: Node.js · 发布时间: 5年前

内容简介:如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或流从在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的

如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或 github NodeJs系列文章

流从 早先的unix 初出茅庐,在过去的几十年的时间里,它被证明是一种可依赖的编程方式,它可以将一个大型的系统拆成一些很小的部分,并且让这些部分之间完美地进行合作。

在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的 console ,都极有可能涉及到流。

Node.js 中有四种基本的流类型:

  • Readable - 可读取数据的流(例如 fs.createReadStream())。
  • Writable - 可写入数据的流(例如 fs.createWriteStream())。
  • Duplex - 可读又可写的流(例如 net.Socket)。
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())

为什么使用流

假设我们需要使用node来实现一个简单的静态文件服务器:

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.readFile('./test.html',function(err,data){
        if(err){
            res.statusCode = 500;
            res.end();
        }else{
            res.end(data);
        }
    })
}).listen(3000)

上述代码简单实现了静态文件的读取和发送,逻辑上是完全可行的。但是由于 readFile 是一次性将读取的文件存放在内存中的,假设 test.html 文件非常大或者访问量增多的情况下,服务器内存很有可能耗尽。这时我们就需要使用流的方式进行改进:

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.createReadStream('./test.html').pipe(res);
}).listen(3000);

fs.createReadStream 创建一个可读流,逐次读取文件内容供给下游消费,这种逐步读取和消费的方式,有效减缓了内存的消耗。

可读流(Readable Stream)

详解NodeJs流之一

我们可以把 Readable Stream拆分成两个阶段:push阶段和pull阶段,在push阶段,通过实现 _read 方法将数据从底层数据资源池中推送到缓存池中,这是数据的生产阶段,而pull阶段,则是将缓存池的数据拉出,供下游使用,这是数据的消费阶段。

在开始进一步讲解之前,我们先来介绍几个字段,这些字段来源于node源码:

  • state.buffer : Array 缓存池,每个元素对应push(data)中的data
  • state.length : Number 缓存池中的数据量,在 objectMode 模式下, state.length === state.buffer.length ,否则,其值是 state.buffer 中数据字节数的总和
  • state.ended : Boolean 表示底层数据池没有可读数据了( this.pull(null) )
  • state.flowing : Null|Boolean 表示当前流的模式,其值有三种情况: null (初始状态)、 true (流动模式)、 false (暂停模式)
  • state.needReadable : Boolean 是否需要触发 readable 事件
  • state.reading : Boolean 是否正在读取底层数据
  • state.sync : Boolean 是否立即触发 data / readable 事件, false 为立即触发、 true 下一个tick再触发( process.nextTick )

两种模式

可读流存在两种模式:流动模式(flowing)和暂停模式(paused),在源码中使用 state.flowing 来标识。

两种模式其基本流程都遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流动模式而言,只要缓存池还有未消耗的数据,那么数据便会不断的被提取,我们可以把它想象成一个自动的水泵,只要通电了,不抽干水池的水它是不会停下来的。而对于暂停模式,它更像是打水桶,需要的时候再从水池里面打点水出来。

所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式:

  • 添加 data 事件句柄(前提是 state.flowing === null
  • 调用 stream.resume()
  • 调用 stream.pipe()

可读流也可以通过以下方式切换回暂停模式:

readable
stream.pause()
stream.unpipe()

一切从 read 开始

对于可读流而言,消费驱动生产,只有通过调用pull阶段的 read 函数,才能唤醒push阶段的数据产生,从而带动整个流的运动。所以对于可读流而言 read 是一切的起点。

这是根据源码整理的一个简单的流程图,后面将对一些环节加以说明。

详解NodeJs流之一

howMuchToRead

调用 read(n) 过程中,node会根据实际情况调整读取的数量,实际值由 howMuchRead 决定

function howMuchToRead(n,state){
  // 如果size <= 0或者不存在可读数据 
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
    
  // objectMode模式下 每次制度一个单位长度的数据
  if (state.objectMode)
    return 1;
    
  // 如果size没有指定
  if (Number.isNaN(n)) {
    // 执行read()时,由于流动模式下数据会不断输出,
    // 所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  
  if (n > state.highWaterMark)
    // 更新highWaterMark
    state.highWaterMark = computeNewHighWaterMark(n);

  // 如果缓存中的数据量够用
  if (n <= state.length)
    return n;
    
  // 如果缓存中的数据不够用,
  // 且资源池还有可读取的数据,那么这一次先不读取缓存数据
  // 留着下一次数据量足够的时候再读取
  // 否则读空缓存池
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

end 事件

read 函数调用过程中,node会择机判定是否触发 end 事件,判定标准主要是以下两个条件:

if (state.length === 0 && state.ended) endReadable(this);
  1. 底层数据(资源)没有可读数据,此时 state.endedtrue

通过调用 pull(null) 表示底层数据当前已经没有可读数据了

  1. 缓存池中没有可读数据 state.length === 0

本事件在调用 read([size]) 时触发(满足上述条件时)

doRead

doRead 用于判断是否读取底层数据

// 如果当前是暂停模式`state.needReadable`
  var doRead = state.needReadable;
  
  // 如果当前缓存池是空的或者没有足够的缓存
  if (state.length === 0 || state.length - n < state.highWaterMark){
    doRead = true;
  }

  if (state.ended || state.reading) {
    doRead = false;
  } else if (doRead) {
    // ...
    this._read(state.highWaterMark);
    // ...
  }

state.reading 标志上次从底层取数据的操作是否已完成,一旦 push 方法被调用,就会设置为 false ,表示此次 _read() 结束

data 事件

官方文档 中提到:添加 data 事件句柄,可以使Readable Stream的模式切换到流动模式,但官方没有提到的是这一结果成立的条件- state.flowing 的值不为 null ,即只有在初始状态下,监听data事件,会使流进入流动模式。举个例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this._time = 0;
  }
  _read(){
    this.push(String(++this._time));
  }
}

const exampleReadable = new ExampleReadable();
// 暂停 state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
  console.log(`Received ${chunk.length} bytes of data.`);
});

运行这个例子,我们发现终端没有任何输出,为什么会这样呢?原因我们可以从源码中看出端倪

if (state.flowing !== false)
      this.resume();

由此我们可以把官方表述再完善一些:在可读流初始化状态下( state.flowing === null ),添加 data 事件句柄会使流进入流动模式。

push

只能被可读流的实现调用,且只能在 readable._read() 方法中调用。

push是数据生产的核心,消费方通过调用 read(n) 促使流输出数据,而流通过_read()使底层调用push方法将数据传给流。

在这个过程中,push方法有可能将数据存放在缓存池内,也有可能直接通过 data 事件输出。下面我们一一分析。

如果当前流是流动的( state.flowing === true ),且缓存池内没有可读数据,

那么数据将直接由事件 data 输出

// node 源码
if (state.flowing && state.length === 0 && !state.sync){
    state.awaitDrain = 0;
    stream.emit('data', chunk);
}

我们举个例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this.max = 100;
    this.time = 0;
  }
  _read(){
    const seed = setTimeout(()=>{
      if(this.time > 100){
        this.push(null);
      }else{
        this.push(String(++this.time));
      }
      clearTimeout(seed);
    },0)
  }
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
  console.log('from data',data);
});

readable 事件

exampleReadable.on('readable',()=>{
    ....
});

当我们注册一个 readable 事件后,node就会做以下处理:

  1. 将流切换到暂停模式
state.flowing = false; 
state.needReadable = true;
  1. 如果缓存池未消耗的数据,触发 readable
stream.emit('readable');
  1. 否则,判断当前是否正在读取底层数据,如果不是,开始(nextTick)读取底层数据 self.read(0);

触发条件

state.flow === false
state.length || state.ended
return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);

参考

详解NodeJs流之一


以上所述就是小编给大家介绍的《详解NodeJs流之一》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

UX设计之道

UX设计之道

[美] 昴格尔、[美] 钱德勒 / 孙亮 / 人民邮电出版社 / 2010-4 / 35.00元

《UX设计之道:以用户体验为中心的Web设计》可以看作是用户体验设计的核心参考书。无论是Web设计领域的创业者、项目管理者还是用户体验的策划者和实施者,或是有志于Web设计领域的学生,都应该了解《UX设计之道:以用户体验为中心的Web设计》中的知识。 用户是网站的根本,网站要达到自己的商业目标,必须满足目标用户的需求——这就是以用户体验为中心的网站设计。那么,用户需求从何而来?如何将用户需......一起来看看 《UX设计之道》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具