@bergus
2017-04-23T22:03:19.000000Z
字数 4303
阅读 1563
python
rpc
nsq
# -*- coding: utf-8 -*-
import logging
import ssl
import msgpack
import nsq
from tornado.ioloop import IOLoop
log = logging.getLogger(__name__)
import redis
class RPCServer(object):
    """NSQ-driven RPC worker that executes classes whose source lives in Redis.

    Requests are read from the ``mq_input`` topic. Each request names a
    package (a class stored as source text in a Redis hash), constructor
    arguments, a method and one method argument. The method's return value is
    published, together with the request id, to the topic named in the
    request.
    """

    def __init__(self):
        """Subscribe to ``mq_input`` and create the NSQ writer and Redis client."""
        # One cache for two kinds of entries:
        #   package name (str)               -> loaded class
        #   (package name, str(ctor params)) -> instance built from that class
        self.__code_obj = {}
        nsq.Reader(
            topic='mq_input',
            channel='mq',
            name="mq_input.mq",
            nsqd_tcp_addresses=['127.0.0.1:4150'],
            lookupd_http_addresses=['http://127.0.0.1:4161'],
            message_handler=self._handle_monitor,
            heartbeat_interval=10,
            tls_options={'cert_reqs': ssl.CERT_NONE},
            output_buffer_size=4096,
            output_buffer_timeout=100,
            max_tries=5,
            max_in_flight=9,
            lookupd_poll_interval=60,
            low_rdy_idle_timeout=10,
            max_backoff_duration=128,
            lookupd_poll_jitter=0.3,
            lookupd_connect_timeout=1,
            lookupd_request_timeout=2,
        )
        self.writer = nsq.Writer(['127.0.0.1:4150'])
        self.redis_client = redis.Redis(host='127.0.0.1', port=6379)

    def __package(self, name):
        """Return the class registered under *name*, loading it on first use.

        *name* is either ``"<pkg>#<module>.<Class>"``, looked up in the Redis
        hash ``__code__.<pkg>`` under the field ``<module>.<Class>``, or a
        bare field name looked up in the hash ``__code__``.

        Returns the loaded class, or an error-message string when loading
        failed. Failures are NOT cached, so a later call retries Redis once
        the source has been uploaded or corrected.
        """
        cached = self.__code_obj.get(name)
        if cached is not None:
            return cached
        if '#' in name:
            prefix, field = name.split("#")
            prefix = "." + prefix
        else:
            prefix = ""
            field = name
        source = self.redis_client.hget("__code__" + prefix, field)
        loaded = self.__e(field.split(".")[-1], source)
        # An error message is a plain string; only a real (callable) class may
        # enter the cache, otherwise the failure would stick until restart.
        if callable(loaded):
            self.__code_obj[name] = loaded
        return loaded

    def import_(self, name):
        """Public entry point for :meth:`__package`; also attached to every
        loaded class so remote code can load further packages."""
        return self.__package(name)

    def __e(self, name, source):
        """Execute *source* and return the object it binds to *name*.

        The object gets an ``import_`` attribute pointing at this server's
        :meth:`import_`. On any failure the error is logged and its message
        is returned as a string instead of the object.
        """
        if source is None:
            message = "no source stored for %r" % (name,)
            log.error(message)
            return message
        # SECURITY: this runs arbitrary code fetched from Redis with the full
        # privileges of the server process. Anyone able to write to the
        # ``__code__*`` hashes can execute code here.
        # The namespace starts as a copy of this module's globals so the
        # loaded code sees the same names as before, and definitions made by
        # the source (including the class itself) are visible to its methods.
        namespace = dict(globals())
        try:
            exec(source, namespace)
            loaded = namespace[name]
            setattr(loaded, "import_", self.import_)
            return loaded
        except Exception as exc:
            message = str(exc)
            log.error("loading %r failed: %s", name, message)
            return message

    def f_m(self, conn, data):
        """Publish callback: print the connection and nsqd's response."""
        print("%s %s" % (conn, data))

    def _handle_monitor(self, msg):
        """Handle one request message from ``mq_input``.

        The body is a msgpack-encoded 6-tuple ``(reply_topic, request_id,
        package, package_params, method, method_params)``. Returns ``True``
        once the reply has been handed to the writer. Any exception raised
        here propagates to the NSQ reader.
        """
        request = msgpack.unpackb(msg.body, use_list=False)
        print(request)
        reply_topic, request_id, package, package_params, method, method_params = request
        # Instances are cached per (package, constructor params) pair.
        cache_key = (package, str(package_params))
        instance = self.__code_obj.get(cache_key)
        # ``is None`` rather than truthiness: a falsy instance (for example
        # one defining __len__) must not be rebuilt on every request.
        if instance is None:
            factory = self.import_(package)
            print(factory)
            if not callable(factory):
                raise ImportError(
                    "cannot load package %r: %s" % (package, factory))
            instance = factory(package_params)
            self.__code_obj[cache_key] = instance
        result = getattr(instance, method)(method_params)
        print(result)
        self.writer.pub(
            reply_topic,
            msgpack.packb((request_id, result)),
            callback=self.f_m,
        )
        return True
if __name__ == '__main__':
    # Constructing the server registers its NSQ reader and writer; the
    # Tornado loop then runs until the process is stopped.
    RPCServer()
    IOLoop.instance().start()
# -*- coding: utf-8 -*-
import inspect
import logging
import sys
import uuid
from os import listdir
from os.path import splitext
import msgpack
import nsq
import redis
from tornado import gen
from tornado import ioloop
from tornado.concurrent import Future
from tornado.ioloop import IOLoop
log = logging.getLogger(__name__)
class _Package(object):
def __init__(self, req, _name, class_path, class_args):
self.__req = req
self.name = _name
self.class_path = class_path
self.class_args = class_args
self.writer = nsq.Writer(['127.0.0.1:4150'])
def f_m(self, conn, data):
# print conn, data
pass
def call(self, _method, _m_args):
_id = str(uuid.uuid4())
self.__req[_id] = Future()
self.writer.pub('mq_input', msgpack.packb((
self.name,
_id,
self.class_path,
self.class_args,
_method,
_m_args
)), callback=self.f_m
)
return self.__req[_id]
class RPCClient(object):
    """RPC client: uploads local class source to Redis and resolves replies.

    On construction it subscribes to the reply topic and copies the source of
    every class found in the local ``test`` package into the Redis hash
    ``__code__.<package dir name>``.
    """

    # Pending calls keyed by request id. This is a class attribute, so every
    # RPCClient in the process, and every _Package created through
    # package(), shares the same table.
    __req = {}
    name = 'mq_output'
    code_path = 'test'

    def __init__(self):
        """Subscribe to the reply topic and upload the local code to Redis."""
        nsq.Reader(
            topic=self.name,
            channel='mq',
            name="mq_output.mq",
            # nsqd_tcp_addresses is not set; only lookupd addresses are given.
            lookupd_http_addresses=['http://127.0.0.1:4161'],
            message_handler=self._handle_monitor,
            heartbeat_interval=10,
            output_buffer_size=4096,
            output_buffer_timeout=100,
            max_tries=5,
            max_in_flight=9,
            lookupd_poll_interval=60,
            low_rdy_idle_timeout=10,
            max_backoff_duration=128,
            lookupd_poll_jitter=0.3,
            lookupd_connect_timeout=1,
            lookupd_request_timeout=2,
        )
        import test
        self.code_path = test.__path__[0]
        print(self.code_path)
        self.redis_client = redis.Redis(host='127.0.0.1', port=6379)
        # The last path component names the Redis hash for this package.
        _p = self.code_path.split("/").pop()
        print(_p)
        for name, code in self.upload_code():
            self.redis_client.hset("__code__." + _p, name, code)

    def _handle_monitor(self, msg):
        """Handle one reply: a msgpack-encoded ``(request_id, result)`` pair.

        Always returns ``True`` after the body has been decoded, whether or
        not a pending call matched the request id.
        """
        _id, _res = msgpack.unpackb(msg.body, use_list=False)
        print("%s %s" % (_id, _res))
        self._resolve(_id, _res)
        return True

    def _resolve(self, _id, _res):
        """Complete and forget the pending Future for *_id*.

        Returns ``True`` when a pending Future received *_res*, ``False``
        when the id is unknown or its Future was already completed. The entry
        is removed either way so the table does not grow without bound.
        """
        pending = self.__req.pop(_id, None)
        if pending is None:
            log.warning("no pending call for reply %r", _id)
            return False
        if pending.done():
            return False
        pending.set_result(_res)
        return True

    def upload_code(self):
        """Yield ``("<module>.<Class>", source)`` for classes under code_path.

        Every ``.py`` file in ``self.code_path`` whose name neither starts
        nor ends with a double underscore is imported, and each member whose
        metaclass is exactly ``type`` is reported. The source text is
        prefixed with a UTF-8 coding line. Classes whose source cannot be
        retrieved are skipped with a warning.
        """
        code_path = self.code_path
        # Importing by bare module name requires the directory on sys.path.
        if code_path not in sys.path:
            sys.path.append(code_path)
        for filename in listdir(code_path):
            name, f_ext = splitext(filename)
            if name.startswith('__') or name.endswith("__") or f_ext != '.py':
                continue
            module = __import__(name)
            for attr_name, member in inspect.getmembers(module):
                if member.__class__.__name__ != 'type':
                    continue
                try:
                    source = inspect.getsource(member)
                except (IOError, OSError, TypeError) as exc:
                    # Raised for classes without a readable source file,
                    # such as types implemented in C.
                    log.warning("skipping %s.%s: %s", name, attr_name, exc)
                    continue
                yield (
                    "{}.{}".format(name, attr_name),
                    "# -*- coding: utf-8 -*-\n\n\n{}".format(source),
                )

    def package(self, class_path, args=None):
        """Return a _Package handle for *class_path* built with *args*."""
        return _Package(self.__req, self.name, class_path, args)
@gen.coroutine
def _call():
    """Call ``__call__`` with ``"jj"`` on the global package ``p`` ten times,
    one after another, and return the list of replies."""
    replies = []
    # range() instead of the Python 2-only xrange(); ten items either way.
    for _ in range(10):
        reply = yield p.call('__call__', "jj")
        replies.append(reply)
    # gen.Return is how a value leaves a generator-based coroutine on Python 2.
    raise gen.Return(replies)
@gen.coroutine
def _call1():
    """Print the batch returned by ``_call``, then issue ten sequential
    ``get`` calls on the global package ``s`` and print each reply."""
    batch = yield _call()
    print(batch)
    # range() instead of the Python 2-only xrange(); ten items either way.
    for _ in range(10):
        reply = yield s.call("get", 'http://www.baidu.com')
        print(reply)
if __name__ == '__main__':
    c = RPCClient()
    # p and s must stay module-level names: _call and _call1 read them as
    # globals.
    p = c.package("test#hello.Hello", ("ssss", dict(www=2)))
    s = c.package("test#spider.Spider")
    # Schedule _call1 with a callback time of 1000, then run the loop.
    ioloop.PeriodicCallback(_call1, 1000).start()
    IOLoop.instance().start()