@bergus
2017-04-23T14:03:19.000000Z
字数 4303
阅读 1713
python rpc nsq
# -*- coding: utf-8 -*-
"""NSQ-backed RPC server.

Reads call requests from the ``mq_input`` topic, loads the requested class
from source text stored in Redis, calls the requested method on an instance
of it and publishes ``(call_id, result)`` to the topic named in the request.
"""
import logging
import ssl

import msgpack
import nsq
import redis
from tornado.ioloop import IOLoop

log = logging.getLogger(__name__)


class RPCServer(object):
    """Execute remote calls against classes whose source lives in Redis."""

    def __init__(self):
        # One dict serves two purposes: loaded classes are stored under the
        # package name (a str) and live instances under
        # ``(package, str(constructor_args))``.
        self.__code_obj = {}
        nsq.Reader(
            topic='mq_input',
            channel='mq',
            name="mq_input.mq",
            nsqd_tcp_addresses=['127.0.0.1:4150'],
            lookupd_http_addresses=['http://127.0.0.1:4161'],
            message_handler=self._handle_monitor,
            heartbeat_interval=10,
            tls_options={'cert_reqs': ssl.CERT_NONE},
            output_buffer_size=4096,
            output_buffer_timeout=100,
            max_tries=5,
            max_in_flight=9,
            lookupd_poll_interval=60,
            low_rdy_idle_timeout=10,
            max_backoff_duration=128,
            lookupd_poll_jitter=0.3,
            lookupd_connect_timeout=1,
            lookupd_request_timeout=2,
        )
        self.writer = nsq.Writer(['127.0.0.1:4150'])
        self.redis_client = redis.Redis(host='127.0.0.1', port=6379)

    def __package(self, name):
        """Return the class registered under *name*, loading it on first use.

        *name* is either ``"<pkg>#<module>.<Class>"`` (source read from the
        Redis hash ``__code__.<pkg>``, field ``<module>.<Class>``) or a bare
        field name (source read from the hash ``__code__``).

        Returns the loaded class, or the error message string produced by
        ``__e`` when loading failed.
        """
        cached = self.__code_obj.get(name)
        if cached:
            return cached
        if '#' in name:
            prefix, field = name.split("#")
            prefix = "." + prefix
        else:
            prefix = ""
            field = name
        source = self.redis_client.hget("__code__" + prefix, field)
        loaded = self.__e(field.split(".")[-1], source)
        # Cache successful loads only. A failure comes back as a message
        # string; caching it would keep serving the error for this name and
        # never look at Redis again, even after the code is uploaded.
        if callable(loaded):
            self.__code_obj[name] = loaded
        return loaded

    def import_(self, name):
        """Public loader; also attached to every loaded class as ``import_``."""
        return self.__package(name)

    def __e(self, name, source):
        """Execute *source* and return the object it binds to *name*.

        On success the object gets an ``import_`` attribute bound to this
        server's loader. On any failure the error is logged and its message
        string is returned instead of raising.
        """
        try:
            if source is None:
                raise LookupError("no source stored for %r" % (name,))
            # SECURITY: this runs arbitrary code fetched from Redis. Anyone
            # able to write to the ``__code__*`` hashes can execute code in
            # this process; keep that Redis instance trusted.
            scope = {}
            exec(source, globals(), scope)
            loaded = eval(name, globals(), scope)
            setattr(loaded, "import_", self.import_)
            return loaded
        except Exception as e:
            message = str(e)
            log.error(message)
            return message

    def f_m(self, conn, data):
        """Publish callback for the writer: print what was reported."""
        print("%s %s" % (conn, data))

    def _handle_monitor(self, msg):
        """Handle one request message and publish its result.

        The body is a msgpack-encoded 6-tuple:
        ``(reply_topic, call_id, package, ctor_args, method, method_args)``.

        Raises:
            TypeError: when the requested package could not be loaded.
        """
        request = msgpack.unpackb(msg.body, use_list=False)
        print(request)
        reply_topic, call_id, package, ctor_args, method, method_args = request
        key = (package, str(ctor_args))
        instance = self.__code_obj.get(key)
        # Compare against None: a truthiness test would rebuild any instance
        # that happens to evaluate as false on every single call.
        if instance is None:
            loaded = self.import_(package)
            print(loaded)
            if not callable(loaded):
                # ``loaded`` is the loader's error message in this case.
                raise TypeError("cannot load %r: %s" % (package, loaded))
            instance = loaded(ctor_args)
            self.__code_obj[key] = instance
        result = getattr(instance, method)(method_args)
        print(result)
        self.writer.pub(reply_topic, msgpack.packb((call_id, result)),
                        callback=self.f_m)
        # NOTE(review): True presumably tells the nsq reader the message is
        # done - confirm against the pynsq handler contract.
        return True


if __name__ == '__main__':
    RPCServer()
    loop = IOLoop.instance()
    loop.start()
# -*- coding: utf-8 -*-
"""NSQ-backed RPC client.

Uploads the source of every class found in the local ``test`` package to
Redis, publishes call requests to the ``mq_input`` topic and resolves a
Future per call when the matching reply arrives on ``mq_output``.
"""
import inspect
import logging
import sys
import uuid
from os import listdir
from os.path import basename
from os.path import splitext

import msgpack
import nsq
import redis
from tornado import gen
from tornado import ioloop
from tornado.concurrent import Future
from tornado.ioloop import IOLoop

log = logging.getLogger(__name__)


class _Package(object):
    """Handle for one remote class plus the arguments used to construct it."""

    def __init__(self, req, _name, class_path, class_args):
        # ``req`` is the pending-call map owned by the client: call id -> Future.
        self.__req = req
        self.name = _name
        self.class_path = class_path
        self.class_args = class_args
        self.writer = nsq.Writer(['127.0.0.1:4150'])

    def f_m(self, conn, data):
        """Publish callback for the writer; deliberately does nothing."""
        pass

    def call(self, _method, _m_args):
        """Publish a call request and return the Future for its reply.

        The request is the msgpack-encoded tuple
        ``(reply_topic, call_id, class_path, class_args, method, method_args)``.
        """
        _id = str(uuid.uuid4())
        future = Future()
        # Register before publishing so the reply handler can find it.
        self.__req[_id] = future
        request = (self.name, _id, self.class_path, self.class_args,
                   _method, _m_args)
        self.writer.pub('mq_input', msgpack.packb(request),
                        callback=self.f_m)
        return future


class RPCClient(object):
    """Upload local class sources and dispatch replies to waiting Futures."""

    # Pending calls: call id -> Future. Defined on the class, so it is shared
    # by every RPCClient instance in the process.
    __req = {}
    # Topic the replies are read from; sent along as each request's reply topic.
    name = 'mq_output'
    code_path = 'test'

    def __init__(self):
        nsq.Reader(
            topic=self.name,
            channel='mq',
            name="mq_output.mq",
            # nsqd_tcp_addresses=['127.0.0.1:4150'],
            lookupd_http_addresses=['http://127.0.0.1:4161'],
            message_handler=self._handle_monitor,
            heartbeat_interval=10,
            output_buffer_size=4096,
            output_buffer_timeout=100,
            max_tries=5,
            max_in_flight=9,
            lookupd_poll_interval=60,
            low_rdy_idle_timeout=10,
            max_backoff_duration=128,
            lookupd_poll_jitter=0.3,
            lookupd_connect_timeout=1,
            lookupd_request_timeout=2,
        )
        # Imported here only to find the package's directory on disk.
        import test
        self.code_path = test.__path__[0]
        print(self.code_path)
        self.redis_client = redis.Redis(host='127.0.0.1', port=6379)
        # The directory name becomes the Redis hash suffix; basename keeps
        # this correct with the platform's own path separator.
        _p = basename(self.code_path)
        print(_p)
        for name, code in self.upload_code():
            self.redis_client.hset("__code__." + _p, name, code)

    def _handle_monitor(self, msg):
        """Resolve the Future waiting for the reply carried by *msg*.

        The body is a msgpack-encoded ``(call_id, result)`` pair. The entry
        is removed from the pending map so the map does not grow forever.
        """
        _id, _res = msgpack.unpackb(msg.body, use_list=False)
        print("%s %s" % (_id, _res))
        future = self.__req.pop(_id, None)
        if future is None:
            # Unknown id, or a reply that was already delivered once.
            log.warning("no pending call for reply %r", _id)
        elif not future.done():
            future.set_result(_res)
        return True

    def upload_code(self):
        """Yield ``("<module>.<Class>", source)`` for classes in ``code_path``.

        Every ``.py`` file whose stem neither starts nor ends with ``__`` is
        imported, and each member whose type is named ``type`` is yielded
        with a coding header prepended to its source.
        """
        code_path = self.code_path
        if code_path not in sys.path:
            sys.path.append(code_path)
        for f in listdir(code_path):
            name, f_ext = splitext(f)
            if name.startswith('__') or name.endswith("__") or f_ext != '.py':
                continue
            module = __import__(name)
            for k, v in inspect.getmembers(module):
                if v.__class__.__name__ != 'type':
                    continue
                try:
                    source = inspect.getsource(v)
                except (TypeError, IOError, OSError):
                    # Members imported from elsewhere may have no retrievable
                    # source (e.g. built-in types); skip them instead of
                    # aborting the whole upload.
                    log.warning("no source available for %s.%s", name, k)
                    continue
                yield ("{}.{}".format(name, k),
                       "# -*- coding: utf-8 -*-\n\n\n{}".format(source))

    def package(self, class_path, args=None):
        """Return a handle for the remote class at *class_path*."""
        return _Package(self.__req, self.name, class_path, args)


@gen.coroutine
def _call():
    """Demo: call ``__call__`` on ``p`` ten times and collect the replies."""
    _a = []
    for _ in range(10):
        a = yield p.call('__call__', "jj")
        _a.append(a)
    raise gen.Return(_a)


@gen.coroutine
def _call1():
    """Demo: run ``_call`` and then ten ``get`` calls on ``s``, printing each."""
    _a = yield _call()
    print(_a)
    for _ in range(10):
        b = yield s.call("get", 'http://www.baidu.com')
        print(b)


if __name__ == '__main__':
    c = RPCClient()
    p = c.package("test#hello.Hello", ("ssss", dict(www=2)))
    s = c.package("test#spider.Spider")
    ioloop.PeriodicCallback(_call1, 1000).start()
    loop = IOLoop.instance()
    loop.start()
