内容简介:如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或流从在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的
如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或 github NodeJs系列文章
流从 早先的unix 初出茅庐,在过去的几十年的时间里,它被证明是一种可依赖的编程方式,它可以将一个大型的系统拆成一些很小的部分,并且让这些部分之间完美地进行合作。
在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的 console
,都极有可能涉及到流。
Node.js 中有四种基本的流类型:
- Readable - 可读取数据的流(例如 fs.createReadStream())。
- Writable - 可写入数据的流(例如 fs.createWriteStream())。
- Duplex - 可读又可写的流(例如 net.Socket)。
- Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())
为什么使用流
假设我们需要使用node来实现一个简单的静态文件服务器:
const http = require('http'); const fs = require('fs'); http.createServer((req,res)=>{ fs.readFile('./test.html',function(err,data){ if(err){ res.statusCode = 500; res.end(); }else{ res.end(data); } }) }).listen(3000)
上述代码简单实现了静态文件的读取和发送,逻辑上是完全可行的。但是由于 readFile
是一次性将读取的文件存放在内存中的,假设 test.html
文件非常大或者访问量增多的情况下,服务器内存很有可能耗尽。这时我们就需要使用流的方式进行改进:
const http = require('http'); const fs = require('fs'); http.createServer((req,res)=>{ fs.createReadStream('./test.html').pipe(res); }).listen(3000);
fs.createReadStream
创建一个可读流,逐次读取文件内容供给下游消费,这种逐步读取和消费的方式,有效减缓了内存的消耗。
可读流(Readable Stream)
我们可以把 Readable Stream拆分成两个阶段:push阶段和pull阶段,在push阶段,通过实现 _read
方法将数据从底层数据资源池中推送到缓存池中,这是数据的生产阶段,而pull阶段,则是将缓存池的数据拉出,供下游使用,这是数据的消费阶段。
在开始进一步讲解之前,我们先来介绍几个字段,这些字段来源于node源码:
-
state.buffer
:Array
缓存池,每个元素对应push(data)中的data -
state.length
:Number
缓存池中的数据量,在objectMode
模式下,state.length === state.buffer.length
,否则,其值是state.buffer
中数据字节数的总和 -
state.ended
:Boolean
表示底层数据池没有可读数据了(this.pull(null)
) -
state.flowing
:Null|Boolean
表示当前流的模式,其值有三种情况:null
(初始状态)、true
(流动模式)、false
(暂停模式) -
state.needReadable
:Boolean
是否需要触发readable
事件 -
state.reading
:Boolean
是否正在读取底层数据 -
state.sync
:Boolean
是否立即触发data
/readable
事件,false
为立即触发、true
下一个tick再触发(process.nextTick
)
两种模式
可读流存在两种模式:流动模式(flowing)和暂停模式(paused),在源码中使用 state.flowing
来标识。
两种模式其基本流程都遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流动模式而言,只要缓存池还有未消耗的数据,那么数据便会不断的被提取,我们可以把它想象成一个自动的水泵,只要通电了,不抽干水池的水它是不会停下来的。而对于暂停模式,它更像是打水桶,需要的时候再从水池里面打点水出来。
所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式:
- 添加
data
事件句柄(前提是state.flowing === null
) - 调用
stream.resume()
- 调用
stream.pipe()
可读流也可以通过以下方式切换回暂停模式:
readable stream.pause() stream.unpipe()
一切从 read
开始
对于可读流而言,消费驱动生产,只有通过调用pull阶段的 read
函数,才能唤醒push阶段的数据产生,从而带动整个流的运动。所以对于可读流而言 read
是一切的起点。
这是根据源码整理的一个简单的流程图,后面将对一些环节加以说明。
howMuchToRead
调用 read(n)
过程中,node会根据实际情况调整读取的数量,实际值由 howMuchRead
决定
function howMuchToRead(n,state){ // 如果size <= 0或者不存在可读数据 if (n <= 0 || (state.length === 0 && state.ended)) return 0; // objectMode模式下 每次制度一个单位长度的数据 if (state.objectMode) return 1; // 如果size没有指定 if (Number.isNaN(n)) { // 执行read()时,由于流动模式下数据会不断输出, // 所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空 if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } if (n > state.highWaterMark) // 更新highWaterMark state.highWaterMark = computeNewHighWaterMark(n); // 如果缓存中的数据量够用 if (n <= state.length) return n; // 如果缓存中的数据不够用, // 且资源池还有可读取的数据,那么这一次先不读取缓存数据 // 留着下一次数据量足够的时候再读取 // 否则读空缓存池 if (!state.ended) { state.needReadable = true; return 0; } return state.length; }
end
事件
在 read
函数调用过程中,node会择机判定是否触发 end
事件,判定标准主要是以下两个条件:
if (state.length === 0 && state.ended) endReadable(this);
- 底层数据(资源)没有可读数据,此时
state.ended
为true
,
通过调用 pull(null)
表示底层数据当前已经没有可读数据了
- 缓存池中没有可读数据
state.length === 0
本事件在调用 read([size])
时触发(满足上述条件时)
doRead
doRead
用于判断是否读取底层数据
// 如果当前是暂停模式`state.needReadable` var doRead = state.needReadable; // 如果当前缓存池是空的或者没有足够的缓存 if (state.length === 0 || state.length - n < state.highWaterMark){ doRead = true; } if (state.ended || state.reading) { doRead = false; } else if (doRead) { // ... this._read(state.highWaterMark); // ... }
state.reading
标志上次从底层取数据的操作是否已完成,一旦 push
方法被调用,就会设置为 false
,表示此次 _read()
结束
data
事件
在 官方文档 中提到:添加 data
事件句柄,可以使Readable Stream的模式切换到流动模式,但官方没有提到的是这一结果成立的条件- state.flowing
的值不为 null
,即只有在初始状态下,监听data事件,会使流进入流动模式。举个例子:
const { Readable } = require('stream'); class ExampleReadable extends Readable{ constructor(opt){ super(opt); this._time = 0; } _read(){ this.push(String(++this._time)); } } const exampleReadable = new ExampleReadable(); // 暂停 state.flowing === false exampleReadable.pause(); exampleReadable.on('data',(chunk)=>{ console.log(`Received ${chunk.length} bytes of data.`); });
运行这个例子,我们发现终端没有任何输出,为什么会这样呢?原因我们可以从源码中看出端倪
if (state.flowing !== false) this.resume();
由此我们可以把官方表述再完善一些:在可读流初始化状态下( state.flowing === null
),添加 data
事件句柄会使流进入流动模式。
push
只能被可读流的实现调用,且只能在 readable._read() 方法中调用。
push是数据生产的核心,消费方通过调用 read(n)
促使流输出数据,而流通过_read()使底层调用push方法将数据传给流。
在这个过程中,push方法有可能将数据存放在缓存池内,也有可能直接通过 data
事件输出。下面我们一一分析。
如果当前流是流动的( state.flowing === true
),且缓存池内没有可读数据,
那么数据将直接由事件 data
输出
// node 源码 if (state.flowing && state.length === 0 && !state.sync){ state.awaitDrain = 0; stream.emit('data', chunk); }
我们举个例子:
const { Readable } = require('stream'); class ExampleReadable extends Readable{ constructor(opt){ super(opt); this.max = 100; this.time = 0; } _read(){ const seed = setTimeout(()=>{ if(this.time > 100){ this.push(null); }else{ this.push(String(++this.time)); } clearTimeout(seed); },0) } } const exampleReadable = new ExampleReadable({ }); exampleReadable.on('data',(data)=>{ console.log('from data',data); });
readable
事件
exampleReadable.on('readable',()=>{ .... });
当我们注册一个 readable
事件后,node就会做以下处理:
- 将流切换到暂停模式
state.flowing = false; state.needReadable = true;
- 如果缓存池未消耗的数据,触发
readable
,
stream.emit('readable');
- 否则,判断当前是否正在读取底层数据,如果不是,开始(nextTick)读取底层数据
self.read(0);
触发条件
state.flow === false state.length || state.ended
return !state.ended && (state.length < state.highWaterMark || state.length === 0);
参考
- Node.js v10.15.1 文档
- 深入理解 Node.js Stream 内部机制
- stream-handbook
- 如何形象的描述反应式编程中的背压(Backpressure)机制?
- 数据流中的积压问题
- Node.js Stream - 进阶篇
- Node Stream
以上所述就是小编给大家介绍的《详解NodeJs流之一》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Flutter 完整开发实战详解(十六、详解自定义布局实战)
- 数据结构 1 线性表详解 链表、 栈 、 队列 结合JAVA 详解
- 详解Openstack环境准备
- Java泛型详解
- iOS RunLoop 详解
- Raft协议详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
计算机组成:结构化方法
坦嫩鲍姆 / 刘卫东 / 人民邮电出版社 / 2006-1 / 65.00元
本书采用结构化方法来介绍计算机系统,书的内容完全建立在“计算机是由层次结构组成的,每层完成规定的功能”这一概念之上。作者对本版进行了彻底的更新,以反映当今最重要的计算机技术以及计算机组成和体系结构方面的最新进展。书中详细讨论了数字逻辑层、微体系结构层、指令系统层、操作系统层和汇编语言层,并涵盖了并行体系结构的内容,而且每一章结尾都配有丰富的习题。 本书适合作为计算机专业本科生计算机组......一起来看看 《计算机组成:结构化方法》 这本书的介绍吧!