[关闭]
@zhangnian88123 2016-06-14T10:54:56.000000Z 字数 4331 阅读 1894

ZGateWay设计方案

功能概述

顾名思义,ZGateway是一个网关进程,位于Client App和业务服务进程组之间,主要的职责有:
1. 隐藏后端业务服务进程组,整个系统对外只暴露一个IP和Port
2. 协议解析与转换
3. 消息收发
4. 过载保护
5. 流量控制
6. 安全校验
7. 运行时状态收集与检测

设计要点


详细设计

由于ZGW是整个网络拓扑结构中较稳定的部分,所以不管是用于发送消息的PUSH,还是用于接收消息的PULL,都应该调用bind!!

  1. #用python实现的demo,演示了Push线程和Pull线程的职责
  2. #encoding=utf-8
  3. import sys
  4. import time
  5. import logging
  6. import signal
  7. import threading
  8. import random
  9. import zmq
  10. logging.basicConfig(level=logging.DEBUG,
  11. format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
  12. datefmt='%a, %d %b %Y %H:%M:%S')
  13. incoming = '127.0.0.1:8888'
  14. outcoming = [('chat', '127.0.0.1:1111'), ('scene', '127.0.0.1:1112'), ('fight', '127.0.0.1:1113')]
  15. class PushThr(threading.Thread):
  16. def __init__(self, zmq_ctx):
  17. threading.Thread.__init__(self)
  18. self.msg_id = 0
  19. self.dict_socks = {}
  20. for svr_group in outcoming:
  21. svr_type = svr_group[0]
  22. svr_endpoint = svr_group[1]
  23. sock_push = zmq_ctx.socket(zmq.PUSH)
  24. sock_push.bind('tcp://%s' % svr_endpoint)
  25. self.dict_socks[svr_type] = sock_push
  26. def get_msg_from_queue(self):
  27. self.msg_id = self.msg_id + 1
  28. msg_types = ['chat', 'scene', 'fight']
  29. idx = random.randint(0, 2)
  30. msg_type = msg_types[idx]
  31. msg_body = 'zhangnian'
  32. return '%s|%d|%s' % (msg_type, self.msg_id, msg_body)
  33. def dispatch(self, msg):
  34. items = msg.split('|')
  35. assert 3 == len(items)
  36. msg_type = items[0]
  37. msg_id = items[1]
  38. msg_body = items[2]
  39. if msg_type not in self.dict_socks:
  40. logging.error(u'不支持的消息类型')
  41. return
  42. self.dict_socks[msg_type].send('%s|%s' % (msg_id, msg_body))
  43. return
  44. def run(self):
  45. print u'Push线程开始执行, 线程id: %d' % threading.current_thread().ident
  46. while True:
  47. msg = self.get_msg_from_queue()
  48. self.dispatch(msg)
  49. class PullThr(threading.Thread):
  50. def __init__(self, zmq_ctx):
  51. threading.Thread.__init__(self)
  52. self.socket_pull = zmq_ctx.socket(zmq.PULL)
  53. self.socket_pull.bind('tcp://%s' % incoming)
  54. self.poller = zmq.Poller()
  55. self.poller.register(self.socket_pull, zmq.POLLIN)
  56. def run(self):
  57. print u'Pull线程开始执行, 线程id: %d' % threading.current_thread().ident
  58. while True:
  59. active_socks = dict(self.poller.poll())
  60. if self.socket_pull in active_socks and active_socks[self.socket_pull] == zmq.POLLIN:
  61. msg = self.socket_pull.recv()
  62. print u'Pull收到消息: %s' % msg
  63. def main():
  64. zmq_ctx = zmq.Context()
  65. push_thr = PushThr(zmq_ctx)
  66. pull_thr = PullThr(zmq_ctx)
  67. push_thr.start()
  68. pull_thr.start()
  69. push_thr.join()
  70. pull_thr.join()
  71. if __name__ == '__main__':
  72. sys.exit(main())

协议设计

客户端 —> ZGW

请求消息:[len][type][msg]
len:4字节,表示消息体的长度
type:1字节,表示消息的类型
len和type组成消息5byte的msg header
msg:变长消息体,一般是用protobuf序列化后的字符串

回复消息:[len][type][msg]

ZGW —> 后端服务

请求消息: [id][msg]
id:4字节,表示客户端连接的唯一标识,ZGW需要根据这个id去映射fd(std::map)。
msg:变长消息体

回复消息:[id][msg]
id:4字节,表示客户端连接的唯一标识,也就是请求消息中的id
msg:变长消息体

注意,ZGW和后端服务之间的通讯协议没有len字段,是因为zeromq本身是以消息的方式传递的,不需要处理黏包分包

TLV解码器实现(采用muduo库)

  1. void onClientMessage(const muduo::net::TcpConnectionPtr& conn,
  2. muduo::net::Buffer* buf,
  3. muduo::Timestamp receiveTime)
  4. {
  5. while( buf->readableBytes() > kMsgHeaderLen )
  6. {
  7. uint32_t msg_len = buf->peekInt32();
  8. uint8_t msg_type = buf->peekInt8();
  9. if( msg_len > kMaxMsgSize )
  10. {
  11. LOG_ERROR << "消息体长度非法, msg_len: " << msg_len;
  12. conn->shutdown();
  13. break;
  14. }
  15. if( buf->readableBytes() >= kMsgHeaderLen + msg_len )
  16. {
  17. msg_len = buf->readInt32();
  18. msg_type = buf->readInt8();
  19. std::string msg_body(buf->peek(), msg_len);
  20. ZMSG msg(boost::any_cast<int>(conn->getContext()), msg_type, msg_body);
  21. assert( msg.isVaild() );
  22. messageCallback_(conn, msg, receiveTime);
  23. buf->retrieve(msg_len);
  24. LOG_INFO << "已处理一个完整的消息,缓冲区中剩余字节数为: " << buf->readableBytes();
  25. }
  26. else
  27. {
  28. LOG_INFO << "缓冲区中的字节数不足一个完整的消息";
  29. break;
  30. }
  31. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注