@FunC
2017-11-21T13:57:34.000000Z
字数 12341
阅读 2351
Node.js
到目前为止,我们所看到的设计模式基本能解决绝大部分的问题。而有这样一些模式,它们更加的专注于解决某些特定问题,这些模式被称为“食谱”(recipes,暂找不到更好的翻译)。就如同现实生活中,照着食谱一步步地做,就能完成一道菜。照着这些“食谱”做,就能解决一类问题(当然也可以有些创造性的改变,但核心点是一样的)
本章的“食谱”主要涉及:
1. 引入一个需要异步初始化的模块
2. 对异步操作进行批量处理于缓存,以实现在繁忙应用中的性能提升。
3. 避免同步的计算密集型(CPU-bound)操作会阻塞 event loop 并削弱 Node.js 处理并发的能力
我们在第二章中了解到,require函数是同步的。然而有时候有些必须异步的操作,例如在初始化时需要用到网络(等待握手建立连接,获取配置参数等)。这种场景在数据库中很常见
调用数据库模块时,需要等待连接成功才能进行后续操作。通常我们有以下两种选择:
const db = require('aDb'); // The async modulemodule.exports = function findAll(type, callback) {if (db.connected) {runFild();} else {db.once('connected', runFind);}function runFind() {db.findAll(type, callback);}};
显然,这种方式看着很讨厌,因为多了大量的样板(boilerplate)代码,繁琐啰嗦。
// in module app.jsconst db = require('aDb');const findAllFactory = require('./findAll');db.on('connected', function() {const findAll = findAllFactory(db); // 通过工厂函数 + 初始化完成的实例 = 可用的api// use findAll});// in module findAll.jsmodule.exports = db => {return function findAll(type, callback) {db.findAll(type, callback);}};
这种模式有的时候也不太适用。因为在大型项目中,复杂度会急剧提高,特别是总需要手动初始化异步模块时。当然,如果使用一些 DI 容器等工具有助于减少痛苦。
下面介绍第三种方式,它能轻易将模块从其依赖的初始化状态中解耦。
简单来说,思路就是:将未完成初始化时的操作存起来,等初始化一完成就执行。
(原书中部分代码有误,以下代码均已修正并通过测试)
我们首先准备一个需要异步初始化的模块:
asyncModule.js
const asyncModule = module.exports;asyncModule.inited = false;asyncModule.init = callback => {setTimeout(function() {asyncModule.inited = true;callback();}, 5000);};asyncModule.tellMeSomething = callback => {process.nextTick(() => {if (!asyncModule.inited) {return callback(new Error("I don't have anything to say right now"));}console.log(callback);callback(null, `Current time is ${new Date()}`);})};
routes.js:
const asyncModule = require('./asyncModule.js');// const asyncModule = require('./asyncModuleWrapper.js');module.exports.say = (req, res) => {asyncModule.tellMeSomething((err, something) => {if (err) {res.writeHead(500);return res.end(`Error: ${err.message}`);}res.writeHead(200);res.end(`I say: ${something}`);});};
app.js
const http = require('http');const routes = require('./routes.js');// const asyncModule = require('./asyncModule.js');// const asyncModule = require('./asyncModuleWrapper.js');asyncModule.init(() => {console.log('Async Module is inited');});http.createServer((req, res) => {if (req.method === 'GET' && req.url === '/say') {return routes.say(req, res);}res.writeHead(404);res.end('Not Found');}).listen(8000, () => console.log('Started'));
简单来说,这个app如果在启动服务5秒内访问,会提示“I don’t have anything to say right now.”;5秒后,初始化完成(通过setTimeout模拟),才会返回当前时间。
通常,我们不能改动异步模块中的内容。所以,为了添加我们的队列,我们需要在原模块的基础上加上一层代理:
asyncModuleWrapper.js
const asyncModule = require('./asyncModule');const asyncModuleWrapper = module.exports;// 这里只是简单的代理原模块的方法,并通过 activeState 变量切换当前的状态asyncModuleWrapper.inited = false;asyncModuleWrapper.init = function() {activeState.init.apply(activeState, arguments);};asyncModuleWrapper.tellMeSomething = function() {activeState.tellMeSomething.apply(activeState, arguments);};// 这个是当初始化未完成时积累的操作队列let pending = [];// 这个是未初始化完成版的模块const notInitedState = {init: function(callback) {asyncModule.init(() => {asyncModuleWrapper.inited = true;// activeState 切换成原模块activeState = initedState;// 执行积攒下来的操作pending.forEach(req => {asyncModule[req.method].apply(null, req.args);});// 清空队列pending = [];callback();});},// 原模块中的同名方法,改为将此任务推入队列tellMeSomething: function() {return pending.push({method: 'tellMeSomething',args: arguments})}};// 初始化完成时的模块自然就是原本的模块let initedState = asyncModule;// 设置初始值let activeState = notInitedState;
该模式的核心在于缓存操作与切换模式。在现实生活中,著名的 ORM 库 Mongoose 就使用了这种模式。
在高负载应用中,缓存扮演者及其重要的角色,它几乎被应用在 web 的任何地方。
该服务器用于记录商品的数量,以transactionId {amount, item}的形式组织
totalSales.js:
const level = require('level');const sublevel = require('level-sublevel');const db = sublevel(level('example-db', {valueEncoding: 'json'}));const salesDb = db.sublevel('sales');module.exports = function totalSales(item, callback) {console.log('totalSales() invoked');let sum = 0;salesDb.createValueStream().on('data', data => {// item 为空时也可触发,用于读取数量if (!item || date.item === item) {sum += data.amount;}}).on('end', () => {callback(null, sum);});};
app.js:
const http = require('http');const url = require('url');const totalSales = require('./totalSales');http.createServer((req, res) => {const query = url.parse(req.url, true).query;totalSales(query.item, (err, sum) => {res.writeHead(200);res.end(`Total sales for item ${query.item} is ${sum}`);});}).listen(8000, () => console.log('Started'));
接下来便可以通过访问http://localhost:8000?item=book来查询数据了
在处理异步操作时,最基本的缓存可以通过对同一个 API 请求进行批处理来实现。
通常我们的请求是这样的:
[image:945AF9E7-A424-486F-887E-7B5A8511466C-2547-00003961E914AC88/9B882BA1-8776-4312-9292-8E59A98B1C17.png]
每个请求单独发起一个异步操作,独自完成后各自返回。
考虑这样一个场景:当两个客户端用同样的输入触发了一个一模一样的异步操作。
这样,后一个请求便不用亲自重新发起异步操作,只需要等待前一个操作完成,取其结果即可:
[image:414AAB0D-4A66-4EBC-B5CF-D2FFB09C919F-2547-000039A1FC329C61/544DCA66-33D6-44DB-8885-924DC109174A.png]
这种方式相当简单,但又极其有效。而且不需要处理复杂的缓存机理(这通常需要更多的内存管理与准备相应的失效策略)
显然,对于批处理模式,如果 API 的响应速度越快,越少的操作能被批处理。然而即使 API 响应快,它仍会消耗一定资源,如果累积起来仍有可能对应用造成冲击。
有时候,我们可以假设一个 API 的调用结果是不常改变的,于是我们可以通过缓存(cache)的方式来提高性能。
注意,即使使用了缓存,最好还是同时加上批处理(batch)。因为如果还没有缓存时就收到了多个并发请求,那么缓存就会被设置多次。
同时使用批处理和缓存后的流程如下图所示:
[image:0914195F-C96B-48A9-897E-9F560EFCE0CB-4625-000008DEC1CB391F/C2B43865-BE8C-4E74-8C8D-CF8FAF5AD034.png]
上图展示了一个优化后的异步缓存算法:
* 没有缓存时,应用批处理模式
* 有缓存时,异步返回缓存(保持一致性)
对上述例子应用缓存模式
totalSalesCache.js:
const totalSales = require('./totalSales.js');const queues = {};// 用于缓存结果const cache = {};module.exports = function totalSalesCache(item, callback) {if (cache[item]) {console.log('Cache hit');// cache[item] is only a value, use .bind() to curry it as a function// 注意要异步返回, 此处使用 process.nextTick 来实现return process.nextTick(callback.bind(null, null, cache[item]));}if (queues[item]) {console.log('Batching operation');return queues[item].push(callback);}queues[item] = [callback];totalSales(item, (err, res) => {if (!err) {cache[item] = res;// clear cache every 30ssetInterval(() => {delete cache[item];}, 30 * 1000);}// for batchingconst queue = queues[item];queues[item] = null;queue.forEach(cb => cb(null, res));});};
Memoization 是缓存函数调用结果的实现。在 npm 中有一些包也实现了该功能,如memoizee
我们再来回看一下 promise 的一些特性:
* 多个then()方法可以链式附在同一个 promise 上(对比我们的批处理队列)
* then()方法最多只会触发一次(还记得为什么使用缓存时仍要进行批处理吗?)
* 附在已经 resolve 的 promise 后面的then()方法仍会被调用(相当于后面的 promise 都在使用缓存的结果)
* then()方法总是异步触发(不需要手动 process.nextTick()来进行异步返回了)
可见,promis 非常适合用于缓存和批处理。话不多说,我们马上用 promise 把上面的 server 重新实现一遍:
totalSalesPromises.js:
// pify 用于对 callback-base API 进行 promisifyconst pify = require('pify');const totalSales = pify(require('./totalSales.js'));const cache = {};module.exports = function totalSalesPromise(item) {if (cache[item]) {// 因为 cache[item] 是 promise,直接 return 便是异步return cache[item];}// cache[item] is a promise here// promise 自带批处理特性cache[item] = totalSales(item).then((res) => {setTimeout(() => { delete cache[item]}, 30 * 1000);return res;}).catch(err => {delete cache[item];throw err;});return cache[item];};
调用时也更加简单
appPromise.js:
const http = require('http');const url = require('url');const totalSales = require('./totalSalesPromise');http.createServer((req, res) => {const query = url.parse(req.url, true).query;totalSales(query.item).then(sum => {res.writeHead(200);res.end(`Total sales for item ${query.item} is ${sum}`);});}).listen(8000, () => console.log('Started'));
就是这么轻松写意。
跟之前一样,我们需要一个计算密集型的例子。这里使用经典的子集和(subset sum)问题,问在给定集合中,有无子集的和等于指定的值。该问题的复杂度为O(2^n),十分适合。
首先先给出同步的解法:
subsetSum.js
const EventEmitter = require('events').EventEmitter;// 继承自 EventEmitter,能通过事件对中间步骤进行处理class SubsetSum extends EventEmitter {constructor(sum, set) {super();this.sum = sum;this.set = set;this.totalSubsets = 0;}_combine(set, subset) {for (let i = 0; i < set.length; i++) {let newSubset = subset.concat(set[i]);this._combine(set.slice(i + 1), newSubset);this._processSubset(newSubset);}}_processSubset(subset) {console.log('Subset', ++this.totalSubsets, subset);const res = subset.reduce((prev, item) => (prev + item), 0);if (res == this.sum) {// 每找到一个 emit 一个 match 事件this.emit('match', subset);}}start() {this._combine(this.set, []);// 全部计算完毕后 emit end 事件this.emit('end');}}
编写一个简单的服务器来运行它
app.js :
const http = require('http');const SubsetSum = require('./subsetSum');//const SubsetSum = require('./subsetSumDefer');//const SubsetSum = require('./subsetSumFork');http.createServer((req, res) => {const url = require('url').parse(req.url, true);if (url.pathname === '/subsetSum') {const data = JSON.parse(url.query.data);res.writeHead(200);// 在这里执行计算const subsetSum = new SubsetSum(url.query.sum, data);subsetSum.on('match', match => {res.write('Match: ' + JSON.stringify(match) + '\n');});subsetSum.on('end', () => res.end());subsetSum.start();} else {res.writeHead(200);// 直接访问时返回 “I am alive!"res.end("I am alive!\n");}}).listen(8000, () => console.log('Started'));
如果尝试直接运行,结果要在计算完成后才会输出。尝试直接访问也不会立刻返回I am alive!,要等全部计算完毕后才会响应。
原因就是因为 Node.js 时单线程的,同步计算阻塞了 event loop,导致其他的回调函数无法执行。
出现上述问题的原因在于计算是同步,连续进行的。如果每次计算后都将下一步的计算放到下一个 event loop 的 I/O 操作之后,那么其他的回调就有时间执行了。
参考 Node.js 的 event loop 示意图:
[image:3D18C926-A57F-4EB5-9759-70572FD8D931-4625-0000154A5E3FEC3C/D33F9289-4E30-447C-98D5-DDFA041B639A.png]
由上图可知,我们的 I/O 操作被阻塞了,所以需要把每一步计算放到 I/O之后。显然,可以通过setImmediate() API 实现(放在setImmediate()中的操作会延后到上图中的 check 阶段执行(在I/O之后)。
const EventEmitter = require('events').EventEmitter;class SubsetSum extends EventEmitter {constructor(sum, set) {super();this.sum = sum;this.set = set;this.totalSubsets = 0;this.runningCombine;}_combine(set, subset) {for (let i = 0; i < set.length; i++) {let newSubset = subset.concat(set[i]);// change _combine to _combineInterleavedthis._combineInterleaved(set.slice(i + 1), newSubset);this._processSubset(newSubset);}}_combineInterleaved(set, subset) {this.runningCombine++;setImmediate(() => {this._combine(set, subset);if (--this.runningCombine === 0) {this.emit('end');}});}_processSubset(subset) {console.log('Subset', ++this.totalSubsets, subset);const res = subset.reduce((prev, item) => (prev + item), 0);if (res == this.sum) {this.emit('match', subset);}}start() {this.runningCombine = 0;this._combineInterleaved(this.set, []);}}module.exports = SubsetSum;
这时再次运行,就会发现每隔一段时间就会返回一部分结果,而不是等待全部计算完毕后再返回了。
尽管使用setImmediate()简单又有效,但这并不是最好的模式。事实上,将任务延后会带来一点点开销。当整个算法的所有步骤的额外开销积攒起来就很可怕了。尤其对于计算密集型任务,我们更希望能尽快地返回结果给用户。其中一个优化方式是:隔几步才用一次setImmediate(),而不是每一步都用。
在繁忙的服务器中,一个任务即使阻塞了200ms都是不太能接受的。只有对于断断续续,非常时间运行的任务,用setImmediate()才比较合适。
让应用保持响应的最佳方式,是不要让昂贵的计算密集型任务运行在应用的主进程上。使用子进程有以下好处:
* 同步的计算能够全速运行,而不必交织其他的步骤
* 比用setImmediate()更简单,而且不用改原文件
* 如果需要更高的计算性能,其它进程可以通过一些低级语言来创建(如C)
* Node.js 自带的chil_process模块,在新建子进程时还能建立通讯通道
因为新建进程需要额外的开销和时间,我们可以创建一个进程池。一方面能复用进程,减少新建进程时的开销。另一方面可以限制进程数,避免DoS攻击。
processPool.js:
const fork = require('child_process').fork;class ProcessPool {constructor(file, poolMax) {this.file = file;this.poolMax = poolMax;// processes ready to be usedthis.pool = [];// processes usingthis.active = [];// a queue of callback, because of the lack of available processthis.waiting = [];}acquire(callback) {let worker;// 有空闲进程时,激活if (this.pool.length > 0) {worker = this.pool.pop();this.active.push(worker);return process.nextTick(callback.bind(null, null, worker));}// 没有空闲进程则推入等待队列if (this.active.length >= this.poolMax) {return this.waiting.push(callback);}// 根据文件建立相应的进程worker = fork(this.file);this.active.push(worker);process.nextTick(callback.bind(null, null, worker));}release(worker) {if (this.waiting.length > 0) {const waitingCallback = this.waiting.shift();waitingCallback(null, worker);}this.active = this.active.filter(w => worker !== worker);this.pool.push(worker);}}module.exports = ProcessPool;
如果想减少常驻内存使用和增加健壮性,可以:
1. 在一段时间闲置后,可将闲置进程终结
2. 将无响应和崩溃掉的进程杀掉重启
实现一个模块,能通知子进程工作,并从它那里拿到结果
subsetSumFork.js:
const EventEmitter = require('events').EventEmitter;const ProcessPool = require('./processPoll');// 用目标计算进程模块新建进程池const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);class SubsetSumFork extends EventEmitter {constructor(sum, set) {super();this.sum = sum;this.set = set;}start() {workers.acquire((err, worker) => {// 通知子进程进行计算worker.send({sum: this.sum, set: this.set});const onMessage = msg => {if (msg.event === 'end') {worker.removeListener('message', onMessage);workers.release(worker);}// 向外部发送 end 事件和结果this.emit(msg.event, msg.data);};// 监听从子进程收到的信息worker.on('message', onMessage);});}}module.exports = SubsetSumFork;
来实现 subsetSumWorker.js , 用于执行密集型计算
subsetWorker.js:
const SubsetSum = require('./subsetSum');process.on('message', msg => {// 直接调用原本的subsetSum即可,原文件无需改动!const subsetSum = new SubsetSum(msg.sum, msg.set);subsetSum.on('match', data => {process.send({event: 'match', data: data});});subsetSum.on('end', data => {process.send({event: 'end', data: data});});subsetSum.start();});
这时,再去对服务器进行请求,将能马上得到响应。
显然,这种模式比交织模式更强大更灵活,但仍有上限。上限在于这单台机器的硬件。