@FunC
2017-11-29T09:08:09.000000Z
字数 7257
阅读 2844
Node.js
先放一张思维导图(PDF版放大后还是勉强能看清的):
书中的介绍已经很详细,这里有篇不错的 stream 入门文章:
stream-handbook:https://github.com/jabez128/stream-handbook
如果还想要更多的实操例子,可以试试 nodeschool 的对应专题:
npm install -g stream-adventure
在事件驱动的 Node.js 中,最高效的处理 I/O 的模式,就是一有输入就对其进行处理并输出。
传统的是 buffer 模式,就是把整个文件都读入 buffer 后才进行处理:

而 stream 是将数据且分成一小个的 chunk,来一个处理一个;

Stream 的优点可以总结为空间效率高、时间效率高和组合性强。下面来简单介绍。
主要问题在于并发处理大文件。
如果是 buffer 模式,并发处理大文件时,占用内存极大(因为要全部读取完才能处理)。同时,V8 的 buffer 上限约为 1GB,所以使用 stream 更合理。
同样也是数据分成 chunk 带来的好处。观察下图的场景:
上图展示了一个:读取 -> 压缩 -> 发送至服务端 -> 服务端读取 -> 解压 -> 写入 的过程。
如果采用 stream,发挥了 Node.js 异步能力的同时,还让服务端能尽快接收,处理数据。
Stream 的高组合性得益于.pipe()方法,提供了一个 stream 组合的通用接口。
.pipe()除了让代码更易组合复用,还能让代码更清晰易懂且模块化。
回想一下,stream 顺序处理 chunk(遍历器),同时还能监听/触发事件( EventEmitter 的实例),这不正是我们先前处理异步流程所需的东西吗。
使用 stream 来做顺序执行任务,确保次序的同时还能保持优雅。
来看一段代码,重点见注释:
concatFiles.js
const fromArray = require('from2-array');const through = require('through2');const fs = require('fs');function concatFiles(destination, files, callback) {const destStream = fs.createWriteStream(destination);fromArray.obj(files).pipe(through.obj((file, enc, done) => {const src = fs.createReadStream(file);// 每个 file 读取&写入完毕后不关闭 destStream,让下一个 file 继续src.pipe(destStream, {end: false});src.on('end', done);})).on('finish', () => {// through 生成的 transform stream 结束后,手动关闭 destStreamdestStream.end();callback();});}module.exports = concatFiles;
思路与使用 callback 类似,callback 中是立即返回,stream 中是立即调用 _transform 的 done。同时将 _flush中的 done 保留至全部任务完成。
来看一段代码,重点见注释:
parallelStream.js
const stream = require('stream');class ParallelStream extends stream.Transform {constructor(userTransform) {// 这种并发场景主要用在 object stream 上super({objectMode: true});// 即 ._transform() 留给调用时实现this.userTransform = userTransform;this.running = 0;// 用于稍后保存 ._flush(done) 中的 donethis.terminateCallback = null;}_transform(chunk, enc, done) {this.running++;this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));done();}_flush(done) {// 没运行完则不调用 done,保存到 this.terminateCallback中,以后用if(this.running > 0) {this.terminateCallback = done;} else {done();}}// 该方法在每次完成一个 transform 中的 chunk 时都会触发_onComplete(err) {this.running--;if(err) {return this.emit('error', err);}// 如果这是最后一个任务了,先前有保存 done 的话就执行,否则在 _flush 中 doneif(this.running === 0) {this.terminateCallback && this.terminateCallback();}}}
并行处理任务时不限制并发数将会导致严重的后果。参考保存 _flush 中的 done 函数来延后 finish 的做法,transform stream 中达到并发上限时,也可以延后 done,来控制并发。
来看一段代码,重点见注释:
limitedParallelStream.js
const stream = require('stream');class LimitedParallelStream extends stream.Transform {constructor(concurrency, userTransform) {super({objectMode: true});// 并发数this.concurrency = concurrency;this.userTransform = userTransform;this.running = 0;this.terminateCallback = null;// 与 terminateCallback 同理,不过保存的是 _transform 中的 donethis.continueCallback = null;}_transform(chunk, enc, done) {this.running++;this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));if(this.running < this.concurrency) {done();} else {// 达到并发上限则将 done 延后到 _onComplete 中调用this.continueCallback = done;}}_flush(done) {if(this.running > 0) {this.terminateCallback = done;} else {done();}}_onComplete(err) {this.running--;if(err) {return this.emit('error', err);}const tmpCallback = this.continueCallback;// resetthis.continueCallback = null;// continue ._transform()tmpCallback && tmpCallback();if(this.running === 0) {this.terminateCallback && this.terminateCallback();}}}module.exports = LimitedParallelStream;
实际上,我们只要能把无序输出的chunk重新正确排序,就相当于做到顺序并发执行了。可以通过 npm 上的 through2-parallel包实现。

优点:
1. 复用组合管道(且不用考虑内部组合细节)
2. 简化错误处理(只用处理合并管道的错误)
组合管道的实质:
1. 写入时,实际写入的是组合管道的第一个 stream
2. 读取时,实际读取的事组合管道的最后一个 stream
可直接使用 multipipe 包
fork 一个 stream,意味着将一个 Readable stream 接到多个 Writable streams 上。
在 Node.js 中,直接重复使用该 readable stream 即可:
// 省略部分代码...const inputStream = fs.createReadStream(inputFile);inputStream.pipe(sha1Stream).pipe(fs.createWriteStream(inputFile + '.sha1'));inputStream.pipe(md5Stream).pipe(fs.createWriteStream(inputFile + '.md5'));
值得注意的是,fork 的 stream 接收到的是相同的 chunks,所以要注意有副作用的操作。因为会影响到其他 fork 的stream。
另外,数据流动的速度=最慢的 fork stream 的速度。
合并操作是分支操作的逆向版:

由于管道默认的 auto end 设置,其中一个 source stream 结束后 destination stream 就会马上关闭,从而导致错误。
解决方案是设置{end: false},并在全部 source stream 完成后手动触发 destination 的end()。
示例代码如下:
mergeTar.js
const tar = require('tar');const fstream = require('fstream');const path = require('path');const destination = path.resolve(process.argv[2]);const sourceA = path.resolve(process.argv[3]);const sourceB = path.resolve(process.argv[4]);// 还没有输入,可以先随意组合const pack = tar.Pack();pack.pipe(fstream.Writer(destination));// 计数器,在全部 source 都结束时调用 end()let endCount = 0;function onEnd() {if(++endCount === 2) {pack.end();}}// 注意使用上面定义的 onEnd()回调const sourceStreamA = fstream.Reader({type: "Directory", path: sourceA}).on('end', onEnd);const sourceStreamB = fstream.Reader({type: "Directory", path: sourceB}).on('end', onEnd);//// 开始 pipe 时记得设置 end:false选项sourceStreamA.pipe(pack, {end: false});sourceStreamB.pipe(pack, {end: false});
注意,多个 source stream 的数据是随机顺序注入 destination stream 的,这在部分 object stream 中是可接受的。
如果想要按顺序(一个 source 接一个 source 地)消耗 chunk,可实现一个状态机来记录当前的 source,按顺序调用相应的 done。(npm packages:multistream)
如果我们想公用一个频道(channel),来传输多个独立的 streams(即传输完后可重新分离),则需要实现多路复用与分路,如下图所示:
可见,通过多路复用器(MUX)将多个 channel 通过一个共享的 channel 传输。然后通过分路器(DEMUX)将 channel 重新分离。这种技术经常用在通讯上。
实现多路复用的其中一种技术,就是分组交换(packet switching)。思路是将分路时所需的一些元信息(如 channel id,chunk length 等)和 chunk 打包在一起传输。
我们先实现一个只有 channel id, data length 和 data 的 mini packet:

来看一下 client.js 的代码(重点见注释):
const child_process = require('child_process');const net = require('net');function multiplexChannels(sources, destination) {let totalChannels = sources.length;for (let i = 0; i < sources.length; i++) {sources[i].on('readable', function() {let chunk;while((chunk = this.read()) !== null) {const outBuff = new Buffer(1 + 4 + chunk.length);// 写入1字节的 channel idoutBuff.writeUInt8(i, 0);// 写入4字节的 chunk sizeoutBuff.writeUInt32BE(chunk.length, 1);// 写入 chunk 的内容chunk.copy(outBuff, 5);console.log('Sending packet tp channel: ' + i);destination.write(outBuff);}}).on('end', () => {if (--totalChannels === 0) {destination.end();}});}}const socket = net.connect(3000, () => {// silent: true 能让子进程不继承父进程的 stdout 和 stderrconst child = child_process.fork(process.argv[2],process.argv.slice(3),{silent:true});multiplexChannels([child.stdout, child.stderr], socket);});
下面来看一下 server.js 的代码(重点见注释):
const net = require('net');const fs = require('fs');function demultiplexChannel(source, destinations) {// 用于记录当前 chunk 的 channel idlet currentChannel = null;// 用于记录当前 chunk 的长度let currentLength = null;source.on('readable', () => {let chunk;if (currentChannel === null) {// 先读1个字节的 id 信息chunk = source.read(1);currentChannel = chunk && chunk.readUInt8(0);}if (currentLength === null) {// 读4个字节的长度信息chunk = source.read(4);currentLenth = chunk && chunk.readUInt32BE(0);// 有可能 buffer 中的数据不足,.read() return null// 等一下次 readable 再读取if (currentLength === null) {return;}}// 读取 chunk 的内容chunk = source.read(currentLenth);if (chunk === null) {return;}console.log('Received packet from: ' + currentChannel);// 写入目标 channeldestinations[currentChannel].write(chunk);// resetcurrentChannel = null;currentLength = null;}).on('end', () => {destinations.forEach(destination => destination.end());console.log('Source channel closed');});}net.createServer(socket => {const stdoutStream = fs.createWriteStream('stdout.log');const stderrStream = fs.createWriteStream('stderr.log');demultiplexChannel(socket, [stdoutStream, stderrStream]);}).listen(3000, () => console.log('Server started'));
原理和 binary mode 基本一样,且 channelID 可以直接作为对象的属性 。
另一种模式是根据一定的规则对流入的 data 进行分路:
