node 中的 stream

栏目: 服务器 · 发布时间: 5年前

内容简介:Stream 借鉴自 Unix 编程哲学中的 pipe。Unix shell 命令中觉的管道流式操作一个 stream 的运用场景。从服务器读取文件并返回给页面。

什么是 stream

Stream 借鉴自 Unix 编程哲学中的 pipe。

Unix shell 命令中觉的管道流式操作 | 将上一个命令的输出作为下一个命令的输入。node stream 中则是通过 .pip() 方法来进行的。

一个 stream 的运用场景。从服务器读取文件并返回给页面。

  • 朴素的实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);
  • stream 实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

好处:

  • 代码更加简洁
  • 可自由组合各种模块处理数据

stream 的种类

分五种:

  • readable
  • writable
  • duplex
  • transform
  • classic

readable

readable 类型的流产生数据,可通过 .pip() 输送到能够消费流数据的地方,比如 writable,transform,duplex

一个 readable stream 示例:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

运行结果:

$ node read0.js
beep boop

_read 方法与按需输出

上面 rs.push(null) 表示没有更多数据了。

上面从代码直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费。因为消费者有可能并不能立即消费这些内容,直接 push 数据后消耗不必要的资源。

更好的做法是,让 readable 流只在消费者需要数据的时候再 push 。这是通过定义能 raedable 对象定义 ._read 方法来完成的。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

运行结果:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

这种方式下,定义了 readable 流产生数据的方法 ._read ,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。

_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然, _read 方法的实现中是可以忽略这个入参的。

下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

输出任意数据

上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数, Readable({ objectMode: true })

消费 readable 流产生的数据

这一段没看太懂

writable 流

writable 流可作为 .pip() 的对象。

src.pipe(writableStream)

创建 writable 流

需要实现 ._write(chunk, enc, next) 方法,其中:

  • chunk 为接收到的数据
  • encopts.decodeStringfalse 且收到的数据这字符串时,它表示字符串的编码
  • next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败

默认情况下,获取到的字符串数据会转为 Buffer ,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。

一个 writable 示例:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

向 writable 流写入数据

通过调用 writable 流的 write 方法来写入。

process.stdout.write('beep boop\n');

通过调用 end() 来结束数据的写入。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

duplex

双工类型的流,同时具有 writable 和 readable 流的功能。node 内建的 zlib , TCP sockets 以及 crypto 都是双工类型的。

所以可对双工类型的流进行如下操作:

a.pip(b).pip(a)

transform

一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam

classic stream

这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。

classic readable stream

当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。

.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。

classic readable 流的创建

一个 classic readable 流的创建示例:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

从 classic readable 流读取数据

数据读取是通过监听流上的 dataend 事件。

一个从 classic readable 流读取数据的示例:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});

一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 throughconcat-stream 来进行。

classic writable stream

只需要实现 .write(buf).end(buf) 及  .destroy() 方法即可,比较简单。

内建的流对象

node 中的 stream

总结

本质上,所有流都是 EventEmitter ,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

嵌入式Linux应用开发完全手册

嵌入式Linux应用开发完全手册

韦东山 主编 / 人民邮电出版社 / 2008-8 / 69.00元

本书全部实例代码及相关工具。 基于ARM 9+Linux 206平台,从基础讲起,引导读者快速入门,实例丰富,可直接应用于工程实践。 本书全面介绍了嵌入式Linux系统开发过程中,从底层系统支持到上层GUI应用的方方面面,内容涵盖Linux操作系统的安装及相关工具的使用、配置,嵌入式编程所需要的基础知识(交叉编译工具的选项设置、Makefile语法、ARM汇编指令等),硬件部件的使用及......一起来看看 《嵌入式Linux应用开发完全手册》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具