[关闭]
@FunC 2017-11-21T21:57:34.000000Z 字数 12341 阅读 1983

Node.js Design Patterns | CH09

Node.js


进阶版异步“食谱“(Recipes)

到目前为止,我们所看到的设计模式基本能解决绝大部分的问题。而有这样一些模式,它们更加的专注于解决某些特定问题,这些模式被称为“食谱”(recipes,暂找不到更好的翻译)。就如同现实生活中,照着食谱一步步地做,就能完成一道菜。照着这些“食谱”做,就能解决一类问题(当然也可以有些创造性的改变,但核心点是一样的)

本章的“食谱”主要涉及:
1. 引入一个需要异步初始化的模块
2. 对异步操作进行批量处理于缓存,以实现在繁忙应用中的性能提升。
3. 避免同步的计算密集型(CPU-bound)操作会阻塞 event loop 并削弱 Node.js 处理并发的能力

1. 引入异步初始化模块

我们在第二章中了解到,require函数是同步的。然而有时候有些必须异步的操作,例如在初始化时需要用到网络(等待握手建立连接,获取配置参数等)。这种场景在数据库中很常见

权威解法(以连接数据库为例)

调用数据库模块时,需要等待连接成功才能进行后续操作。通常我们有以下两种选择:

  1. const db = require('aDb'); // The async module
  2. module.exports = function findAll(type, callback) {
  3. if (db.connected) {
  4. runFild();
  5. } else {
  6. db.once('connected', runFind);
  7. }
  8. function runFind() {
  9. db.findAll(type, callback);
  10. }
  11. };

显然,这种方式看着很讨厌,因为多了大量的样板(boilerplate)代码,繁琐啰嗦。

  1. // in module app.js
  2. const db = require('aDb');
  3. const findAllFactory = require('./findAll');
  4. db.on('connected', function() {
  5. const findAll = findAllFactory(db); // 通过工厂函数 + 初始化完成的实例 = 可用的api
  6. // use findAll
  7. });
  8. // in module findAll.js
  9. module.exports = db => {
  10. return function findAll(type, callback) {
  11. db.findAll(type, callback);
  12. }
  13. };

这种模式有的时候也不太适用。因为在大型项目中,复杂度会急剧提高,特别是总需要手动初始化异步模块时。当然,如果使用一些 DI 容器等工具有助于减少痛苦。
下面介绍第三种方式,它能轻易将模块从其依赖的初始化状态中解耦。

预初始化队列

简单来说,思路就是:将未完成初始化时的操作存起来,等初始化一完成就执行。

(原书中部分代码有误,以下代码均已修正并通过测试)
我们首先准备一个需要异步初始化的模块:
asyncModule.js

  1. const asyncModule = module.exports;
  2. asyncModule.inited = false;
  3. asyncModule.init = callback => {
  4. setTimeout(function() {
  5. asyncModule.inited = true;
  6. callback();
  7. }, 5000);
  8. };
  9. asyncModule.tellMeSomething = callback => {
  10. process.nextTick(() => {
  11. if (!asyncModule.inited) {
  12. return callback(
  13. new Error("I don't have anything to say right now")
  14. );
  15. }
  16. console.log(callback);
  17. callback(null, `Current time is ${new Date()}`);
  18. })
  19. };

routes.js:

  1. const asyncModule = require('./asyncModule.js');
  2. // const asyncModule = require('./asyncModuleWrapper.js');
  3. module.exports.say = (req, res) => {
  4. asyncModule.tellMeSomething((err, something) => {
  5. if (err) {
  6. res.writeHead(500);
  7. return res.end(`Error: ${err.message}`);
  8. }
  9. res.writeHead(200);
  10. res.end(`I say: ${something}`);
  11. });
  12. };

app.js

  1. const http = require('http');
  2. const routes = require('./routes.js');
  3. // const asyncModule = require('./asyncModule.js');
  4. // const asyncModule = require('./asyncModuleWrapper.js');
  5. asyncModule.init(() => {
  6. console.log('Async Module is inited');
  7. });
  8. http.createServer((req, res) => {
  9. if (req.method === 'GET' && req.url === '/say') {
  10. return routes.say(req, res);
  11. }
  12. res.writeHead(404);
  13. res.end('Not Found');
  14. }).listen(8000, () => console.log('Started'));

简单来说,这个app如果在启动服务5秒内访问,会提示“I don’t have anything to say right now.”;5秒后,初始化完成(通过setTimeout模拟),才会返回当前时间。

使用预初始化队列包装模块

通常,我们不能改动异步模块中的内容。所以,为了添加我们的队列,我们需要在原模块的基础上加上一层代理:
asyncModuleWrapper.js

  1. const asyncModule = require('./asyncModule');
  2. const asyncModuleWrapper = module.exports;
  3. // 这里只是简单的代理原模块的方法,并通过 activeState 变量切换当前的状态
  4. asyncModuleWrapper.inited = false;
  5. asyncModuleWrapper.init = function() {
  6. activeState.init.apply(activeState, arguments);
  7. };
  8. asyncModuleWrapper.tellMeSomething = function() {
  9. activeState.tellMeSomething.apply(activeState, arguments);
  10. };
  11. // 这个是当初始化未完成时积累的操作队列
  12. let pending = [];
  13. // 这个是未初始化完成版的模块
  14. const notInitedState = {
  15. init: function(callback) {
  16. asyncModule.init(() => {
  17. asyncModuleWrapper.inited = true;
  18. // activeState 切换成原模块
  19. activeState = initedState;
  20. // 执行积攒下来的操作
  21. pending.forEach(req => {
  22. asyncModule[req.method].apply(null, req.args);
  23. });
  24. // 清空队列
  25. pending = [];
  26. callback();
  27. });
  28. },
  29. // 原模块中的同名方法,改为将此任务推入队列
  30. tellMeSomething: function() {
  31. return pending.push({
  32. method: 'tellMeSomething',
  33. args: arguments
  34. })
  35. }
  36. };
  37. // 初始化完成时的模块自然就是原本的模块
  38. let initedState = asyncModule;
  39. // 设置初始值
  40. let activeState = notInitedState;

该模式的核心在于缓存操作与切换模式。在现实生活中,著名的 ORM 库 Mongoose 就使用了这种模式。

异步操作的批量处理与缓存

在高负载应用中,缓存扮演者及其重要的角色,它几乎被应用在 web 的任何地方。

先实现一个没有缓存和批处理的服务器

该服务器用于记录商品的数量,以transactionId {amount, item}的形式组织
totalSales.js:

  1. const level = require('level');
  2. const sublevel = require('level-sublevel');
  3. const db = sublevel(level('example-db', {valueEncoding: 'json'}));
  4. const salesDb = db.sublevel('sales');
  5. module.exports = function totalSales(item, callback) {
  6. console.log('totalSales() invoked');
  7. let sum = 0;
  8. salesDb.createValueStream()
  9. .on('data', data => {
  10. // item 为空时也可触发,用于读取数量
  11. if (!item || date.item === item) {
  12. sum += data.amount;
  13. }
  14. })
  15. .on('end', () => {
  16. callback(null, sum);
  17. });
  18. };

app.js:

  1. const http = require('http');
  2. const url = require('url');
  3. const totalSales = require('./totalSales');
  4. http.createServer((req, res) => {
  5. const query = url.parse(req.url, true).query;
  6. totalSales(query.item, (err, sum) => {
  7. res.writeHead(200);
  8. res.end(`Total sales for item ${query.item} is ${sum}`);
  9. });
  10. }).listen(8000, () => console.log('Started'));

接下来便可以通过访问http://localhost:8000?item=book来查询数据了

批处理异步请求

在处理异步操作时,最基本的缓存可以通过对同一个 API 请求进行批处理来实现。
通常我们的请求是这样的:

[image:945AF9E7-A424-486F-887E-7B5A8511466C-2547-00003961E914AC88/9B882BA1-8776-4312-9292-8E59A98B1C17.png]

每个请求单独发起一个异步操作,独自完成后各自返回。
考虑这样一个场景:当两个客户端用同样的输入触发了一个一模一样的异步操作。
这样,后一个请求便不用亲自重新发起异步操作,只需要等待前一个操作完成,取其结果即可:

[image:414AAB0D-4A66-4EBC-B5CF-D2FFB09C919F-2547-000039A1FC329C61/544DCA66-33D6-44DB-8885-924DC109174A.png]

这种方式相当简单,但又极其有效。而且不需要处理复杂的缓存机理(这通常需要更多的内存管理与准备相应的失效策略)

缓存异步请求

显然,对于批处理模式,如果 API 的响应速度越快,越少的操作能被批处理。然而即使 API 响应快,它仍会消耗一定资源,如果累积起来仍有可能对应用造成冲击。
有时候,我们可以假设一个 API 的调用结果是不常改变的,于是我们可以通过缓存(cache)的方式来提高性能。
注意,即使使用了缓存,最好还是同时加上批处理(batch)。因为如果还没有缓存时就收到了多个并发请求,那么缓存就会被设置多次。
同时使用批处理和缓存后的流程如下图所示:

[image:0914195F-C96B-48A9-897E-9F560EFCE0CB-4625-000008DEC1CB391F/C2B43865-BE8C-4E74-8C8D-CF8FAF5AD034.png]

上图展示了一个优化后的异步缓存算法:
* 没有缓存时,应用批处理模式
* 有缓存时,异步返回缓存(保持一致性)

缓存版web server

对上述例子应用缓存模式
totalSalesCache.js:

  1. const totalSales = require('./totalSales.js');
  2. const queues = {};
  3. // 用于缓存结果
  4. const cache = {};
  5. module.exports = function totalSalesCache(item, callback) {
  6. if (cache[item]) {
  7. console.log('Cache hit');
  8. // cache[item] is only a value, use .bind() to curry it as a function
  9. // 注意要异步返回, 此处使用 process.nextTick 来实现
  10. return process.nextTick(callback.bind(null, null, cache[item]));
  11. }
  12. if (queues[item]) {
  13. console.log('Batching operation');
  14. return queues[item].push(callback);
  15. }
  16. queues[item] = [callback];
  17. totalSales(item, (err, res) => {
  18. if (!err) {
  19. cache[item] = res;
  20. // clear cache every 30s
  21. setInterval(() => {
  22. delete cache[item];
  23. }, 30 * 1000);
  24. }
  25. // for batching
  26. const queue = queues[item];
  27. queues[item] = null;
  28. queue.forEach(cb => cb(null, res));
  29. });
  30. };

Memoization 是缓存函数调用结果的实现。在 npm 中有一些包也实现了该功能,如memoizee

关于缓存机制的不同实现

使用 promises 来进行缓存和批处理

我们再来回看一下 promise 的一些特性:
* 多个then()方法可以链式附在同一个 promise 上(对比我们的批处理队列)
* then()方法最多只会触发一次(还记得为什么使用缓存时仍要进行批处理吗?)
* 附在已经 resolve 的 promise 后面的then()方法仍会被调用(相当于后面的 promise 都在使用缓存的结果)
* then()方法总是异步触发(不需要手动 process.nextTick()来进行异步返回了)

可见,promis 非常适合用于缓存和批处理。话不多说,我们马上用 promise 把上面的 server 重新实现一遍:
totalSalesPromises.js:

  1. // pify 用于对 callback-base API 进行 promisify
  2. const pify = require('pify');
  3. const totalSales = pify(require('./totalSales.js'));
  4. const cache = {};
  5. module.exports = function totalSalesPromise(item) {
  6. if (cache[item]) {
  7. // 因为 cache[item] 是 promise,直接 return 便是异步
  8. return cache[item];
  9. }
  10. // cache[item] is a promise here
  11. // promise 自带批处理特性
  12. cache[item] = totalSales(item)
  13. .then((res) => {
  14. setTimeout(() => { delete cache[item]}, 30 * 1000);
  15. return res;
  16. })
  17. .catch(err => {
  18. delete cache[item];
  19. throw err;
  20. });
  21. return cache[item];
  22. };

调用时也更加简单
appPromise.js:

  1. const http = require('http');
  2. const url = require('url');
  3. const totalSales = require('./totalSalesPromise');
  4. http.createServer((req, res) => {
  5. const query = url.parse(req.url, true).query;
  6. totalSales(query.item).then(sum => {
  7. res.writeHead(200);
  8. res.end(`Total sales for item ${query.item} is ${sum}`);
  9. });
  10. }).listen(8000, () => console.log('Started'));

就是这么轻松写意。

运行计算密集型(CPU-bound)任务

跟之前一样,我们需要一个计算密集型的例子。这里使用经典的子集和(subset sum)问题,问在给定集合中,有无子集的和等于指定的值。该问题的复杂度为O(2^n),十分适合。

首先先给出同步的解法:
subsetSum.js

  1. const EventEmitter = require('events').EventEmitter;
  2. // 继承自 EventEmitter,能通过事件对中间步骤进行处理
  3. class SubsetSum extends EventEmitter {
  4. constructor(sum, set) {
  5. super();
  6. this.sum = sum;
  7. this.set = set;
  8. this.totalSubsets = 0;
  9. }
  10. _combine(set, subset) {
  11. for (let i = 0; i < set.length; i++) {
  12. let newSubset = subset.concat(set[i]);
  13. this._combine(set.slice(i + 1), newSubset);
  14. this._processSubset(newSubset);
  15. }
  16. }
  17. _processSubset(subset) {
  18. console.log('Subset', ++this.totalSubsets, subset);
  19. const res = subset.reduce((prev, item) => (prev + item), 0);
  20. if (res == this.sum) {
  21. // 每找到一个 emit 一个 match 事件
  22. this.emit('match', subset);
  23. }
  24. }
  25. start() {
  26. this._combine(this.set, []);
  27. // 全部计算完毕后 emit end 事件
  28. this.emit('end');
  29. }
  30. }

编写一个简单的服务器来运行它
app.js :

  1. const http = require('http');
  2. const SubsetSum = require('./subsetSum');
  3. //const SubsetSum = require('./subsetSumDefer');
  4. //const SubsetSum = require('./subsetSumFork');
  5. http.createServer((req, res) => {
  6. const url = require('url').parse(req.url, true);
  7. if (url.pathname === '/subsetSum') {
  8. const data = JSON.parse(url.query.data);
  9. res.writeHead(200);
  10. // 在这里执行计算
  11. const subsetSum = new SubsetSum(url.query.sum, data);
  12. subsetSum.on('match', match => {
  13. res.write('Match: ' + JSON.stringify(match) + '\n');
  14. });
  15. subsetSum.on('end', () => res.end());
  16. subsetSum.start();
  17. } else {
  18. res.writeHead(200);
  19. // 直接访问时返回 “I am alive!"
  20. res.end("I am alive!\n");
  21. }
  22. }).listen(8000, () => console.log('Started'));

如果尝试直接运行,结果要在计算完成后才会输出。尝试直接访问也不会立刻返回I am alive!,要等全部计算完毕后才会响应。
原因就是因为 Node.js 时单线程的,同步计算阻塞了 event loop,导致其他的回调函数无法执行。

通过 setImmediate 插入操作

出现上述问题的原因在于计算是同步,连续进行的。如果每次计算后都将下一步的计算放到下一个 event loop 的 I/O 操作之后,那么其他的回调就有时间执行了。
参考 Node.js 的 event loop 示意图:

[image:3D18C926-A57F-4EB5-9759-70572FD8D931-4625-0000154A5E3FEC3C/D33F9289-4E30-447C-98D5-DDFA041B639A.png]

由上图可知,我们的 I/O 操作被阻塞了,所以需要把每一步计算放到 I/O之后。显然,可以通过setImmediate() API 实现(放在setImmediate()中的操作会延后到上图中的 check 阶段执行(在I/O之后)。

交织(interleaving)后的 子集和 算法

  1. const EventEmitter = require('events').EventEmitter;
  2. class SubsetSum extends EventEmitter {
  3. constructor(sum, set) {
  4. super();
  5. this.sum = sum;
  6. this.set = set;
  7. this.totalSubsets = 0;
  8. this.runningCombine;
  9. }
  10. _combine(set, subset) {
  11. for (let i = 0; i < set.length; i++) {
  12. let newSubset = subset.concat(set[i]);
  13. // change _combine to _combineInterleaved
  14. this._combineInterleaved(set.slice(i + 1), newSubset);
  15. this._processSubset(newSubset);
  16. }
  17. }
  18. _combineInterleaved(set, subset) {
  19. this.runningCombine++;
  20. setImmediate(() => {
  21. this._combine(set, subset);
  22. if (--this.runningCombine === 0) {
  23. this.emit('end');
  24. }
  25. });
  26. }
  27. _processSubset(subset) {
  28. console.log('Subset', ++this.totalSubsets, subset);
  29. const res = subset.reduce((prev, item) => (prev + item), 0);
  30. if (res == this.sum) {
  31. this.emit('match', subset);
  32. }
  33. }
  34. start() {
  35. this.runningCombine = 0;
  36. this._combineInterleaved(this.set, []);
  37. }
  38. }
  39. module.exports = SubsetSum;

这时再次运行,就会发现每隔一段时间就会返回一部分结果,而不是等待全部计算完毕后再返回了。

进一步思考交织模式(interleaving pattern)

尽管使用setImmediate()简单又有效,但这并不是最好的模式。事实上,将任务延后会带来一点点开销。当整个算法的所有步骤的额外开销积攒起来就很可怕了。尤其对于计算密集型任务,我们更希望能尽快地返回结果给用户。其中一个优化方式是:隔几步才用一次setImmediate(),而不是每一步都用。
在繁忙的服务器中,一个任务即使阻塞了200ms都是不太能接受的。只有对于断断续续,非常时间运行的任务,用setImmediate()才比较合适。

使用多进程

让应用保持响应的最佳方式,是不要让昂贵的计算密集型任务运行在应用的主进程上。使用子进程有以下好处:
* 同步的计算能够全速运行,而不必交织其他的步骤
* 比用setImmediate()更简单,而且不用改原文件
* 如果需要更高的计算性能,其它进程可以通过一些低级语言来创建(如C)
* Node.js 自带的chil_process模块,在新建子进程时还能建立通讯通道

先实现一个进程池

因为新建进程需要额外的开销和时间,我们可以创建一个进程池。一方面能复用进程,减少新建进程时的开销。另一方面可以限制进程数,避免DoS攻击。
processPool.js:

  1. const fork = require('child_process').fork;
  2. class ProcessPool {
  3. constructor(file, poolMax) {
  4. this.file = file;
  5. this.poolMax = poolMax;
  6. // processes ready to be used
  7. this.pool = [];
  8. // processes using
  9. this.active = [];
  10. // a queue of callback, because of the lack of available process
  11. this.waiting = [];
  12. }
  13. acquire(callback) {
  14. let worker;
  15. // 有空闲进程时,激活
  16. if (this.pool.length > 0) {
  17. worker = this.pool.pop();
  18. this.active.push(worker);
  19. return process.nextTick(callback.bind(null, null, worker));
  20. }
  21. // 没有空闲进程则推入等待队列
  22. if (this.active.length >= this.poolMax) {
  23. return this.waiting.push(callback);
  24. }
  25. // 根据文件建立相应的进程
  26. worker = fork(this.file);
  27. this.active.push(worker);
  28. process.nextTick(callback.bind(null, null, worker));
  29. }
  30. release(worker) {
  31. if (this.waiting.length > 0) {
  32. const waitingCallback = this.waiting.shift();
  33. waitingCallback(null, worker);
  34. }
  35. this.active = this.active.filter(w => worker !== worker);
  36. this.pool.push(worker);
  37. }
  38. }
  39. module.exports = ProcessPool;

如果想减少常驻内存使用和增加健壮性,可以:
1. 在一段时间闲置后,可将闲置进程终结
2. 将无响应和崩溃掉的进程杀掉重启

与子进程通信

实现一个模块,能通知子进程工作,并从它那里拿到结果
subsetSumFork.js:

  1. const EventEmitter = require('events').EventEmitter;
  2. const ProcessPool = require('./processPoll');
  3. // 用目标计算进程模块新建进程池
  4. const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
  5. class SubsetSumFork extends EventEmitter {
  6. constructor(sum, set) {
  7. super();
  8. this.sum = sum;
  9. this.set = set;
  10. }
  11. start() {
  12. workers.acquire((err, worker) => {
  13. // 通知子进程进行计算
  14. worker.send({sum: this.sum, set: this.set});
  15. const onMessage = msg => {
  16. if (msg.event === 'end') {
  17. worker.removeListener('message', onMessage);
  18. workers.release(worker);
  19. }
  20. // 向外部发送 end 事件和结果
  21. this.emit(msg.event, msg.data);
  22. };
  23. // 监听从子进程收到的信息
  24. worker.on('message', onMessage);
  25. });
  26. }
  27. }
  28. module.exports = SubsetSumFork;

与父进程通信

来实现 subsetSumWorker.js , 用于执行密集型计算
subsetWorker.js:

  1. const SubsetSum = require('./subsetSum');
  2. process.on('message', msg => {
  3. // 直接调用原本的subsetSum即可,原文件无需改动!
  4. const subsetSum = new SubsetSum(msg.sum, msg.set);
  5. subsetSum.on('match', data => {
  6. process.send({event: 'match', data: data});
  7. });
  8. subsetSum.on('end', data => {
  9. process.send({event: 'end', data: data});
  10. });
  11. subsetSum.start();
  12. });

这时,再去对服务器进行请求,将能马上得到响应。

显然,这种模式比交织模式更强大更灵活,但仍有上限。上限在于这单台机器的硬件。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注