@FunC
2017-11-29T17:08:09.000000Z
字数 7257
阅读 2441
Node.js
先放一张思维导图(PDF版放大后还是勉强能看清的):
书中的介绍已经很详细,这里有篇不错的 stream 入门文章:
stream-handbook:https://github.com/jabez128/stream-handbook
如果还想要更多的实操例子,可以试试 nodeschool 的对应专题:
npm install -g stream-adventure
在事件驱动的 Node.js 中,最高效的处理 I/O 的模式,就是一有输入就对其进行处理并输出。
传统的是 buffer 模式,就是把整个文件都读入 buffer 后才进行处理:
而 stream 是将数据且分成一小个的 chunk,来一个处理一个;
Stream 的优点可以总结为空间效率高、时间效率高和组合性强。下面来简单介绍。
主要问题在于并发处理大文件。
如果是 buffer 模式,并发处理大文件时,占用内存极大(因为要全部读取完才能处理)。同时,V8 的 buffer 上限约为 1GB,所以使用 stream 更合理。
同样也是数据分成 chunk 带来的好处。观察下图的场景:
上图展示了一个:读取 -> 压缩 -> 发送至服务端 -> 服务端读取 -> 解压 -> 写入 的过程。
如果采用 stream,发挥了 Node.js 异步能力的同时,还让服务端能尽快接收,处理数据。
Stream 的高组合性得益于.pipe()
方法,提供了一个 stream 组合的通用接口。
.pipe()
除了让代码更易组合复用,还能让代码更清晰易懂且模块化。
回想一下,stream 顺序处理 chunk(遍历器),同时还能监听/触发事件( EventEmitter 的实例),这不正是我们先前处理异步流程所需的东西吗。
使用 stream 来做顺序执行任务,确保次序的同时还能保持优雅。
来看一段代码,重点见注释:
concatFiles.js
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');
function concatFiles(destination, files, callback) {
const destStream = fs.createWriteStream(destination);
fromArray.obj(files)
.pipe(through.obj((file, enc, done) => {
const src = fs.createReadStream(file);
// 每个 file 读取&写入完毕后不关闭 destStream,让下一个 file 继续
src.pipe(destStream, {end: false});
src.on('end', done);
}))
.on('finish', () => {
// through 生成的 transform stream 结束后,手动关闭 destStream
destStream.end();
callback();
});
}
module.exports = concatFiles;
思路与使用 callback 类似,callback 中是立即返回,stream 中是立即调用 _transform 的 done。同时将 _flush中的 done 保留至全部任务完成。
来看一段代码,重点见注释:
parallelStream.js
const stream = require('stream');
class ParallelStream extends stream.Transform {
constructor(userTransform) {
// 这种并发场景主要用在 object stream 上
super({objectMode: true});
// 即 ._transform() 留给调用时实现
this.userTransform = userTransform;
this.running = 0;
// 用于稍后保存 ._flush(done) 中的 done
this.terminateCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
done();
}
_flush(done) {
// 没运行完则不调用 done,保存到 this.terminateCallback中,以后用
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}
// 该方法在每次完成一个 transform 中的 chunk 时都会触发
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
// 如果这是最后一个任务了,先前有保存 done 的话就执行,否则在 _flush 中 done
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
并行处理任务时不限制并发数将会导致严重的后果。参考保存 _flush 中的 done 函数来延后 finish 的做法,transform stream 中达到并发上限时,也可以延后 done,来控制并发。
来看一段代码,重点见注释:
limitedParallelStream.js
const stream = require('stream');
class LimitedParallelStream extends stream.Transform {
constructor(concurrency, userTransform) {
super({objectMode: true});
// 并发数
this.concurrency = concurrency;
this.userTransform = userTransform;
this.running = 0;
this.terminateCallback = null;
// 与 terminateCallback 同理,不过保存的是 _transform 中的 done
this.continueCallback = null;
}
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
if(this.running < this.concurrency) {
done();
} else {
// 达到并发上限则将 done 延后到 _onComplete 中调用
this.continueCallback = done;
}
}
_flush(done) {
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
const tmpCallback = this.continueCallback;
// reset
this.continueCallback = null;
// continue ._transform()
tmpCallback && tmpCallback();
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
}
module.exports = LimitedParallelStream;
实际上,我们只要能把无序输出的chunk重新正确排序,就相当于做到顺序并发执行了。可以通过 npm 上的 through2-parallel
包实现。
优点:
1. 复用组合管道(且不用考虑内部组合细节)
2. 简化错误处理(只用处理合并管道的错误)
组合管道的实质:
1. 写入时,实际写入的是组合管道的第一个 stream
2. 读取时,实际读取的事组合管道的最后一个 stream
可直接使用 multipipe 包
fork 一个 stream,意味着将一个 Readable stream 接到多个 Writable streams 上。
在 Node.js 中,直接重复使用该 readable stream 即可:
// 省略部分代码...
const inputStream = fs.createReadStream(inputFile);
inputStream
.pipe(sha1Stream)
.pipe(fs.createWriteStream(inputFile + '.sha1'))
;
inputStream
.pipe(md5Stream)
.pipe(fs.createWriteStream(inputFile + '.md5'))
;
值得注意的是,fork 的 stream 接收到的是相同的 chunks,所以要注意有副作用的操作。因为会影响到其他 fork 的stream。
另外,数据流动的速度=最慢的 fork stream 的速度。
合并操作是分支操作的逆向版:
由于管道默认的 auto end 设置,其中一个 source stream 结束后 destination stream 就会马上关闭,从而导致错误。
解决方案是设置{end: false}
,并在全部 source stream 完成后手动触发 destination 的end()
。
示例代码如下:
mergeTar.js
const tar = require('tar');
const fstream = require('fstream');
const path = require('path');
const destination = path.resolve(process.argv[2]);
const sourceA = path.resolve(process.argv[3]);
const sourceB = path.resolve(process.argv[4]);
// 还没有输入,可以先随意组合
const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));
// 计数器,在全部 source 都结束时调用 end()
let endCount = 0;
function onEnd() {
if(++endCount === 2) {
pack.end();
}
}
// 注意使用上面定义的 onEnd()回调
const sourceStreamA = fstream.Reader({type: "Directory", path: sourceA})
.on('end', onEnd);
const sourceStreamB = fstream.Reader({type: "Directory", path: sourceB})
.on('end', onEnd);
//
// 开始 pipe 时记得设置 end:false选项
sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});
注意,多个 source stream 的数据是随机顺序注入 destination stream 的,这在部分 object stream 中是可接受的。
如果想要按顺序(一个 source 接一个 source 地)消耗 chunk,可实现一个状态机来记录当前的 source,按顺序调用相应的 done。(npm packages:multistream)
如果我们想公用一个频道(channel),来传输多个独立的 streams(即传输完后可重新分离),则需要实现多路复用与分路,如下图所示:
可见,通过多路复用器(MUX)将多个 channel 通过一个共享的 channel 传输。然后通过分路器(DEMUX)将 channel 重新分离。这种技术经常用在通讯上。
实现多路复用的其中一种技术,就是分组交换(packet switching)。思路是将分路时所需的一些元信息(如 channel id,chunk length 等)和 chunk 打包在一起传输。
我们先实现一个只有 channel id, data length 和 data 的 mini packet:
来看一下 client.js 的代码(重点见注释):
const child_process = require('child_process');
const net = require('net');
function multiplexChannels(sources, destination) {
let totalChannels = sources.length;
for (let i = 0; i < sources.length; i++) {
sources[i]
.on('readable', function() {
let chunk;
while((chunk = this.read()) !== null) {
const outBuff = new Buffer(1 + 4 + chunk.length);
// 写入1字节的 channel id
outBuff.writeUInt8(i, 0);
// 写入4字节的 chunk size
outBuff.writeUInt32BE(chunk.length, 1);
// 写入 chunk 的内容
chunk.copy(outBuff, 5);
console.log('Sending packet tp channel: ' + i);
destination.write(outBuff);
}
})
.on('end', () => {
if (--totalChannels === 0) {
destination.end();
}
});
}
}
const socket = net.connect(3000, () => {
// silent: true 能让子进程不继承父进程的 stdout 和 stderr
const child = child_process.fork(process.argv[2],process.argv.slice(3),{silent:true});
multiplexChannels([child.stdout, child.stderr], socket);
});
下面来看一下 server.js 的代码(重点见注释):
const net = require('net');
const fs = require('fs');
function demultiplexChannel(source, destinations) {
// 用于记录当前 chunk 的 channel id
let currentChannel = null;
// 用于记录当前 chunk 的长度
let currentLength = null;
source
.on('readable', () => {
let chunk;
if (currentChannel === null) {
// 先读1个字节的 id 信息
chunk = source.read(1);
currentChannel = chunk && chunk.readUInt8(0);
}
if (currentLength === null) {
// 读4个字节的长度信息
chunk = source.read(4);
currentLenth = chunk && chunk.readUInt32BE(0);
// 有可能 buffer 中的数据不足,.read() return null
// 等一下次 readable 再读取
if (currentLength === null) {
return;
}
}
// 读取 chunk 的内容
chunk = source.read(currentLenth);
if (chunk === null) {
return;
}
console.log('Received packet from: ' + currentChannel);
// 写入目标 channel
destinations[currentChannel].write(chunk);
// reset
currentChannel = null;
currentLength = null;
})
.on('end', () => {
destinations.forEach(destination => destination.end());
console.log('Source channel closed');
});
}
net.createServer(socket => {
const stdoutStream = fs.createWriteStream('stdout.log');
const stderrStream = fs.createWriteStream('stderr.log');
demultiplexChannel(socket, [stdoutStream, stderrStream]);
}).listen(3000, () => console.log('Server started'));
原理和 binary mode 基本一样,且 channelID 可以直接作为对象的属性 。
另一种模式是根据一定的规则对流入的 data 进行分路: