[关闭]
@FunC 2017-11-29T17:08:09.000000Z 字数 7257 阅读 2441

Node.js Design Patterns | CH05

Node.js


来用 stream 编程吧!

先放一张思维导图(PDF版放大后还是勉强能看清的):


书中的介绍已经很详细,这里有篇不错的 stream 入门文章:
stream-handbook:https://github.com/jabez128/stream-handbook
如果还想要更多的实操例子,可以试试 nodeschool 的对应专题:
npm install -g stream-adventure

发现 stream 的重要性

在事件驱动的 Node.js 中,最高效的处理 I/O 的模式,就是一有输入就对其进行处理并输出。
传统的是 buffer 模式,就是把整个文件都读入 buffer 后才进行处理:

而 stream 是将数据且分成一小个的 chunk,来一个处理一个;

Stream 的优点可以总结为空间效率高、时间效率高和组合性强。下面来简单介绍。

空间效率

主要问题在于并发处理大文件。
如果是 buffer 模式,并发处理大文件时,占用内存极大(因为要全部读取完才能处理)。同时,V8 的 buffer 上限约为 1GB,所以使用 stream 更合理。

时间效率

同样也是数据分成 chunk 带来的好处。观察下图的场景:

上图展示了一个:读取 -> 压缩 -> 发送至服务端 -> 服务端读取 -> 解压 -> 写入 的过程。
如果采用 stream,发挥了 Node.js 异步能力的同时,还让服务端能尽快接收,处理数据。

组合性

Stream 的高组合性得益于.pipe()方法,提供了一个 stream 组合的通用接口。
.pipe()除了让代码更易组合复用,还能让代码更清晰易懂且模块化。

使用 stream 来处理异步流程

回想一下,stream 顺序处理 chunk(遍历器),同时还能监听/触发事件( EventEmitter 的实例),这不正是我们先前处理异步流程所需的东西吗。

1. 顺序执行

使用 stream 来做顺序执行任务,确保次序的同时还能保持优雅。
来看一段代码,重点见注释:
concatFiles.js

  1. const fromArray = require('from2-array');
  2. const through = require('through2');
  3. const fs = require('fs');
  4. function concatFiles(destination, files, callback) {
  5. const destStream = fs.createWriteStream(destination);
  6. fromArray.obj(files)
  7. .pipe(through.obj((file, enc, done) => {
  8. const src = fs.createReadStream(file);
  9. // 每个 file 读取&写入完毕后不关闭 destStream,让下一个 file 继续
  10. src.pipe(destStream, {end: false});
  11. src.on('end', done);
  12. }))
  13. .on('finish', () => {
  14. // through 生成的 transform stream 结束后,手动关闭 destStream
  15. destStream.end();
  16. callback();
  17. });
  18. }
  19. module.exports = concatFiles;

2. 无序并行执行

思路与使用 callback 类似,callback 中是立即返回,stream 中是立即调用 _transform 的 done。同时将 _flush中的 done 保留至全部任务完成。
来看一段代码,重点见注释:
parallelStream.js

  1. const stream = require('stream');
  2. class ParallelStream extends stream.Transform {
  3. constructor(userTransform) {
  4. // 这种并发场景主要用在 object stream 上
  5. super({objectMode: true});
  6. // 即 ._transform() 留给调用时实现
  7. this.userTransform = userTransform;
  8. this.running = 0;
  9. // 用于稍后保存 ._flush(done) 中的 done
  10. this.terminateCallback = null;
  11. }
  12. _transform(chunk, enc, done) {
  13. this.running++;
  14. this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
  15. done();
  16. }
  17. _flush(done) {
  18. // 没运行完则不调用 done,保存到 this.terminateCallback中,以后用
  19. if(this.running > 0) {
  20. this.terminateCallback = done;
  21. } else {
  22. done();
  23. }
  24. }
  25. // 该方法在每次完成一个 transform 中的 chunk 时都会触发
  26. _onComplete(err) {
  27. this.running--;
  28. if(err) {
  29. return this.emit('error', err);
  30. }
  31. // 如果这是最后一个任务了,先前有保存 done 的话就执行,否则在 _flush 中 done
  32. if(this.running === 0) {
  33. this.terminateCallback && this.terminateCallback();
  34. }
  35. }
  36. }

3. 无序并行执行(限制并发数)

并行处理任务时不限制并发数将会导致严重的后果。参考保存 _flush 中的 done 函数来延后 finish 的做法,transform stream 中达到并发上限时,也可以延后 done,来控制并发。
来看一段代码,重点见注释:
limitedParallelStream.js

  1. const stream = require('stream');
  2. class LimitedParallelStream extends stream.Transform {
  3. constructor(concurrency, userTransform) {
  4. super({objectMode: true});
  5. // 并发数
  6. this.concurrency = concurrency;
  7. this.userTransform = userTransform;
  8. this.running = 0;
  9. this.terminateCallback = null;
  10. // 与 terminateCallback 同理,不过保存的是 _transform 中的 done
  11. this.continueCallback = null;
  12. }
  13. _transform(chunk, enc, done) {
  14. this.running++;
  15. this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
  16. if(this.running < this.concurrency) {
  17. done();
  18. } else {
  19. // 达到并发上限则将 done 延后到 _onComplete 中调用
  20. this.continueCallback = done;
  21. }
  22. }
  23. _flush(done) {
  24. if(this.running > 0) {
  25. this.terminateCallback = done;
  26. } else {
  27. done();
  28. }
  29. }
  30. _onComplete(err) {
  31. this.running--;
  32. if(err) {
  33. return this.emit('error', err);
  34. }
  35. const tmpCallback = this.continueCallback
  36. // reset
  37. this.continueCallback = null;
  38. // continue ._transform()
  39. tmpCallback && tmpCallback();
  40. if(this.running === 0) {
  41. this.terminateCallback && this.terminateCallback();
  42. }
  43. }
  44. }
  45. module.exports = LimitedParallelStream;

4. 顺序并发执行

实际上,我们只要能把无序输出的chunk重新正确排序,就相当于做到顺序并发执行了。可以通过 npm 上的 through2-parallel包实现。

管道相关的模式(Piping patterns)

1. 组合(Combining streams)

优点:
1. 复用组合管道(且不用考虑内部组合细节)
2. 简化错误处理(只用处理合并管道的错误)

组合管道的实质:
1. 写入时,实际写入的是组合管道的第一个 stream
2. 读取时,实际读取的事组合管道的最后一个 stream

可直接使用 multipipe 包

2.分支(Forking streams)


fork 一个 stream,意味着将一个 Readable stream 接到多个 Writable streams 上。
在 Node.js 中,直接重复使用该 readable stream 即可:

  1. // 省略部分代码...
  2. const inputStream = fs.createReadStream(inputFile);
  3. inputStream
  4. .pipe(sha1Stream)
  5. .pipe(fs.createWriteStream(inputFile + '.sha1'))
  6. ;
  7. inputStream
  8. .pipe(md5Stream)
  9. .pipe(fs.createWriteStream(inputFile + '.md5'))
  10. ;

值得注意的是,fork 的 stream 接收到的是相同的 chunks,所以要注意有副作用的操作。因为会影响到其他 fork 的stream。
另外,数据流动的速度=最慢的 fork stream 的速度。

3.合并(Merging streams)

合并操作是分支操作的逆向版:

由于管道默认的 auto end 设置,其中一个 source stream 结束后 destination stream 就会马上关闭,从而导致错误。
解决方案是设置{end: false},并在全部 source stream 完成后手动触发 destination 的end()
示例代码如下:
mergeTar.js

  1. const tar = require('tar');
  2. const fstream = require('fstream');
  3. const path = require('path');
  4. const destination = path.resolve(process.argv[2]);
  5. const sourceA = path.resolve(process.argv[3]);
  6. const sourceB = path.resolve(process.argv[4]);
  7. // 还没有输入,可以先随意组合
  8. const pack = tar.Pack();
  9. pack.pipe(fstream.Writer(destination));
  10. // 计数器,在全部 source 都结束时调用 end()
  11. let endCount = 0;
  12. function onEnd() {
  13. if(++endCount === 2) {
  14. pack.end();
  15. }
  16. }
  17. // 注意使用上面定义的 onEnd()回调
  18. const sourceStreamA = fstream.Reader({type: "Directory", path: sourceA})
  19. .on('end', onEnd);
  20. const sourceStreamB = fstream.Reader({type: "Directory", path: sourceB})
  21. .on('end', onEnd);
  22. //
  23. // 开始 pipe 时记得设置 end:false选项
  24. sourceStreamA.pipe(pack, {end: false});
  25. sourceStreamB.pipe(pack, {end: false});

注意,多个 source stream 的数据是随机顺序注入 destination stream 的,这在部分 object stream 中是可接受的。
如果想要按顺序(一个 source 接一个 source 地)消耗 chunk,可实现一个状态机来记录当前的 source,按顺序调用相应的 done。(npm packages:multistream)

多路复用与分路(multiplexing and demultiplexing)

如果我们想公用一个频道(channel),来传输多个独立的 streams(即传输完后可重新分离),则需要实现多路复用与分路,如下图所示:


可见,通过多路复用器(MUX)将多个 channel 通过一个共享的 channel 传输。然后通过分路器(DEMUX)将 channel 重新分离。这种技术经常用在通讯上。

实现多路复用的其中一种技术,就是分组交换(packet switching)。思路是将分路时所需的一些元信息(如 channel id,chunk length 等)和 chunk 打包在一起传输。

实现一个远程日志记录服务

我们先实现一个只有 channel id, data length 和 data 的 mini packet:

来看一下 client.js 的代码(重点见注释):

  1. const child_process = require('child_process');
  2. const net = require('net');
  3. function multiplexChannels(sources, destination) {
  4. let totalChannels = sources.length;
  5. for (let i = 0; i < sources.length; i++) {
  6. sources[i]
  7. .on('readable', function() {
  8. let chunk;
  9. while((chunk = this.read()) !== null) {
  10. const outBuff = new Buffer(1 + 4 + chunk.length);
  11. // 写入1字节的 channel id
  12. outBuff.writeUInt8(i, 0);
  13. // 写入4字节的 chunk size
  14. outBuff.writeUInt32BE(chunk.length, 1);
  15. // 写入 chunk 的内容
  16. chunk.copy(outBuff, 5);
  17. console.log('Sending packet tp channel: ' + i);
  18. destination.write(outBuff);
  19. }
  20. })
  21. .on('end', () => {
  22. if (--totalChannels === 0) {
  23. destination.end();
  24. }
  25. });
  26. }
  27. }
  28. const socket = net.connect(3000, () => {
  29. // silent: true 能让子进程不继承父进程的 stdout 和 stderr
  30. const child = child_process.fork(process.argv[2],process.argv.slice(3),{silent:true});
  31. multiplexChannels([child.stdout, child.stderr], socket);
  32. });

下面来看一下 server.js 的代码(重点见注释):

  1. const net = require('net');
  2. const fs = require('fs');
  3. function demultiplexChannel(source, destinations) {
  4. // 用于记录当前 chunk 的 channel id
  5. let currentChannel = null;
  6. // 用于记录当前 chunk 的长度
  7. let currentLength = null;
  8. source
  9. .on('readable', () => {
  10. let chunk;
  11. if (currentChannel === null) {
  12. // 先读1个字节的 id 信息
  13. chunk = source.read(1);
  14. currentChannel = chunk && chunk.readUInt8(0);
  15. }
  16. if (currentLength === null) {
  17. // 读4个字节的长度信息
  18. chunk = source.read(4);
  19. currentLenth = chunk && chunk.readUInt32BE(0);
  20. // 有可能 buffer 中的数据不足,.read() return null
  21. // 等一下次 readable 再读取
  22. if (currentLength === null) {
  23. return;
  24. }
  25. }
  26. // 读取 chunk 的内容
  27. chunk = source.read(currentLenth);
  28. if (chunk === null) {
  29. return;
  30. }
  31. console.log('Received packet from: ' + currentChannel);
  32. // 写入目标 channel
  33. destinations[currentChannel].write(chunk);
  34. // reset
  35. currentChannel = null;
  36. currentLength = null;
  37. })
  38. .on('end', () => {
  39. destinations.forEach(destination => destination.end());
  40. console.log('Source channel closed');
  41. });
  42. }
  43. net.createServer(socket => {
  44. const stdoutStream = fs.createWriteStream('stdout.log');
  45. const stderrStream = fs.createWriteStream('stderr.log');
  46. demultiplexChannel(socket, [stdoutStream, stderrStream]);
  47. }).listen(3000, () => console.log('Server started'));

object stream 的多路复用与分路

原理和 binary mode 基本一样,且 channelID 可以直接作为对象的属性 。

只进行分路操作

另一种模式是根据一定的规则对流入的 data 进行分路:

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注