@FunC
2017-11-21T21:57:34.000000Z
字数 12341
阅读 1983
Node.js
到目前为止,我们所看到的设计模式基本能解决绝大部分的问题。而有这样一些模式,它们更加的专注于解决某些特定问题,这些模式被称为“食谱”(recipes,暂找不到更好的翻译)。就如同现实生活中,照着食谱一步步地做,就能完成一道菜。照着这些“食谱”做,就能解决一类问题(当然也可以有些创造性的改变,但核心点是一样的)
本章的“食谱”主要涉及:
1. 引入一个需要异步初始化的模块
2. 对异步操作进行批量处理于缓存,以实现在繁忙应用中的性能提升。
3. 避免同步的计算密集型(CPU-bound)操作会阻塞 event loop 并削弱 Node.js 处理并发的能力
我们在第二章中了解到,require
函数是同步的。然而有时候有些必须异步的操作,例如在初始化时需要用到网络(等待握手建立连接,获取配置参数等)。这种场景在数据库中很常见
调用数据库模块时,需要等待连接成功才能进行后续操作。通常我们有以下两种选择:
const db = require('aDb'); // The async module
module.exports = function findAll(type, callback) {
if (db.connected) {
runFild();
} else {
db.once('connected', runFind);
}
function runFind() {
db.findAll(type, callback);
}
};
显然,这种方式看着很讨厌,因为多了大量的样板(boilerplate)代码,繁琐啰嗦。
// in module app.js
const db = require('aDb');
const findAllFactory = require('./findAll');
db.on('connected', function() {
const findAll = findAllFactory(db); // 通过工厂函数 + 初始化完成的实例 = 可用的api
// use findAll
});
// in module findAll.js
module.exports = db => {
return function findAll(type, callback) {
db.findAll(type, callback);
}
};
这种模式有的时候也不太适用。因为在大型项目中,复杂度会急剧提高,特别是总需要手动初始化异步模块时。当然,如果使用一些 DI 容器等工具有助于减少痛苦。
下面介绍第三种方式,它能轻易将模块从其依赖的初始化状态中解耦。
简单来说,思路就是:将未完成初始化时的操作存起来,等初始化一完成就执行。
(原书中部分代码有误,以下代码均已修正并通过测试)
我们首先准备一个需要异步初始化的模块:
asyncModule.js
const asyncModule = module.exports;
asyncModule.inited = false;
asyncModule.init = callback => {
setTimeout(function() {
asyncModule.inited = true;
callback();
}, 5000);
};
asyncModule.tellMeSomething = callback => {
process.nextTick(() => {
if (!asyncModule.inited) {
return callback(
new Error("I don't have anything to say right now")
);
}
console.log(callback);
callback(null, `Current time is ${new Date()}`);
})
};
routes.js:
const asyncModule = require('./asyncModule.js');
// const asyncModule = require('./asyncModuleWrapper.js');
module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if (err) {
res.writeHead(500);
return res.end(`Error: ${err.message}`);
}
res.writeHead(200);
res.end(`I say: ${something}`);
});
};
app.js
const http = require('http');
const routes = require('./routes.js');
// const asyncModule = require('./asyncModule.js');
// const asyncModule = require('./asyncModuleWrapper.js');
asyncModule.init(() => {
console.log('Async Module is inited');
});
http.createServer((req, res) => {
if (req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not Found');
}).listen(8000, () => console.log('Started'));
简单来说,这个app如果在启动服务5秒内访问,会提示“I don’t have anything to say right now.”;5秒后,初始化完成(通过setTimeout模拟),才会返回当前时间。
通常,我们不能改动异步模块中的内容。所以,为了添加我们的队列,我们需要在原模块的基础上加上一层代理:
asyncModuleWrapper.js
const asyncModule = require('./asyncModule');
const asyncModuleWrapper = module.exports;
// 这里只是简单的代理原模块的方法,并通过 activeState 变量切换当前的状态
asyncModuleWrapper.inited = false;
asyncModuleWrapper.init = function() {
activeState.init.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = function() {
activeState.tellMeSomething.apply(activeState, arguments);
};
// 这个是当初始化未完成时积累的操作队列
let pending = [];
// 这个是未初始化完成版的模块
const notInitedState = {
init: function(callback) {
asyncModule.init(() => {
asyncModuleWrapper.inited = true;
// activeState 切换成原模块
activeState = initedState;
// 执行积攒下来的操作
pending.forEach(req => {
asyncModule[req.method].apply(null, req.args);
});
// 清空队列
pending = [];
callback();
});
},
// 原模块中的同名方法,改为将此任务推入队列
tellMeSomething: function() {
return pending.push({
method: 'tellMeSomething',
args: arguments
})
}
};
// 初始化完成时的模块自然就是原本的模块
let initedState = asyncModule;
// 设置初始值
let activeState = notInitedState;
该模式的核心在于缓存操作与切换模式。在现实生活中,著名的 ORM 库 Mongoose 就使用了这种模式。
在高负载应用中,缓存扮演者及其重要的角色,它几乎被应用在 web 的任何地方。
该服务器用于记录商品的数量,以transactionId {amount, item}
的形式组织
totalSales.js:
const level = require('level');
const sublevel = require('level-sublevel');
const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');
module.exports = function totalSales(item, callback) {
console.log('totalSales() invoked');
let sum = 0;
salesDb.createValueStream()
.on('data', data => {
// item 为空时也可触发,用于读取数量
if (!item || date.item === item) {
sum += data.amount;
}
})
.on('end', () => {
callback(null, sum);
});
};
app.js:
const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
http.createServer((req, res) => {
const query = url.parse(req.url, true).query;
totalSales(query.item, (err, sum) => {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, () => console.log('Started'));
接下来便可以通过访问http://localhost:8000?item=book
来查询数据了
在处理异步操作时,最基本的缓存可以通过对同一个 API 请求进行批处理来实现。
通常我们的请求是这样的:
[image:945AF9E7-A424-486F-887E-7B5A8511466C-2547-00003961E914AC88/9B882BA1-8776-4312-9292-8E59A98B1C17.png]
每个请求单独发起一个异步操作,独自完成后各自返回。
考虑这样一个场景:当两个客户端用同样的输入触发了一个一模一样的异步操作。
这样,后一个请求便不用亲自重新发起异步操作,只需要等待前一个操作完成,取其结果即可:
[image:414AAB0D-4A66-4EBC-B5CF-D2FFB09C919F-2547-000039A1FC329C61/544DCA66-33D6-44DB-8885-924DC109174A.png]
这种方式相当简单,但又极其有效。而且不需要处理复杂的缓存机理(这通常需要更多的内存管理与准备相应的失效策略)
显然,对于批处理模式,如果 API 的响应速度越快,越少的操作能被批处理。然而即使 API 响应快,它仍会消耗一定资源,如果累积起来仍有可能对应用造成冲击。
有时候,我们可以假设一个 API 的调用结果是不常改变的,于是我们可以通过缓存(cache)的方式来提高性能。
注意,即使使用了缓存,最好还是同时加上批处理(batch)。因为如果还没有缓存时就收到了多个并发请求,那么缓存就会被设置多次。
同时使用批处理和缓存后的流程如下图所示:
[image:0914195F-C96B-48A9-897E-9F560EFCE0CB-4625-000008DEC1CB391F/C2B43865-BE8C-4E74-8C8D-CF8FAF5AD034.png]
上图展示了一个优化后的异步缓存算法:
* 没有缓存时,应用批处理模式
* 有缓存时,异步返回缓存(保持一致性)
对上述例子应用缓存模式
totalSalesCache.js:
const totalSales = require('./totalSales.js');
const queues = {};
// 用于缓存结果
const cache = {};
module.exports = function totalSalesCache(item, callback) {
if (cache[item]) {
console.log('Cache hit');
// cache[item] is only a value, use .bind() to curry it as a function
// 注意要异步返回, 此处使用 process.nextTick 来实现
return process.nextTick(callback.bind(null, null, cache[item]));
}
if (queues[item]) {
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback];
totalSales(item, (err, res) => {
if (!err) {
cache[item] = res;
// clear cache every 30s
setInterval(() => {
delete cache[item];
}, 30 * 1000);
}
// for batching
const queue = queues[item];
queues[item] = null;
queue.forEach(cb => cb(null, res));
});
};
Memoization 是缓存函数调用结果的实现。在 npm 中有一些包也实现了该功能,如memoizee
我们再来回看一下 promise 的一些特性:
* 多个then()
方法可以链式附在同一个 promise 上(对比我们的批处理队列)
* then()
方法最多只会触发一次(还记得为什么使用缓存时仍要进行批处理吗?)
* 附在已经 resolve 的 promise 后面的then()
方法仍会被调用(相当于后面的 promise 都在使用缓存的结果)
* then()
方法总是异步触发(不需要手动 process.nextTick()
来进行异步返回了)
可见,promis 非常适合用于缓存和批处理。话不多说,我们马上用 promise 把上面的 server 重新实现一遍:
totalSalesPromises.js:
// pify 用于对 callback-base API 进行 promisify
const pify = require('pify');
const totalSales = pify(require('./totalSales.js'));
const cache = {};
module.exports = function totalSalesPromise(item) {
if (cache[item]) {
// 因为 cache[item] 是 promise,直接 return 便是异步
return cache[item];
}
// cache[item] is a promise here
// promise 自带批处理特性
cache[item] = totalSales(item)
.then((res) => {
setTimeout(() => { delete cache[item]}, 30 * 1000);
return res;
})
.catch(err => {
delete cache[item];
throw err;
});
return cache[item];
};
调用时也更加简单
appPromise.js:
const http = require('http');
const url = require('url');
const totalSales = require('./totalSalesPromise');
http.createServer((req, res) => {
const query = url.parse(req.url, true).query;
totalSales(query.item).then(sum => {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, () => console.log('Started'));
就是这么轻松写意。
跟之前一样,我们需要一个计算密集型的例子。这里使用经典的子集和(subset sum)问题,问在给定集合中,有无子集的和等于指定的值。该问题的复杂度为O(2^n),十分适合。
首先先给出同步的解法:
subsetSum.js
const EventEmitter = require('events').EventEmitter;
// 继承自 EventEmitter,能通过事件对中间步骤进行处理
class SubsetSum extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
}
_combine(set, subset) {
for (let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combine(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => (prev + item), 0);
if (res == this.sum) {
// 每找到一个 emit 一个 match 事件
this.emit('match', subset);
}
}
start() {
this._combine(this.set, []);
// 全部计算完毕后 emit end 事件
this.emit('end');
}
}
编写一个简单的服务器来运行它
app.js :
const http = require('http');
const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
//const SubsetSum = require('./subsetSumFork');
http.createServer((req, res) => {
const url = require('url').parse(req.url, true);
if (url.pathname === '/subsetSum') {
const data = JSON.parse(url.query.data);
res.writeHead(200);
// 在这里执行计算
const subsetSum = new SubsetSum(url.query.sum, data);
subsetSum.on('match', match => {
res.write('Match: ' + JSON.stringify(match) + '\n');
});
subsetSum.on('end', () => res.end());
subsetSum.start();
} else {
res.writeHead(200);
// 直接访问时返回 “I am alive!"
res.end("I am alive!\n");
}
}).listen(8000, () => console.log('Started'));
如果尝试直接运行,结果要在计算完成后才会输出。尝试直接访问也不会立刻返回I am alive!,要等全部计算完毕后才会响应。
原因就是因为 Node.js 时单线程的,同步计算阻塞了 event loop,导致其他的回调函数无法执行。
出现上述问题的原因在于计算是同步,连续进行的。如果每次计算后都将下一步的计算放到下一个 event loop 的 I/O 操作之后,那么其他的回调就有时间执行了。
参考 Node.js 的 event loop 示意图:
[image:3D18C926-A57F-4EB5-9759-70572FD8D931-4625-0000154A5E3FEC3C/D33F9289-4E30-447C-98D5-DDFA041B639A.png]
由上图可知,我们的 I/O 操作被阻塞了,所以需要把每一步计算放到 I/O之后。显然,可以通过setImmediate()
API 实现(放在setImmediate()
中的操作会延后到上图中的 check 阶段执行(在I/O之后)。
const EventEmitter = require('events').EventEmitter;
class SubsetSum extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
this.runningCombine;
}
_combine(set, subset) {
for (let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
// change _combine to _combineInterleaved
this._combineInterleaved(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
_combineInterleaved(set, subset) {
this.runningCombine++;
setImmediate(() => {
this._combine(set, subset);
if (--this.runningCombine === 0) {
this.emit('end');
}
});
}
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => (prev + item), 0);
if (res == this.sum) {
this.emit('match', subset);
}
}
start() {
this.runningCombine = 0;
this._combineInterleaved(this.set, []);
}
}
module.exports = SubsetSum;
这时再次运行,就会发现每隔一段时间就会返回一部分结果,而不是等待全部计算完毕后再返回了。
尽管使用setImmediate()
简单又有效,但这并不是最好的模式。事实上,将任务延后会带来一点点开销。当整个算法的所有步骤的额外开销积攒起来就很可怕了。尤其对于计算密集型任务,我们更希望能尽快地返回结果给用户。其中一个优化方式是:隔几步才用一次setImmediate()
,而不是每一步都用。
在繁忙的服务器中,一个任务即使阻塞了200ms都是不太能接受的。只有对于断断续续,非常时间运行的任务,用setImmediate()
才比较合适。
让应用保持响应的最佳方式,是不要让昂贵的计算密集型任务运行在应用的主进程上。使用子进程有以下好处:
* 同步的计算能够全速运行,而不必交织其他的步骤
* 比用setImmediate()
更简单,而且不用改原文件
* 如果需要更高的计算性能,其它进程可以通过一些低级语言来创建(如C)
* Node.js 自带的chil_process
模块,在新建子进程时还能建立通讯通道
因为新建进程需要额外的开销和时间,我们可以创建一个进程池。一方面能复用进程,减少新建进程时的开销。另一方面可以限制进程数,避免DoS攻击。
processPool.js:
const fork = require('child_process').fork;
class ProcessPool {
constructor(file, poolMax) {
this.file = file;
this.poolMax = poolMax;
// processes ready to be used
this.pool = [];
// processes using
this.active = [];
// a queue of callback, because of the lack of available process
this.waiting = [];
}
acquire(callback) {
let worker;
// 有空闲进程时,激活
if (this.pool.length > 0) {
worker = this.pool.pop();
this.active.push(worker);
return process.nextTick(callback.bind(null, null, worker));
}
// 没有空闲进程则推入等待队列
if (this.active.length >= this.poolMax) {
return this.waiting.push(callback);
}
// 根据文件建立相应的进程
worker = fork(this.file);
this.active.push(worker);
process.nextTick(callback.bind(null, null, worker));
}
release(worker) {
if (this.waiting.length > 0) {
const waitingCallback = this.waiting.shift();
waitingCallback(null, worker);
}
this.active = this.active.filter(w => worker !== worker);
this.pool.push(worker);
}
}
module.exports = ProcessPool;
如果想减少常驻内存使用和增加健壮性,可以:
1. 在一段时间闲置后,可将闲置进程终结
2. 将无响应和崩溃掉的进程杀掉重启
实现一个模块,能通知子进程工作,并从它那里拿到结果
subsetSumFork.js:
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPoll');
// 用目标计算进程模块新建进程池
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
class SubsetSumFork extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
}
start() {
workers.acquire((err, worker) => {
// 通知子进程进行计算
worker.send({sum: this.sum, set: this.set});
const onMessage = msg => {
if (msg.event === 'end') {
worker.removeListener('message', onMessage);
workers.release(worker);
}
// 向外部发送 end 事件和结果
this.emit(msg.event, msg.data);
};
// 监听从子进程收到的信息
worker.on('message', onMessage);
});
}
}
module.exports = SubsetSumFork;
来实现 subsetSumWorker.js , 用于执行密集型计算
subsetWorker.js:
const SubsetSum = require('./subsetSum');
process.on('message', msg => {
// 直接调用原本的subsetSum即可,原文件无需改动!
const subsetSum = new SubsetSum(msg.sum, msg.set);
subsetSum.on('match', data => {
process.send({event: 'match', data: data});
});
subsetSum.on('end', data => {
process.send({event: 'end', data: data});
});
subsetSum.start();
});
这时,再去对服务器进行请求,将能马上得到响应。
显然,这种模式比交织模式更强大更灵活,但仍有上限。上限在于这单台机器的硬件。