内容简介:前段时间遇到项目上需要请求资源方获取opus编码的音频文件,然后置入ogg容器中传输给前端标准化播放器进行播放的需求。流程模式是,通过服务上建立的socket连接不断接收资源方传送的文件块。而前端请求中层服务是HTTP请求。一个简单的需求,在Node.js服务中,比较适合处理方式是使用Stream,通过pipe不同的加解密流以及最后的HTTP responses传输给前端标准格式的文件。由于用到很多流处理方式,所以在此总结一下Node.js Streams的模块使用基础。Stream就是数据信息的一个传输
前段时间遇到项目上需要请求资源方获取opus编码的音频文件,然后置入ogg容器中传输给前端标准化播放器进行播放的需求。流程模式是,通过服务上建立的socket连接不断接收资源方传送的文件块。而前端请求中层服务是HTTP请求。
一个简单的需求,在Node.js服务中,比较适合处理方式是使用Stream,通过pipe不同的加解密流以及最后的HTTP responses传输给前端标准格式的文件。由于用到很多流处理方式,所以在此总结一下Node.js Streams的模块使用基础。
1. Node.js Stream 使用场景
Stream就是数据信息的一个传输集合。适合进行大文件和连续的传输文件块的处理,但不仅限于此。比如我们要将一个文件进行加密然后再压缩再传输的,通过让数据在不同的Stream中传输处理,无论在书写还是处理效率上都很有优势。
2. Stream到底是什么
Node.js中,Buffer是非常重要的一个模块。在很多数据处理中会有所应用。它进行内存分配不是通过V8实现的,而是分配的外部对外内存,主要原因可能是V8的垃圾回收机制过于影响性能。Buffer,性能部分是C++实现,而于Node.js进行了非性能方面的实现以及开放调用。
简单理解,Stream其实就是Buffer的一个更高级封装加另外实现。但Stream和Buffer在使用中,还是具有不同。比如,Buffer在分配完成读取所有数据后,才能进行使用,而Stream只要建立,当消费者需要使用时便可以使用。同时,数据可以进入缓冲区,而这个缓冲区,其实就是Buffer。
不难看出,在这样的实现下,在处理过程中,特别是大文件处理,Stream的占用内存会更低,而处理效率会更高。
我们通过两段代码来查看两者的区别
// 使用Buffer拷贝test.file大文件 大小 556641 KB const fs = require('fs') const time = Date.now() fs.readFile('./test.file', (err, buffer) => { fs.writeFile('./test.buffer.file', buffer, err => { console.log('memory use: ', process.memoryUsage()) console.log('buffer', Date.now() - time) console.log('finish...') }) }) 复制代码
// 使用Stream拷贝test.file大文件 大小 556641 KB const fs = require('fs') const time = Date.now() fs.createReadStream('./test.file') .pipe(fs.createWriteStream('./test.stream.file')) .on('finish', () => { console.log('memory use: ', process.memoryUsage()) console.log('stream', Date.now() - time) console.log('finish...') }) 复制代码
以下是输出结果: (只是一个简单的测试,但是区别还是比较明显的)
如果需要,两者之间也可以进行转换
// stream to buffer function streamToBuffer(stream, cb) { let buffers = []; stream.on('error', function(err) { console.log(err) }) stream.on('data', function(data) { buffers.push(data) }) stream.on('end', function() { cb(buffers) }) } 复制代码
// buffer to stream var stream = require('stream') function bufferToStream(buffer) { var stream = new stream.Duplex() stream.push(buffer) // 读入 stream.push(null) // null 代表读入结束 return stream } 复制代码
3. Stream 分类和基础用法
Node.js中,Stream有4种类型
-
Readable: 可读流
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin
-
Writable: 可写流
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout
- process.stderr
-
Duplex: 可读可写
- TCP sockets
- zlib streams
- crypto streams
-
Tranform: 读写过程中处理
- zlib streams
- crypto streams
虽然,上述Duplex和Tranform分类中包含了同样的Node.js实现的库,但是其实两种Stream存在区别。Duplex,可读可写,而在读写上的操作是相互独立存在的,互相不影响。而Tranform,读写是统一的,也就是说,Tranform中读入的数据经过处理,会直接写入。二者都是继承了Readable和Writable后进行实现的。
基本使用
// 文件系统下 const fs = require('fs') var rStream = fs.createReadStream(file) var wStream = fs.createWriteStream(file) // 直接使用stream模块 const stream = require('stream') var rStream = new stream.Readable() var wStream = new stream.Writable() var dStream = new stream.Duplex() var tStream = new stream.Transform() // 读写流的事件监听 rOrWStream.on('open', function() { // 监听打开 }) rOrWStream.on('data', function(data) { // 监听数据 }) rOrWStream.on('error', function(error) { // 监听读写错误 }) rOrWStream.on('end', function(end) { // 监听读取或写入结束 }) rOrWStream.on('close', function() { // 监听关闭流 }) 复制代码
stream模块中,不同类型stream提供了多种的方法可以调用,不在此过多赘述,可以查看官方文档。
其中比较特别的是,在stream中,参数highWaterMark设置值,是stream存储的最大值,触发了stream存储设置的highWaterMark后,Writable和Readable两者表现有些许不同。
const fs = require('fs') var rStream = fs.createReadStream(file, { flags: 'r', // 指定用什么模式打开文件,’w’代表写,’r’代表读,类似的还有’r+’、’w+’、’a’等 encoding: 'utf8', // 编码格式 autoClose: true, // 是否发生错误或结束时自动关闭 highWaterMark: 9, // 单位KB,不设置默认为16KB start:0, // 开始读取范围 end:0 // 结束读取范围,从文件中读取一个字节范围,而不是整个文件 }) 复制代码
Writable触发后,不能继续写入,调用Writable write方法返回false,而当缓存区可以继续写入数据的时候,是会触发'drain'事件。
Readable,存在三种状态
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
null表示没有提供消费流数据的机制,所以流不会产生数据。监听 'data' 事件、调用pipe、resume方法都会使状态切换到 true,可读流开始主动地产生数据并触发事件。 调用pause、unpipe,或接收到背压(也就是缓存区达到highWaterMark值,如上writable同样),则状态会被设为 false,暂时停止事件流动但不会停止数据的生成。 在这个状态下,为 'data' 事件绑定监听器不会使状态切换到 true。
4. 定制化Stream
Stream中的pipe方法犹如管道一样,让数据可以连续通过不同的流处理,比如
const fs = require('fs') var rStream = fs.createReadStream(file) var wStream = fs.createWriteStream(renamefile) rStream.pipe(wStream) 复制代码
在具体的开发中,常常需要对数据进行处理,我们需要重写模块中的方式
用例 | 类 | 方法 |
只读流 | Readable | _read |
只写流 | Writable | _write _writev _final |
可读可写 | Duplex | _read _write _writev _final |
读入处理,写入 | Transform | _transform _flush _final |
写法有很多种,以Writable为例
// prototype继承 var stream = require('stream') var util = require('util') function MyStream () { stream.Writable.call(this) } util.inherits(MyStream, stream.Writable) MyStream.prototype._write = function (chunk, encoding, callback) { console.log(chunk.toString()) callback() } var myStream = new MyStream() process.stdin.pipe(myStream) 复制代码
// 使用实例 var stream = require('stream') var myStream = new stream.Writable() myStream._write = function (chunk, encoding, callback) { console.log(chunk.toString()) callback() } process.stdin.pipe(myStream) 复制代码
// 使用Constructor API var myStream = new stream.Writable({ _write: function(chunk, encoding, callback) { console.log(chunk.toString()) callback() } }) 复制代码
// ES6类写法, Node 4+ class MyStream extends stream.Writable { _write(chunk, enc, callback) { console.log(chunk.toString()) callback() } } var myStream = new MyStream() 复制代码
以下以ES6的写法为主
Writable
const { Writable } = require('stream') class OutStream extends Writable { constructor(option) { super() this.encode = option.encode } _write(chunk, enc = this.encode, next) { console.log(chunk.toString()) next && next() } } const outStream = new OutStream({ encode: 'utf-8', }) process.stdin.pipe(outStream) 复制代码
Readable
const { Readable } = require('stream') class InStream extends Readable { constructor() { super() } _read(size) { // size就是highWaterMark值 } } const inStream = new InStream() inStream.push('ABCDEFG') // push读入 inStream.push('HIJKLMN') inStream.push(null) // null表示已经无数据 inStream.pipe(process.stdout) 复制代码
可以重写 _read
class InStream extends Readable { constructor() { super() } // _read会持续触发 _read(size) { this.push(this.num++) if(this.num > 20) { this.push(null) // 当num > 20,push(null)结束读入 } } } const inStream = new InStream() inStream.num = 0 inStream.pipe(process.stdout) 复制代码
Duplex
const { Duplex } = require('stream') class IoStream extends Duplex { constructor(option) { super() let op = option || {} this.encode = option.encode } // 同时重写 _write _read _write(chunk, enc, next) { console.log(chunk.toString()) next && next() } _read(size) { this.push(this.num++) if(this.num > 20) { this.push(null) } } } const iostream = new IoStream({ encode: 'utf-8', }) iostream.num = 0 process.stdin.pipe(iostream).pipe(process.stdout) 复制代码
Transform
const { Transform } = require('stream') class MyTransform extends Transform { constructor() { super() } // 重写 _transform 读入处理后写入 _transform(chunk, enc, next) { this.push(chunk.toString().toUpperCase()) next && next() } } const mytransfrom = new MyTransform() process.stdin.pipe(mytransfrom).pipe(process.stdout) 复制代码
5. objectMode
有时我们需要处理的不仅仅是字符串,还包括特殊数据。
比如,如果有一个需要和C++服务器通信或互通信息的Node.js服务器,通过一个cache存储二进制数据,要求存入的是C++ struct结构数据。这时候,需要通过Node.js的Object处理实现同样结构数据。而同时又需要流处理的话,则需要使用到Stream的objectMode。
通过Transform进行简单介绍
const { Transform } = require('stream') class CreateArray extends Transform { constructor() { // 开启objectMode模式 super({ readableObjectMode: true, writableObjectMode: true }) } _transform(chunk, enc, next) { // 可以直接push Object数据 this.push(chunk.toString().trim().split(',')) next && next() } } class ObjToString extends Transform { constructor() { super({ readableObjectMode: true, writableObjectMode: true }) } _transform(chunk, enc, next) { // chunk可以是Object数据 this.push(JSON.stringify(chunk)) next && next() } } const createArray = new CreateArray() const objtostring = new ObjToString() process.stdin.pipe(createArray).pipe(objtostring).pipe(process.stdout) 复制代码
6. 小结
Stream是Node.js中很重要的模块,处理数据高效,在项目中需要更灵活的使用。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。