@zhangnian88123
2016-06-14T10:54:56.000000Z
字数 4331
阅读 1894
顾名思义,ZGateway是一个网关进程,位于Client App和业务服务进程组之间,主要的职责有:
1. 隐藏后端业务服务进程组,整个系统对外只暴露一个IP和Port
2. 协议解析与转换
3. 消息收发
4. 过载保护
5. 流量控制
6. 安全校验
7. 运行时状态收集与检测
单进程多线程(One EventLoop Per Thread + WorkThread Pool模型)
根据线程的作用,分为三类线程:
I/O线程只负责数据收发,不能做复杂的CPU密集型任务,否则会阻塞整个事件循环,导致其他conn fd上的读写事件得不到及时处理。
客户端与ZGW之间保持TCP长连接(N:1),ZGW与后端服务之间使用zeromq的PUSH-PULL模式的连接。
Master线程
- 监听TCP端口,关注listen fd上的可读事件
- 可读事件发生时accept客户端的连接请求,将conn fd分派给一个I/O线程
- 给每个conn fd分配一个唯一的id,内部维护一个由id到fd的映射表:std::map
- 连接数控制,比如用id数去控制( 1 <= id <= 10000),id池在系统启动时创建,每创建一个连接就分配一个id,id分配完后就不再接收连接了
- 解析配置文件
- 获取系统运行时状态(比如消息队列中堆积的消息数、发送接收的消息数)
I/O线程
- 监听conn fd上的可读和可写事件
- 可读事件发生时,读取数据,根据协议处理黏包分包,得到一个完整的消息
- 将消息放入进程内的消息队列中,并通知PUSH线程收取消息(通知方式使用event fd)
Worker线程(PUSH线程:通过push socket向后端进程发消息)
- 创建多个push socket, bind多个port,不同类型的后端服务组对应一个port,比如,port:8081用于后端场景进程组的连接,port:8082用于后端战斗进程组的连接
- 等待消息队列的可读事件(通过event fd或信号量的机制)
- 可读事件发生后,从消息队列中取出一条消息,根据消息类型,使用对应的push socket将消息发送给后端进程组(ZeroMQ PUSH模式自带负载均衡)
当后端进程组很多时,PUSH线程需要管理很多的push socket,为了在O(1)时间内找到消息类型对应的push socket,可使用数组保存push sockets,用arr_socks[type]查找
Worker线程(PULL线程:通过PULL从后端服务接收回复消息)
- 创建一个pull socket,bind到一个port,从zeromq的pull socket收取消息
- 解析消息,根据消息中的id,找到对应的TcpConnection对象
- 利用TcpConnection对象发送消息给对应的客户端
由于ZGW是整个网络拓扑结构中较稳定的部分,所以不管是用于发送消息的PUSH,还是用于接收消息的PULL,都应该调用bind!!
#用python实现的demo,演示了Push线程和Pull线程的职责
#encoding=utf-8
import sys
import time
import logging
import signal
import threading
import random
import zmq
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S')
incoming = '127.0.0.1:8888'
outcoming = [('chat', '127.0.0.1:1111'), ('scene', '127.0.0.1:1112'), ('fight', '127.0.0.1:1113')]
class PushThr(threading.Thread):
def __init__(self, zmq_ctx):
threading.Thread.__init__(self)
self.msg_id = 0
self.dict_socks = {}
for svr_group in outcoming:
svr_type = svr_group[0]
svr_endpoint = svr_group[1]
sock_push = zmq_ctx.socket(zmq.PUSH)
sock_push.bind('tcp://%s' % svr_endpoint)
self.dict_socks[svr_type] = sock_push
def get_msg_from_queue(self):
self.msg_id = self.msg_id + 1
msg_types = ['chat', 'scene', 'fight']
idx = random.randint(0, 2)
msg_type = msg_types[idx]
msg_body = 'zhangnian'
return '%s|%d|%s' % (msg_type, self.msg_id, msg_body)
def dispatch(self, msg):
items = msg.split('|')
assert 3 == len(items)
msg_type = items[0]
msg_id = items[1]
msg_body = items[2]
if msg_type not in self.dict_socks:
logging.error(u'不支持的消息类型')
return
self.dict_socks[msg_type].send('%s|%s' % (msg_id, msg_body))
return
def run(self):
print u'Push线程开始执行, 线程id: %d' % threading.current_thread().ident
while True:
msg = self.get_msg_from_queue()
self.dispatch(msg)
class PullThr(threading.Thread):
def __init__(self, zmq_ctx):
threading.Thread.__init__(self)
self.socket_pull = zmq_ctx.socket(zmq.PULL)
self.socket_pull.bind('tcp://%s' % incoming)
self.poller = zmq.Poller()
self.poller.register(self.socket_pull, zmq.POLLIN)
def run(self):
print u'Pull线程开始执行, 线程id: %d' % threading.current_thread().ident
while True:
active_socks = dict(self.poller.poll())
if self.socket_pull in active_socks and active_socks[self.socket_pull] == zmq.POLLIN:
msg = self.socket_pull.recv()
print u'Pull收到消息: %s' % msg
def main():
zmq_ctx = zmq.Context()
push_thr = PushThr(zmq_ctx)
pull_thr = PullThr(zmq_ctx)
push_thr.start()
pull_thr.start()
push_thr.join()
pull_thr.join()
if __name__ == '__main__':
sys.exit(main())
客户端 —> ZGW
请求消息:[len][type][msg]
len:4字节,表示消息体的长度
type:1字节,表示消息的类型
len和type组成消息5byte的msg header
msg:变长消息体,一般是用protobuf序列化后的字符串回复消息:[len][type][msg]
ZGW —> 后端服务
请求消息: [id][msg]
id:4字节,表示客户端连接的唯一标识,ZGW需要根据这个id去映射fd(std::map)。
msg:变长消息体回复消息:[id][msg]
id:4字节,表示客户端连接的唯一标识,也就是请求消息中的id
msg:变长消息体
注意,ZGW和后端服务之间的通讯协议没有len字段,是因为zeromq本身是以消息的方式传递的,不需要处理黏包分包
void onClientMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp receiveTime)
{
while( buf->readableBytes() > kMsgHeaderLen )
{
uint32_t msg_len = buf->peekInt32();
uint8_t msg_type = buf->peekInt8();
if( msg_len > kMaxMsgSize )
{
LOG_ERROR << "消息体长度非法, msg_len: " << msg_len;
conn->shutdown();
break;
}
if( buf->readableBytes() >= kMsgHeaderLen + msg_len )
{
msg_len = buf->readInt32();
msg_type = buf->readInt8();
std::string msg_body(buf->peek(), msg_len);
ZMSG msg(boost::any_cast<int>(conn->getContext()), msg_type, msg_body);
assert( msg.isVaild() );
messageCallback_(conn, msg, receiveTime);
buf->retrieve(msg_len);
LOG_INFO << "已处理一个完整的消息,缓冲区中剩余字节数为: " << buf->readableBytes();
}
else
{
LOG_INFO << "缓冲区中的字节数不足一个完整的消息";
break;
}
}