@FunC
2018-01-03T00:31:25.000000Z
字数 16684
阅读 2384
Node.js
在上一章中,我们为了让系统变得可伸缩,将其进行了分割,并运行在多个机器上。为了让它们能够正确地工作,这些被分割的部分需要通过某种方式进行通信,因此,它们需要进行整合(intergrated)。
主要有两种功能方式来进行整合:
1. 使用共享的储存空间,以此作为中介并保存全部的信息
2. 使用消息(message)来在系统的不同节点之间传播数据,时间以及命令。
消息(message):任何用于在组件和系统之间交换信息的,分离且有结构的数据。
消息传递系统(messaging system):用于促进网络中信息交换的一类解决方案、模式和架构
消息与消息传递系统的四要素:
* 通信方向:单向 or 双向
* 消息的用途
* 发信息的时机: 立即发送 or 异步发送
* 信息的发送者: 直接发送 or 通过代理
单向:
典型例子:
* 邮件
* web 服务器通过 WebSocket 向浏览器发送信息
请求/回复模式:
看起来很简单,但是当通讯是异步的,或者涉及多个节点时,事情就变复杂了:
任意取两个节点来看,它都是单向的。然而全局来看,发起者发送了请求之后,收到了一个相关联的响应(尽管来自不同的节点)。
综上,请求/回复模式与单向模式之间的真正差异,在于请求与回复的关系。在请求/回复模式中,它们都由发起者持有。
消息(message)本质上是不同软件组件之间的通信途径。使用消息的原因有很多:
* 想获取由其他系统/组件持有的信息
* 想远程执行某些操作
* 想通知其他端(peers)当前正在发生某些事情
根据其目的的不同,主要可以分成三类:命令消息、时间消息、文档消息。
在先前设计模式一章中已有涉及,命令消息就是一个序列化的命令对象,目的是用于触发某些动作的执行(因此命令中需要包含执行该任务的全部信息)。
它能用于实现RPC系统,或者用于请求数据。RESTful HTTP 调用就是一个命令的简单例子。HTTP 动词代表着相应的操作:GET-获取资源;POST-创建资源;PUT-更新资源;DELETE-删除资源。
事件消息用于通知其他组件有些事情发生了。通常包含事件类型,有时还包含细节(如上下文、作用对象以及参与者)
在 web 开发中,我们使用长轮询或者WebSockets从服务端获取通知时,使用了事件信息。
在分布式系统中,事件是一种重要的整合机制,它让我们保证系统的各个节点保持状态一致(keep on the same page)。
文档信息主要用于组件/机器间传递数据。
文档与命令的不同之处在于,文档中的信息不涉及告诉接受者如何使用文档信息的内容。
而相比于事件信息,它不要求和某个事件相关。
通常命令消息的回复就是文档信息,例如返回请求的数据。
异步通信的与发短信类似:在发信息时不要求接受者在线;响应有可能马上返回,或者延迟返回,甚至没有返回;我们可以向多个接受者发送信息,然后以任意顺序收到响应。
简而言之——使用更少的资源获得更好的并行能力。
另一个重要优点是异步通讯的消息可以被储存起来,然后再分发(马上或者延后)。这一特性在接受者十分繁忙的时候很有用,我们可以使用消息队列(message queue)来实现:
如果接受者崩溃了或者掉线了,借助消息队列,消息能被累积起来,等到接受者可用时马上投递信息,增强了系统的健壮性。
这样一个消息队列可以在信息发送者里实现,也可以放在发送者与接受者之间,甚至可以用一个外部的系统作为中间件。
消息可以端对端直接发送,也可以通过一个叫信息代理(message broker)的中介系统发送。
代理的作用是将信息的接受者和发送者解耦:
在端对端的架构中,每个节点都需要知道接受者的地址和端口,同时它们还需要协商好通信协议与消息格式。
而代理则消除了这些复杂度。每个节点都可以和任意数量的节点通信,同时不需要知道细节。代理还可以作为不同通讯协议的桥梁。
除了解耦和互操作性,代理还能提供一些进阶特性,例如持久化队列、路由、消息转换、监控以及多种传递信息模式的支持。
当然也有一些理由不使用代理:
* 移除一个单点故障(Removing a single point of failure)
* 代理也需要可伸缩,而端对端则只要求单个节点可伸缩
* 减少消息交换的延迟
如果实现一个端对端的消息系统,我们能拥有更高的灵活度和更强的功能,因为不比捆绑在任何特定的技术、协议和架构。
发布订阅模式大概是最广为人知的单向消息传递模式。订阅者订阅一类信息,发布者产生信息并分发到所有相关的订阅者。下面是端对端和基于代理这两种模式下的发布/订阅模式:
发布/订阅模式特别之处,在于消息的发布者不需要事先知道任何接收者的信息,因此它可以和未知数量的订阅者协作。发布者和订阅着之间是松耦合的,因此被认为是整合分布式系统的理想模式。
而代理的加入进一步提高了解耦程度,并提高了可靠性(例如节点间的连接出了问题也不怕)
这里用到了 ws
库,它是 Node.js 端 WebSocket 的纯净实现。
app.js
const WebSocketServer = require('ws').Server;
// 静态文件服务器
const server = require('http').createServer(
require('ecstatic')({root:`${__dirname}/www`})
);
// 新建 WebSocket 服务器,挂载在现有的 HTTP 服务器上
const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
console.log('Client connected');
ws.on('message', msg => {
console.log(`Message: ${msg}`);
// 收到信息时广播给所有客户端
broadcast(msg);
})
});
function broadcast(msg) {
wss.clients.forEach(client => {
client.send(msg);
});
}
server.listen(process.argv[2] || 8080);
www/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title></title>
<script>
// 浏览器新建 WebSocket 连接
var ws = new WebSocket('ws://' + window.document.location.host);
ws.onmessage = function(message) {
var msgDiv = document.createElement('div');
msgDiv.innerHTML = message.data;
document.getElementById('messages').appendChild(msgDiv);
};
// 发送信息到服务器
function sendMessage() {
var message = document.getElementById('msgBox').value;
ws.send(message);
}
</script>
</head>
<body>
Message:
<div id="messages"></div>
<input id="msgBox" type="text" placeholder="Send a message">
<input type="button" onclick="sendMessage()" value='Send'>
</body>
</html>
我们通过在不同的端口运行我们的聊天应用,来模拟扩容的情况:
node app 8080
node app 8081
这时在浏览器上分别访问localhost:8080
和 localhost:8081
,并发送信息。会发现两个页面并不能互相通信(因为这是还是两台独立的服务器)
Redis 是一个非常快且灵活的键值对存储容器,更多地被用作数据库,不过它也提供了一些特性,使其可以实现中心化了发布/订阅模式。
加入Reids后我们聊天应用的架构将变成这样:
可见,每个服务器实例既是发布者,也是订阅者,它们之间通过redis来相互连接。
Reids 允许对一个频道进行订阅以及发布信息,频道以字符串作为标识区分,同时允许匹配多个频道,如
chat.*
原本的服务端代码修改后如下:
const WebSocketServer = require('ws').Server;
const redis = require('redis');
// 分别建立两个redis客户端
const redisSub = redis.createClient();
const redisPub = redis.createClient();
const server = require('http').createServer(
require('ecstatic')({root:`${__dirname}/www`})
);
const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
console.log('Client connected');
ws.on('message', msg => {
console.log(`Message: ${msg}`);
// redisPub 在 chat_message 频道上发布消息
redisPub.publish('chat_messages', msg);
})
});
// redisSub 订阅 chat_message 频道
redisSub.subscribe('chat_messages');
redisSub.on('message', (channel, msg) => {
wss.clients.forEach(client => {
client.send(msg);
});
});
server.listen(process.argv[2] || 8080);
因为此时只开了一个redis服务(默认端口6379),因此此处的redis是不同的服务器实例共享的。
另外之所以需要两个redis客户端来分别作发布者和订阅者,是因为在redis中,一旦实例订阅了一个频道,那么它就只能使用和订阅相关的命令了。
此时我们运行多个应用实例:
node app 8080
node app 8081
node app 8082
它们之间的信息借助redis实现了共享
ØMQ 是一个 networking 库,它偏底层,速度快,有着极简的 API,同时也有一些搭建消息系统的基础组件,还支持各种类型的传输(如 ipc, inproc, pgm, tcp等)。
我们把代理(broker)从架构中移除了,所以每个聊天应用都要和其他实例直接连接。在ØMQ中,有两种类型的套接字是为这种用途设计的:PUB
和 SUB
。典型的模式就是将 PUB
套接字绑定到一个端口上,监听来自其他 SUB
套接字的订阅。
从上图可以看出,这种架构中,每个节点都需要意识到其他节点的存在(与基于代理的模式不同)
基本就和上图的架构一样,手动创建 PUB/SUB 套接字。
然后做一个接线员,手动把每个 PUB
套接字和其他的 SUB
套接字接上:
app.js
const WebSocketServer = require('ws').Server;
// minimist 用于读取命令行的选项参数,形如 --http 8080
const args = require('minimist')(process.argv.slice(2));
// zmq 用于操作 ØMQ
const zmq = require('zmq');
//static file server
const server = require('http').createServer(
require('ecstatic')({root: `${__dirname}/www`})
);
// 创建 PUB 套接字,端口从命令行参数选项中读取
const pubSocket = zmq.socket('pub');
pubSocket.bind(`tcp://127.0.0.1:${args['pub']}`);
// 每个实例被多个实例订阅
const subSocket = zmq.socket('sub');
const subPorts = [].concat(args['sub']);
subPorts.forEach(p => {
console.log(`Subscribing to ${p}`);
subSocket.connect(`tcp://127.0.0.1:${p}`);
});
// 只订阅 chat 开头的信息
subSocket.subscribe('chat');
subSocket.on('message', msg => {
console.log(`From other server: ${msg}`);
// 将从订阅处收到的信息广播到自己实例下的其他客户
broadcast(msg.toString().split(' ')[1]);
});
const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
console.log('Client connected');
ws.on('message', msg => {
console.log(`Message: ${msg}`);
// 广播给自己实例下的其他客户
broadcast(msg);
// 发布给其他订阅的聊天服务实例
pubSocket.send(`chat ${msg}`);
});
});
function broadcast(msg) {
wss.clients.forEach(client => {
client.send(msg);
});
}
server.listen(args['http'] || 8080);
值得注意的是,最开始订阅其他的 PUB
套接字时,它们尚未绑定到对应端口,但ØMQ却不会报错,这是因为ØMQ有着自动重连机制,每隔一段时间就会尝试重新连接。同时如果 PUB
套接字没有被订阅的话,信息也会直接丢掉。
上面的例子中我们假定架构是静态不变的,实例的数量和地址也是已知的。对于动态的,复杂的架构,我们可以引入 service registry 。
消息系统中的一个重要概念,就是消息队列(MQ, message queue)。有了消息队列,发送者和接收者在通信时不再需要同时建立起联系,在接收者不在线时,队列会帮忙保存信息,直到接受者上线。
持久化订阅者(durable subscriber):一个总是能可靠地接收到所有信息的订阅者,即使在订阅前就已经发送出去的信息也一样能接受到。
MQTT 定义了服务质量( Quality of Service)的不同等级:
* QoS0,最多一次:就是传统的 set and forget,信息是不持久的,其发送成功与否是不知道的。意味着如果断开连接或者崩溃的话,信息将会丢失。
* QoS1,至少一次:信息保证至少被接受到一次,也可能被重复接受,因为接受者可能在还没来得及通知发送者就崩溃了,为了确保送达,信息将被重发一次。
* QoS2,准确的一次:这是最可靠的QoS,它确保信息接受且仅接受一次。但同时带来了一些代价,例如速度更慢,更多的数据密集机制用于确定信息的投递状态。
正如之前提到的,一个消息队列可以在订阅者离线时把消息积攒起来。这个队列可以储存在内存中,也可以持久化到磁盘中,这样即使代理重启或崩溃都能将信息恢复:
Redis 的发布/订阅命令实现了QoS0。我们还可以结合使用其他的命令,来使 Redis 实现持久化订阅者。
长久以来,可靠的信息持久化技术被大公司所垄断。幸运的是,多亏了一些开放协议的发展(如AMQP,STOMP和MQTT),消息系统进入主流。
AMQP是一个开放的标准协议,支持许多消息队列系统。它除了定义了一个平常的通信协议以外,还提供了用于描述路由、过滤器、队列、可靠性和安全性的模型。
在AMQP中,有三个基础组件:
* Queue:将提供给客户端消费的消息存储起来的数据结构。支持一个或多个队列,当多个消费者挂载到同一个队列时,信息将通过负载均衡的方式分发。队列有下面三种类型:
* Durable:队列在代理重启时将自动重新创建。一个持久化(durable)的队列并不确保其内容也是持久化的。
* Exclusive:队列仅和一个特定的订阅者连接,当连接关闭时,队列也被销毁。
* Auto-delete:队列在最后一个订阅者也断开连接的时候删除。
* Exchange:这里是生产消息的地方。它将消息导向一个或多个队列中,根据算法不同而不同:
* Direct exchange:完全匹配路由关键字时才将消息导向队列(如chat.msg
)
* Topic exchange:匹配符合某个关键字模式的消息(如 chat.#
匹配所有以chat
开头的关键字)
* Fanout exchange:忽略路由关键字,将消息广播到所有连接的队列上
* Binding:用于连接Exchange和queue,同时定义了路由关键字,或者其他过滤来自exchange的消息的规则
这些组件由一个代理(broker)来管理,代理提供了一系列相关的API。当客户端连接到代理时,它创建了一个频道(一个连接的抽象),这个频道负责维持于代理通信的状态。
下图展示了将这些组件组合起来的样子:
一个典型的场景,就是我们为了确保一个微服务架构中的状态保持一致,不能丢失任何信息。
我们将添加一个历史记录服务,通过数据库将聊天记录持久化,这样当客户端连接的时候,我们查询并取回相应的聊天记录。如下图所示:
这里我们只用到一个fanout exchange,不需要路由,同时为每个聊天服务实例提供一个队列。这些队列是 exclusive 的,一旦连接关闭队列就会销毁,信息的可靠性通过历史记录服务来提供。因此历史记录的 queue 要求是持久化🔒的。
这个模块包括两部分:
1. 一个 HTTP 服务器,用于将聊天记录暴露给客户端
2. 一个 AMQP 消费者(AMQP consumer),负责将聊天信息抓取并保存在本地数据库中
historySvc.js
const level = require('level');
// 用于产生精度大于 ms 的时间戳,确保唯一性
const timestamp = require('monotonic-timestamp');
// 用于处理 stream 的 JSON 对象
const JSONStream = require('JSONStream');
// 可通过 AMQP 协议连接 RabbitMQ
const amqp = require('amqplib');
// 数据库
const db = level('./msgHistory');
// 客户端请求时,将聊天记录序列化后返回
require('http').createServer((req, res) => {
res.writeHead(200);
db.createValueStream()
.pipe(JSONStream.stringify())
.pipe(res);
}).listen(8090);
let channel, queue;
amqp
.connect('amqp://localhost') // [1]
// 创建抽象的频道,类似于一个 session,用于维持通信状态
.then(conn => conn.createChannel())
.then(ch => {
channel = ch;
// 创建名为 chat 的 fanout exchange
return channel.assertExchange('chat', 'fanout'); // [2]
})
// 创建名为 chat_history 的 queue,默认是持久化的
.then(() => channel.assertQueue('chat_history')) // [3]
.then((q) => {
// q.queue 是 queue 的名字
queue = q.queue;
// 连接 queue 和 exchange
return channel.bindQueue(queue, 'chat'); // [4]
})
.then(() => {
// 监听来自 queue 的信息
return channel.consume(queue, msg => { // [5]
const content = msg.content.toString();
console.log(`Saving message: ${content}`);
// 存储进数据库
db.put(timestamp(), content, err => {
// 仅当存储成功后才确认(ack)这条信息
// 如果broker没收到ack,这条信息就会留在 queue 里,等待再次被处理
if (!err) channel.ack(msg);
});
});
})
.catch(err => console.log(err))
;
大部分和AMQP相关的设置于上面类似,下面只列出不同之处:
app.js
// ...
.then(() => {
// 使用 exclusive queue 即可,可靠性由历史记录服务提供
return channel.assertQueue(`chat_srv_${httpPort}`, {exclusive: true});
})
// ...
ws.on('message', msg => {
// 发布信息到 chat exchange,不带路由关键字,信息以 Buffer 的形式发送
channel.publish('chat', '', new Buffer(msg));
});
// ...
可以看到,我们通过微服务的方式,实现了即使其中一个组件缺失了(历史记录服务),系统也能继续运作。实时聊天能继续进行,只是无法查阅历史记录罢了。
在前面的章节中,我们学到了可以将耗时任务委派给多个本地的进程的完成,但它受限在单个机器的范围内。
本节中我们将在分布式架构中使用类似的模式,来使用在网络中任意位置的远程 workers 。
思路是有一个消息传递模式,允许我们将任务分发到多个机器上。其中任务既可以是单个完成的任务,也可以是将大任务分割,采取分而治之的方式完成:
显然在这种情况下,发布/订阅模式不再适用,因为我们并不想让一个任务被多个 worker 接收。我们需要一个类似于负载均衡的消息分发模式,通常被称为消费者竞争模式(competing consumers)。
与HTTP负载均衡的一个最大不同,在于消费者竞争模式中的消费者更加地活跃,它是主动连接任务生产者来获取任务的。这种模式对可伸缩架构有着很大好处,因为它允许我们随意增加 worker 的数量,同时不需要修改生产者或者适配 service registry
通常,我们只需要一个单向异步通讯,它让我们能够建立更加复杂的处理架构,同时减去同步双向通讯的负担。同时还能带来更低的延迟和更高的吞吐量:
上图中,消息被分发,经过一组workers(fanout),往前通过一系列的处理单元,最终聚集到一个节点(fanin),通常称为 sink。
PUSH和PULL套接字有一些特性,使其十分适合用于建造单向通讯系统:
* 都能在 connect 或 bind 模式下工作。也就是说我们可以将 PUSH 套接字绑定(bind)在本地端口,监听来自 PULL 套接字的连接(connect);反过来,PULL 套接字也能监听来自 PUSH 套接字的连接。消息的流动方向只有从 PUSH -> PULL,但连接的发起者的不同。其中 bind 模式适合持久化的节点,而 connect 模式适合存活时间短的节点。以上节中的task worker为例,task worker 就是短期存在的,应该使用 connect 模式主动连接 task producer 和 sink(这两者则以bind模式保持监听状态)
* 多个 PULL 连接一个 PUSH,消息将平均分发,类似于负载均衡;多个 PUSH 连接一个 PULL,消息将按顺序轮流消费
* 若果一个PUSH套接字在仍没有PULL连接时就发出消息,消息也不会丢失,而是储存在队列中,直到有PULL节点上线。
破解方法简单粗暴:直接遍历所有的字母组合,计算哈希值并与目标值比较,若一致则找到答案。
这类问题属于易并行计算问题(embarrassingly parallel),子问题之间各自独立,十分适合PUSH/PULL模式。
该应用由三部分组成:variations generator(ventilator), worker, result collector(sink)
显然,generator 和 sink 都是持久化的(durable),应该采用 bind 模式;而 worker 数量不定,视需求灵活增减,应该主动拿任务并把结果主动发送给sink,因此应该采用 connect 模式。
ventilator.js
const zmq = require('zmq');
const variationsStream = require('variations-stream');
const alphabet = 'abcdefghijklmnopqrstuvwxyz';
const batchSize = 10000;
const maxLength = process.argv[2];
const searchHash = process.argv[3];
const ventilator = zmq.socket('push');
ventilator.bindSync("tcp://*:5016");
let batch = [];
variationsStream(alphabet, maxLength)
.on('data', combination => {
batch.push(combination);
// 凑够一批打包发送
if (batch.length === batchSize) {
const msg = {searchHash: searchHash, variations: batch};
ventilator.send(JSON.stringify(msg));
batch = [];
}
})
.on('end', () => {
// 发送剩余的部分
const msg = {searchHash: searchHash, variations: batch};
ventilator.send(JSON.stringify(msg));
})
;
const zmq = require('zmq');
const crypto = require('crypto');
const fromVentilator = zmq.socket('pull');
const toSink = zmq.socket('push');
// 作为短暂存在的节点,主动连接两端
fromVentilator.connect('tcp://localhost:5016');
toSink.connect('tcp://localhost:5017');
fromVentilator.on('message', buffer => {
const msg = JSON.parse(buffer);
const variations = msg.variations;
variations.forEach( word => {
console.log(`Processing: ${word}`);
const shasum = crypto.createHash('sha1');
shasum.update(word);
const digest = shasum.digest('hex');
if (digest === msg.searchHash) {
console.log(`Found! => ${word}`);
// 解出答案后发送给 sink
toSink.send(`Found! ${digest} => ${word}`);
}
});
});
通过这种方式,可以按需启动多个 worker,天然可伸缩
const zmq = require('zmq');
const sink = zmq.socket('pull');
sink.bindSync("tcp://*:5017");
sink.on('message', buffer => {
console.log('Message from worker: ', buffer.toString());
});
在AMQP中,为了确保每条消息都只由一个消费者接收,我们需要绕过 exchange,直接将消息发送到目标队列。这种通讯模式称为点对点通讯(point-to-point)
因为我们需要并行计算,利用AMQP中队列的特性,自然而然地就能想到将多个消费者监听同一个队列(这样消息就会平均分发)。这被称为竞争性消费者模式(competing consumers pattern)
简单来说,就是在每段通讯的路上加上队列(没有 exchange 和 binding):
代码部分和先前AMQP的代码基本相同,差别主要在于不需要创建 exchange 和 bind,直接调用channel.sendToQueue()
API 即可
尽管单向通讯带来了高并行度和高效率,但它并不能解决所有的整合情况。有时候,一个合适的请求/回复模式能发挥不错的效果。
这种方式其实非常常见。从最常见的TCP,到前端的JSONP的实现,都用到了这种技术。思路是发送信息时,带上一个唯一的标识(如uuid)。相应的回复返回时,带上这个标识,发送者就能识别出这个回复是对应哪个的消息的。
这种模式的一个特点,就是回复的顺序可以可请求的顺序不同(见上图)。
在第九章中,我们已经接触过进程间通讯的API了。
对于主进程:
* child.send(message)
* child.on(‘message’, callback)
对于子进程:
* process.send(message)
* process.on(‘message’, callback)
只需要对这个现成的通讯频道加以包装,就能实现我们的需求。
const uuid = require('node-uuid');
module.exports = channel => {
const idToCallbackMap = {};
// 如果接到的回复中的id有记录,调用相应的callback
channel.on('message', message => {
const handler = idToCallbackMap[message.inReplyTo];
if(handler) {
handler(message.data);
}
});
// 返回一个抽象的请求体
return function sendRequest(req, callback) {
// 内部采用uuid,对外屏蔽
const correlationId = uuid.v4();
idToCallbackMap[correlationId] = callback;
channel.send({
type: 'request',
data: req,
id: correlationId
});
};
};
module.exports = channel =>
{ // 注册handler
return function registerHandler(handler) {
channel.on('message', message => {
if (message.type !== 'request') return;
// 要求handler将处理结果作为参数传递给第二个参数(callback)
handler(message.data, reply => {
channel.send({
type: 'response',
data: reply,
inReplyTo: message.id
});
});
});
};
};
const reply = require('./reply')(process);
reply((req, cb) => {
setTimeout(() => {
// 将结果传给cb
cb({sum: req.a + req.b});
}, req.delay);
});
const replier = require('child_process')
.fork(`${__dirname}/replier.js`);
const request = require('./request')(replier);
request({a: 1, b: 2, delay: 500}, res => {
console.log('1 + 2 = ', res.sum);
replier.disconnect();
});
request({a: 6, b: 1, delay: 100}, res => {
console.log('6 + 1 = ', res.sum);
});
如果在消息架构中有着不止一条频道或者队列,或者有着不止一个请求者,那么我们的架构就不足以应付了。在这种情况下,我们除了要有相关的id以外,还要知道返回的目标地址。
在AMQP中,返回地址就是请求者监听的回复队列的名字。因为回复队列是私有的,不在不同的消费者中共享,所以我们只需要一个短暂队列即可(transient queue)。
创建队列时,不用指明队列的名字(这样会得到一个随机名)。同时,队列是 exclusive 的,在连接断开后就销毁。
amqpRequest.js
const uuid = require('node-uuid');
const amqp = require('amqplib');
class AMQPRequest {
constructor() {
this.idToCallbackMap = {};
}
initialize() {
return amqp
.connect('amqp://localhost')
.then(conn => conn.createChannel())
.then(channel => {
this.channel = channel;
// 不指定名称,随机分配
return channel.assertQueue('', {exclusive: true});
})
.then(q => {
this.replyQueue = q.queue;
return this._listenForResponses();
})
.catch(function(err) {
console.log(err);
})
;
}
_listenForResponses() {
return this.channel.consume(this.replyQueue, msg => {
const correlationId = msg.properties.correlationId;
const handler = this.idToCallbackMap[correlationId];
if (handler) {
handler(JSON.parse(msg.content.toString()));
}
// exclusive,无需ack
}, {noAck: true});
}
// 因为需要明确地址,所以参数加上了 queue
request(queue, message, callback) {
const id = uuid.v4();
this.idToCallbackMap[id] = callback;
this.channel.sendToQueue(queue,
new Buffer(JSON.stringify(message)),
{correlationId: id, replyTo: this.replyQueue}
);
}
}
module.exports = () => new AMQPRequest();
amqpReply.js
const amqp = require('amqplib');
class AMQPReply {
constructor(qName) {
this.qName = qName;
}
initialize() {
return amqp
.connect('amqp://localhost')
.then(conn => conn.createChannel())
.then(channel => {
this.channel = channel;
return this.channel.assertQueue(this.qName);
})
.then(q => this.queue = q.queue)
.catch(err => console.log(err.stack))
;
}
handleRequest(handler) {
return this.channel.consume(this.queue, msg => {
const content = JSON.parse(msg.content.toString());
handler(content, reply => {
// 将信息回复到指定的queue
this.channel.sendToQueue(
msg.properties.replyTo,
new Buffer(JSON.stringify(reply)),
{correlationId: msg.properties.correlationId}
);
this.channel.ack(msg);
});
});
}
}
module.exports = (qName) => {
return new AMQPReply(qName);
};
和前面类似,稍加包装即可
replier.js
const Reply = require('./amqpReply');
// 指定 queue
const reply = Reply('requests_queue');
reply.initialize().then(() => {
// 与前面同样的方式发起请求,内部细节被隐藏
reply.handleRequest((req, cb) => {
console.log('Request received', req);
cb({sum: req.a + req.b});
});
});
requestor.js
const req = require('./amqpRequest')();
req.initialize().then(() => {
for (let i = 100; i > 0; i--) {
sendRandomRequest();
}
});
function sendRandomRequest() {
const a = Math.round(Math.random() * 100);
const b = Math.round(Math.random() * 100);
req.request('requests_queue', {a: a, b: b},
res => {
console.log(`${a} + ${b} = ${res.sum}`);
}
);
}
至此我们就实现了一个可靠的请求者和回复者,信息不回被丢失。同时,因为使用了AMQP的缘故,我们的回复者已经是开箱即用的可伸缩了。运行多个replier时,因为在监听同一个队列,代理将会通过负载均衡的形式分发信息。