@richey
2021-09-30T03:14:46.000000Z
字数 24113
阅读 5498
物联网创新设计实践 讲义




如图,证明python3.7已经安装完成。
- 下载并安装pycharm 社区版
https://www.jetbrains.com/pycharm/download/#section=windows
- 最简单的python程序
# coding=utf8
# Minimal Python 3 program: print() is a function in Python 3, so the
# Python 2 statement form `print "..."` would be a SyntaxError here.
print("hello world!\r\n")
def fun1():
    """Print a short greeting to standard output."""
    print("你好!\r\n")


# Call the function once so that running the script shows its output.
fun1()
- list列表
def fun1():
    """Print a short greeting to standard output."""
    print("你好!\r\n")


fun1()

# A list is an ordered, mutable sequence written with square brackets.
l = [1, 2, 5, 6, 8]
print(l)
# Indexing is zero-based, so l[3] is the fourth element.
print(l[3])
for item in l:
    print(item)
- 元组
Python的元组与列表类似,不同之处在于元组的元素不能修改。元组使用小括号,列表使用方括号。
# Tuples are written with parentheses; the parentheses are optional when the
# comma-separated values are unambiguous (see tup3).
tup1 = ('physics', 'chemistry', 1997, 2000)
tup2 = (1, 2, 3, 4, 5)
tup3 = "a", "b", "c", "d"
print(tup1)
print(tup2)
print(tup3)
- 字典dict
# A dict maps keys to values; items() yields (key, value) pairs, which avoids
# a second lookup per key.
bdict = {'name': 'allen', 'age': '40'}
for key, value in bdict.items():
    print(key)
    print(value)
参考:http://blog.csdn.net/hanhuili/article/details/9389433
Twisted是用Python实现的基于事件驱动的网络引擎框架。Twisted支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。Twisted对于其支持的所有协议都带有客户端和服务器实现,同时附带有基于命令行的工具,使得配置和部署产品级的Twisted应用变得非常方便。

Reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。Reactor可以感知网络、文件系统以及定时器事件。它等待然后处理这些事件,从特定于平台的行为中抽象出来,并提供统一的接口,使得在网络协议栈的任何位置对事件做出响应都变得简单。
关于reactor有下面几个结论:
Twisted的reactor只有通过调用reactor.run()来启动。
reactor循环是在其开始的线程中运行,也就是运行在主线程中。
一旦启动,就会一直运行下去,此后程序(更确切地说是启动它的那个线程)就处于reactor的控制之下。没有事件需要处理时,reactor循环并不会消耗CPU资源。并不需要显式地创建reactor,只需要引入就OK了。
Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransport接口,它包含如下的方法:
write 以非阻塞的方式按顺序依次将数据写到物理连接上
writeSequence 将一个字符串列表写到物理连接上
loseConnection 将所有挂起的数据写入,然后关闭连接
getPeer 取得连接中对端的地址信息
getHost 取得连接中本端的地址信息
将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地把数据写入一个字符串来模拟传输,然后检查这个字符串的内容即可。
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:
makeConnection 在transport对象和服务器之间建立一条连接
connectionMade 连接建立起来后调用
dataReceived 接收数据时调用
connectionLost 关闭连接时调用
确切地说,Factory这个名字取得不太好,应该叫做FactoryOfProtocols,即协议工厂(也就是工厂模式),用来管理协议对象实例。
# coding=utf8
import sys
import re

if sys.platform == 'win32':
    # On Windows the serial port support needs the win32 event reactor, and it
    # must be installed before twisted.internet.reactor is imported.
    from twisted.internet import win32eventreactor
    win32eventreactor.install()

from twisted.internet import defer, threads
from twisted.internet.serialport import SerialPort
from twisted.internet import protocol, task, reactor, error
from twisted.protocols.basic import LineReceiver
from twisted.python import log, usage

# Reply frame: '@' + 2-digit address + 1 non-digit type character + 4 hex
# digits + 2 digits. The payload is restricted to hex digits so that
# int(..., 16) below cannot raise on a malformed frame.
FRAME_RE = re.compile(r'@\d{2}\D[0-9A-Fa-f]{4}\d{2}')


class GasSim(LineReceiver):
    """Serial-port protocol that polls a device and prints its voltage.

    Every 3 seconds the command line ``@01c100001`` is sent; each received
    line is printed, and lines of type ``D`` have their 4 hex payload digits
    decoded and printed as a voltage.
    """

    def __init__(self):
        # Periodic poll; now=False delays the first send by one interval.
        self._lc1 = task.LoopingCall(self.sendCmd)
        self._lc1.start(3, now=False)

    def sendCmd(self):
        """Send the fixed query command to the device."""
        self.sendLine("@01c100001".encode('utf-8'))

    def lineReceived(self, line):
        """Print a received line and, for a 'D' frame, the decoded voltage.

        :param line: one line received from the serial port, as bytes.
        """
        line = line.decode('utf-8')
        print(line)
        if FRAME_RE.match(line) and line[3] == 'D':
            print(u'接收到1条命令!\r\n')
            # Characters 4..7 are a hex number; dividing by 1000 gives the
            # value printed as the voltage (presumably millivolts -> volts).
            tmpstr = "当前电压值=%.3f" % (int(line[4:8], 16) / 1000.0)
            print(tmpstr)


if __name__ == '__main__':
    s = SerialPort(GasSim(), 'COM5', reactor, baudrate=9600)
    reactor.run()
本小节已经合并到下一小节
安装redis,直接解压运行即可。


我们将json格式的字符串作为value存入redis
# coding: utf-8
import re
import datetime
import time
import json
import redis
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver

# NOTE: redis calls below are synchronous and run in the reactor thread.
r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Registration line: '$' + 2-digit device ID + '$'.
REGISTER_RE = re.compile(r'\$\d{2}\$')
# Data line: '@' + 2-digit address + type character + 4 hex digits + 2 digits.
# The payload is limited to hex digits so int(..., 16) cannot raise.
DATA_RE = re.compile(r'@\d{2}\D[0-9A-Fa-f]{4}\d{2}')


def getCurrentStrTime():
    """Return the local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


class GasServerProtocol(LineReceiver):
    """One TCP connection from a device.

    The device registers with a ``$NN$`` line and then sends ``@...`` data
    lines; the latest decoded reading is stored in redis under the device ID
    as a JSON string.
    """

    def __init__(self, factory):
        self.id = ''  # device ID; empty until a registration line arrives
        self.factory = factory

    def connectionMade(self):
        """Log the peer address and send the ``$99'`` greeting line."""
        print(("[%s]: %s连接到服务器端。" % (getCurrentStrTime(), self.transport.getPeer())))
        self.sendLine("$99'".encode('utf-8'))

    def connectionLost(self, reason):
        """Remove this connection's registration and log the disconnect."""
        entry = self.factory.clients.get(self.id)
        # Only drop the entry if it still refers to this connection: a device
        # that reconnected and re-registered under the same ID must not be
        # evicted when its old socket finally closes.
        if entry is not None and entry['handle'] is self:
            del self.factory.clients[self.id]
            print("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
        print(("[%s]: %s断开了连接,原因是%s" % (getCurrentStrTime(), self.transport.getPeer(), reason)))

    def lineReceived(self, data):
        """Handle one received line: registration or a 'D' data frame.

        :param data: the line as bytes, without the delimiter.
        """
        data = data.decode('utf-8')
        print("[%s]: 收到数据 %s" % (getCurrentStrTime(), data))

        if REGISTER_RE.match(data):
            self.id = str(data[1:3])
            self.factory.clients[self.id] = {
                'handle': self,
                'timestamp': time.time(),
            }
            print("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))

        if DATA_RE.match(data) and data[3] == 'D':
            print('接收到1条命令!\r\n')
            # Characters 4..7 hold the voltage as a hex number, scaled by 1000.
            volttmp = int(data[4:8], 16) / 1000.0
            print("当前电压值=%.3f" % volttmp)
            reading = {
                'collecttime': getCurrentStrTime(),
                'volt': volttmp,
                'id': self.id,
            }
            r.set(self.id, json.dumps(reading))
            print(r.get(self.id))


class GasServerFactory(Factory):
    """Creates protocols and sends 'hello!' to every registered client."""

    def __init__(self):
        # Maps device ID -> {'handle': GasServerProtocol, 'timestamp': float}.
        self.clients = {}
        self._lc = task.LoopingCall(self.send_to_clients)
        self._lc.start(3, now=False)

    def buildProtocol(self, addr):
        """Return a new protocol instance bound to this factory."""
        return GasServerProtocol(self)

    def send_to_clients(self):
        """Send the line 'hello!' to each registered device."""
        for entry in self.clients.values():
            entry['handle'].sendLine('hello!'.encode('utf-8'))


def startMoniter():
    """Listen on TCP port 8234 and run the reactor (blocks until it stops)."""
    print("[%s]启动监控服务" % getCurrentStrTime())
    cf = GasServerFactory()
    reactor.listenTCP(8234, cf)
    reactor.run()


def stopMoniter():
    """Stop the reactor; return a Deferred reporting success or failure."""
    print("[%s]停止监控服务" % getCurrentStrTime())
    try:
        reactor.crash()
    except RuntimeError:
        return defer.fail()
    else:
        return defer.succeed(None)


startMoniter()

# coding=utf-8
import datetime
import json

import redis
from flask import Flask
from flask import abort, jsonify, url_for, render_template, request, redirect
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # allow the browser front end, served from another origin, to call this API
r = redis.Redis(host='127.0.0.1', port=6379, db=0)


@app.route('/')
def hello_world():
    """Return a fixed greeting."""
    return '你好!'


@app.route('/voltage')
def get_voltage():
    """Return the JSON stored in redis under the ``id`` query parameter.

    Responds with the text 'no id!' when the parameter is missing or empty
    and 'no data!' when redis has no value for that key.
    """
    device_id = request.args.get('id', '')
    if not device_id:
        return 'no id!'
    json_data = r.get(device_id)
    if not json_data:
        return 'no data!'
    return jsonify(json.loads(json_data))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)


在iot数据库中,新建device、iot_realtime_data,iot_history_data,共三张表。

或执行如下脚本创建上述三张表(脚本同时还会创建position表)

# Table: device
CREATE TABLE `device` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `device_id` varchar(64) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`Id`),
  KEY `device_id` (`device_id`)
) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

# Table: iot_history_data -- (device_id, collect_time) pairs are unique
CREATE TABLE `iot_history_data` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `device_id` varchar(64) DEFAULT NULL,
  `collect_time` datetime DEFAULT NULL,
  `temperature` float DEFAULT NULL,
  `humidity` float DEFAULT NULL,
  `voltage` float DEFAULT NULL,
  PRIMARY KEY (`Id`),
  UNIQUE KEY `device_id` (`device_id`,`collect_time`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

# Table: iot_realtime_data
CREATE TABLE `iot_realtime_data` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `device_id` varchar(64) DEFAULT NULL,
  `collect_time` datetime DEFAULT NULL,
  `temperature` float DEFAULT NULL,
  `humidity` float DEFAULT NULL,
  `voltage` float DEFAULT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

# Table: position
CREATE TABLE `position` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `device_id` varchar(255) DEFAULT NULL,
  `longitude` varchar(255) DEFAULT NULL,
  `latitude` varchar(255) DEFAULT NULL,
  `collect_time` datetime DEFAULT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
- 增加功能
在现有功能基础上,每5分钟将数据存入iot的iot_history_data表中
# coding=utf8
# testNet.py
import re
import time
import json
import redis
from datetime import datetime
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
from dbhelper import insert_iot_record

# NOTE: redis calls below are synchronous and run in the reactor thread.
r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Registration line: '$' + 2-digit device ID + '$'.
REGISTER_RE = re.compile(r'\$\d{2}\$')
# Data line: '@' + 2-digit address + type character + 12 hex digits + 2 digits.
# The payload is limited to hex digits so int(..., 16) cannot raise.
DATA_RE = re.compile(r'@\d{2}\D[0-9A-Fa-f]{12}\d{2}')
# Minimum time between two rows written to iot_history_data.
HISTORY_INTERVAL_SECONDS = 5 * 60


def getCurrentStrTime():
    """Return the local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


class GasServerProtocol(LineReceiver):
    """One TCP connection from a device.

    Stores the latest reading in redis under the device ID, and writes a
    history row through ``insert_iot_record`` for the first reading and then
    at most once per ``HISTORY_INTERVAL_SECONDS``.
    """

    def __init__(self, factory):
        self.id = ''  # device ID; empty until a registration line arrives
        self.factory = factory
        self.last_iot_time = datetime.now()  # time of the last history insert
        self.fisrt_recv_flag = True  # True until the first insert completes

    def connectionMade(self):
        """Log the peer address and send the ``$99'`` greeting line."""
        print("[%s]: %s连接到服务器端。" % (getCurrentStrTime(), self.transport.getPeer()))
        self.sendLine("$99'".encode('utf-8'))

    def connectionLost(self, reason):
        """Remove this connection's registration and log the disconnect."""
        entry = self.factory.clients.get(self.id)
        # Only drop the entry if it still refers to this connection: a device
        # that reconnected and re-registered under the same ID must not be
        # evicted when its old socket finally closes.
        if entry is not None and entry['handle'] is self:
            del self.factory.clients[self.id]
            print("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
        print("[%s]: %s断开了连接,原因是%s" % (getCurrentStrTime(), self.transport.getPeer(), reason))

    def insert_iot_record_cb(self, result):
        """Callback for the history insert: restart the interval timer."""
        print(result)
        self.last_iot_time = datetime.now()
        self.fisrt_recv_flag = False

    def insert_iot_record_eb(self, failure):
        """Errback for the history insert: log instead of leaving it unhandled."""
        print("[%s]: 写入历史数据失败: %s" % (getCurrentStrTime(), failure.getErrorMessage()))

    def lineReceived(self, data):
        """Handle one received line: registration or a 'D' data frame.

        :param data: the line as bytes, without the delimiter.
        """
        data = data.decode('utf-8')
        print("[%s]: 收到数据 %s" % (getCurrentStrTime(), data))

        if REGISTER_RE.match(data):
            self.id = str(data[1:3])
            self.factory.clients[self.id] = {
                'handle': self,
                'timestamp': time.time(),
            }
            print("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))

        if not (DATA_RE.match(data) and data[3] == 'D'):
            return

        print(u'接收到1条命令!\r\n')
        # Three 4-digit hex fields: voltage (/1000), temperature (/10),
        # humidity (/10).
        reading = {
            'collecttime': getCurrentStrTime(),
            'voltage': int(data[4:8], 16) / 1000.0,
            'temperature': int(data[8:12], 16) / 10.0,
            'humidity': int(data[12:16], 16) / 10.0,
            'id': self.id,
        }
        payload = json.dumps(reading)
        r.set(self.id, payload)
        print(r.get(self.id))

        # total_seconds() covers the whole interval; timedelta.seconds alone
        # drops whole days and wraps after 24 hours.
        elapsed = (datetime.now() - self.last_iot_time).total_seconds()
        if self.fisrt_recv_flag or elapsed >= HISTORY_INTERVAL_SECONDS:
            d = insert_iot_record(payload)
            d.addCallback(self.insert_iot_record_cb)
            d.addErrback(self.insert_iot_record_eb)


class GasServerFactory(Factory):
    """Creates protocols and sends 'hello!' to every registered client."""

    def __init__(self):
        # Maps device ID -> {'handle': GasServerProtocol, 'timestamp': float}.
        self.clients = {}
        self._lc = task.LoopingCall(self.send_to_clients)
        self._lc.start(3, now=False)

    def buildProtocol(self, addr):
        """Return a new protocol instance bound to this factory."""
        return GasServerProtocol(self)

    def send_to_clients(self):
        """Send the line 'hello!' to each registered device."""
        for entry in self.clients.values():
            entry['handle'].sendLine('hello!'.encode('utf-8'))


def startMoniter():
    """Listen on TCP port 8235 and run the reactor (blocks until it stops)."""
    print("[%s]启动监控服务" % getCurrentStrTime())
    cf = GasServerFactory()
    reactor.listenTCP(8235, cf)
    reactor.run()


def stopMoniter():
    """Stop the reactor; return a Deferred reporting success or failure."""
    print("[%s]停止监控服务" % getCurrentStrTime())
    try:
        reactor.crash()
    except RuntimeError:
        return defer.fail()
    else:
        return defer.succeed(None)


startMoniter()
# coding=utf-8
# dbhelper.py
from flask_sqlalchemy import SQLAlchemy
from twisted.enterprise import adbapi
import binascii
import decimal
import datetime
import MySQLdb
import json

db_conn = 'MySQLdb'
db_user = 'root'
db_pass = 'root'
db_host = '127.0.0.1'
db_database = 'iot'
dbpool = adbapi.ConnectionPool(
    db_conn,
    host=db_host,
    user=db_user,
    passwd=db_pass,
    db=db_database,
    charset='utf8',
    cp_reconnect=True,
)

# Placeholders are bound by the driver, so values are escaped correctly and
# floats keep their full precision (the old '%6.2f' formatting rounded the
# voltage to two decimals).
INSERT_HISTORY_SQL = (
    'insert into iot_history_data '
    '(device_id,voltage,humidity,temperature,collect_time) '
    'values (%s,%s,%s,%s,%s)'
)


def _insert_iot_record(trans, record):
    """Insert one reading into iot_history_data inside a pool transaction.

    :param trans: the transaction object supplied by ``runInteraction``.
    :param record: JSON text with the keys ``id``, ``voltage``, ``humidity``,
        ``temperature`` and ``collecttime``.
    :returns: the string 'True' on success, 'False' if a key is missing or
        the database rejects the row.
    """
    iot_dict = json.loads(record)
    print(iot_dict)
    try:
        params = (
            iot_dict['id'],
            iot_dict['voltage'],
            iot_dict['humidity'],
            iot_dict['temperature'],
            iot_dict['collecttime'],
        )
        print(INSERT_HISTORY_SQL, params)
        trans.execute(INSERT_HISTORY_SQL, params)
    except (MySQLdb.Error, KeyError, TypeError) as exc:
        # Report the cause instead of silently swallowing it; the string
        # return value is kept for existing callers.
        print('insert into iot_history_data failed: %r' % (exc,))
        return 'False'
    return 'True'


def insert_iot_record(record):
    """Run ``_insert_iot_record`` on the connection pool; return its Deferred."""
    return dbpool.runInteraction(_insert_iot_record, record)




用sublime打开工程文件夹,讲解文件结构






// The Vue build version to load with the `import` command
// (runtime-only or standalone) has been set in webpack.base.conf with an alias.
import Vue from 'vue'
import FastClick from 'fastclick'
import VueRouter from 'vue-router'
import App from './App'
import Home from './components/HelloFromVux'
import { AjaxPlugin } from 'vux'

// AjaxPlugin makes this.$http available inside components.
Vue.use(AjaxPlugin)
//this.$http.defaults.baseURL = '127.0.0.1:5000';
Vue.use(VueRouter)

// Single route: the home page.
const routes = [{
  path: '/',
  component: Home
}]

const router = new VueRouter({
  routes
})

FastClick.attach(document.body)

Vue.config.productionTip = false

/* eslint-disable no-new */
new Vue({
  router,
  render: h => h(App)
}).$mount('#app-box')
<template>
  <div>
    <x-header class="header" :left-options="{showBack: false}">物联网应用系统</x-header>
    <divider>实时数据</divider>
    <grid>
      <grid-item>
        <ul class="grid">
          <li><span class="grid-iot-icon" slot="icon"></span></li>
          <li class="grid-text1">气温(℃)</li>
          <li class="grid-text2"> {{my_data.temperature}}</li>
        </ul>
      </grid-item>
      <grid-item>
        <ul class="grid">
          <li><span class="grid-iot-icon" slot="icon"></span></li>
          <li class="grid-text1">湿度(%)</li>
          <li class="grid-text2"> {{my_data.humidity}}</li>
        </ul>
      </grid-item>
      <grid-item>
        <ul class="grid">
          <li><span class="grid-iot-icon" slot="icon"></span></li>
          <li class="grid-text1">电压(V)</li>
          <li class="grid-text2"> {{my_data.voltage}}</li>
        </ul>
      </grid-item>
    </grid>
  </div>
</template>

<script>
import { Group, Cell, Divider, XHeader, Flexbox, FlexboxItem, Grid, GridItem } from 'vux'

export default {
  components: {
    Group,
    Cell,
    Divider,
    XHeader,
    Flexbox,
    FlexboxItem,
    Grid,
    GridItem
  },
  data () {
    return {
      // note: changing this line won't causes changes
      // with hot-reload because the reloaded component
      // preserves its current state and we are modifying
      // its initial state.
      msg: 'Hello World!',
      // Latest reading; the template reads its temperature, humidity and
      // voltage properties, so it starts as an empty object.
      my_data: {},
    }
  },
  mounted() {
    // Load once immediately, then refresh every 3 seconds.
    this.getData();
    this.timerGetData = setInterval(this.getData, 3000);
  },
  beforeDestroy() {
    // Stop polling when the component is destroyed.
    clearInterval(this.timerGetData)
  },
  methods: {
    // Fetch the latest reading of device 12 from the Flask service.
    getData() {
      this.$http({
        url: 'http://127.0.0.1:5000/voltage?id=12',
        method: 'GET',
      }).then((res) => {
        this.my_data = res.data;
        console.log('success ');
      }, (res) => {
        console.log('error ');
      });
    },
  }
}
</script>

<style>
.vux-demo {
  text-align: center;
}
.logo {
  width: 100px;
  height: 100px
}
.grid-iot-icon {
  font-family: 'iot';
  font-size: 32px;
  color: @theme-color;
  text-align: center;
}
.grid-text1 {
  text-align: center;
  color: grey;
  list-style-type: none;
  font-size: 18px;
}
.grid-text2 {
  text-align: center;
  color: black;
  list-style-type: none;
  font-size: 22px;
}
.grid {
  font-size: 22px;
  text-align: center;
  background-color: white;
  list-style-type: none;
}
.header {
  background-color: @theme-color;
}
</style>
<template>
  <div>
    <tabbar style="position:fixed" slot="bottom">
      <tabbar-item link="/">
        <span class="iot-icon-title" slot="icon"></span>
        <span class="iot-text-title" slot="label">首页</span>
      </tabbar-item>
      <tabbar-item link="/map">
        <span class="iot-icon-title" slot="icon"></span>
        <span class="iot-text-title" slot="label">监控</span>
      </tabbar-item>
      <tabbar-item link="home">
        <span class="iot-icon-title1" slot="icon"></span>
        <span class="iot-text-title" slot="label">查询</span>
      </tabbar-item>
      <tabbar-item>
        <span class="iot-icon-title" slot="icon"></span>
        <span class="iot-text-title" slot="label">设置</span>
      </tabbar-item>
    </tabbar>
  </div>
</template>

<script>
import { Tabbar, TabbarItem } from 'vux'

export default {
  // NOTE(review): `ready` is the Vue 1.x hook name; Vue 2 calls `mounted`
  // instead, so confirm whether this hook is expected to run.
  ready () {
    // Stretch the page containers to the full viewport height.
    document.querySelector('body').style.height = '100%'
    document.querySelector('html').style.height = '100%'
    document.querySelector('#app').style.height = '100%'
  },
  components: {
    Tabbar,
    TabbarItem
  }
}
</script>

<style lang="less">
.iot-tabbar {
  position: fixed;
}
.iot-icon-title {
  font-family: 'iot';
  font-size: 32px;
  line-height: 30px;
}
.iot-icon-title1 {
  font-family: 'iot';
  font-size: 28px;
  line-height: 32px;
}
.iot-text-title {
  font-size: 14px;
  line-height: 28px;
}
</style>
<template>
  <div>
    <div style="height: 10%; overflow: auto;">
      <h3>Simple map</h3>
      Marker is placed at {{ marker.lat }}, {{ marker.lng }}
    </div>
    <v-map style="height: 90%" :zoom="zoom" :center="center">
      <v-tilelayer :url="url" :attribution="attribution"></v-tilelayer>
      <v-marker :lat-lng="marker"></v-marker>
    </v-map>
  </div>
</template>

<script>
import Vue2Leaflet from 'vue2-leaflet';

export default {
  components: {
    'v-map': Vue2Leaflet.Map,
    'v-tilelayer': Vue2Leaflet.TileLayer,
    'v-marker': Vue2Leaflet.Marker
  },
  data () {
    return {
      zoom: 13,
      // NOTE(review): `L` is not imported in this file, so it is presumably
      // the Leaflet global; confirm Leaflet is loaded before this component.
      center: L.latLng(47.413220, -1.219482),
      url: 'http://{s}.tile.osm.org/{z}/{x}/{y}.png',
      attribution: '© <a href="http://osm.org/copyright">OpenStreetMap</a> contributors',
      // The marker sits on the same coordinates as the map centre.
      marker: L.latLng(47.413220, -1.219482),
    }
  }
}
</script>
// The Vue build version to load with the `import` command
// (runtime-only or standalone) has been set in webpack.base.conf with an alias.
import Vue from 'vue'
import FastClick from 'fastclick'
import VueRouter from 'vue-router'
import App from './App'
import Home from './components/HelloFromVux'
import Hello from './components/Hello'
import Map from './components/BMap'
import BaiduMap from 'vue-baidu-map'
import { AjaxPlugin } from 'vux'

Vue.use(BaiduMap, {
  // ak is the key obtained from the Baidu Maps developer platform;
  // see http://lbsyun.baidu.com/apiconsole/key
  ak: 'YOUR_APP_KEY'
})

// AjaxPlugin makes this.$http available inside components.
Vue.use(AjaxPlugin)
//this.$http.defaults.baseURL = '127.0.0.1:5000';
Vue.use(VueRouter)

const routes = [
  {
    path: '/',
    name: 'default',
    component: Home
  },
  {
    path: '/map',
    component: Map
  },
  {
    // Any other path redirects to the home page.
    path: '*',
    redirect: { name: 'default' },
  }
]

const router = new VueRouter({
  hashbang: false,
  linkActiveClass: 'active',
  mode: 'history',
  base: __dirname,
  routes
})

FastClick.attach(document.body)

/* eslint-disable no-new */
new Vue({
  router,
  render: h => h(App)
}).$mount('#app-box')
getData() {
this.$http({
url: 'http://192.168.0.120:5000/voltage',
method: 'GET',
params: {
id: "12"
},
# testFlask.py
# coding=utf-8
import datetime
import json

import redis
from flask import Flask
from flask import abort, jsonify, url_for, render_template, request, redirect
from flask_cors import CORS, cross_origin
from flask_restless import APIManager
from flask_sqlalchemy import SQLAlchemy

from database import db
from application import app
from models import IotHistoryData, Position

r = redis.Redis(host='127.0.0.1', port=6379, db=0)

apimanager = APIManager(app, flask_sqlalchemy_db=db)

# Read-only REST endpoint for IotHistoryData under the collection 'history'.
apimanager.create_api(
    IotHistoryData,
    methods=['GET'],
    # url_prefix='/api/v1',
    # preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
    results_per_page=100,
    url_prefix='',
    collection_name='history',
)

# REST endpoint for Position under the collection 'position'.
apimanager.create_api(
    Position,
    methods=['GET', 'PUT', 'PATCH', 'POST'],
    # url_prefix='/api/v1',
    # preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
    results_per_page=100,
    url_prefix='',
    include_columns=['Id', 'latitude', 'longitude', 'collect_time'],
    collection_name='position',
)


@app.route('/')
def hello_world():
    """Return a fixed greeting."""
    return '你好!'


@app.route('/voltage')
def get_voltage():
    """Return the JSON stored in redis under the ``id`` query parameter.

    Responds with the text 'no id!' when the parameter is missing or empty
    and 'no data!' when redis has no value for that key.
    """
    device_id = request.args.get('id', '')
    if not device_id:
        return 'no id!'
    json_data = r.get(device_id)
    if not json_data:
        return 'no data!'
    return jsonify(json.loads(json_data))


@app.route('/relay', methods=['POST'])
def set_relay():
    """Queue the posted JSON command on the redis list 'front_tcp'.

    The request body is parsed as JSON, pushed onto the list re-serialised,
    and echoed back. A body that is not valid JSON gets a 400 response
    instead of an unhandled exception.
    """
    try:
        py_data = json.loads(request.get_data())
    except ValueError:
        return jsonify({'error': 'request body is not valid JSON'}), 400
    r.lpush('front_tcp', json.dumps(py_data).encode('utf-8'))
    return jsonify(py_data)


if __name__ == '__main__':
    # NOTE: 127.0.0.1 accepts connections from this machine only; bind to
    # '0.0.0.0' if other hosts on the network must reach the service.
    app.run(host='127.0.0.1', port=5000)
# testNetRedis.py
# coding=utf8
import re
import time
import json
import redis
from datetime import datetime
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
from dbhelper import insert_iot_record

# NOTE: redis calls below are synchronous and run in the reactor thread.
r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Registration line: '$' + 2-digit device ID + '$'.
REGISTER_RE = re.compile(r'\$\d{2}\$')
# Data line: '@' + 2-digit address + type character + 12 hex digits + 2 digits.
# The payload is limited to hex digits so int(..., 16) cannot raise.
DATA_RE = re.compile(r'@\d{2}\D[0-9A-Fa-f]{12}\d{2}')
# Minimum time between two rows written to iot_history_data.
HISTORY_INTERVAL_SECONDS = 5 * 60


def getCurrentStrTime():
    """Return the local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


class GasServerProtocol(LineReceiver):
    """One TCP connection from a device.

    Stores the latest reading in redis under the device ID, and writes a
    history row through ``insert_iot_record`` for the first reading and then
    at most once per ``HISTORY_INTERVAL_SECONDS``.
    """

    def __init__(self, factory):
        self.id = ''  # device ID; empty until a registration line arrives
        self.factory = factory
        self.last_iot_time = datetime.now()  # time of the last history insert
        self.fisrt_recv_flag = True  # True until the first insert completes

    def connectionMade(self):
        """Log the peer address and send the ``$99'`` greeting line."""
        print("[%s]: %s连接到服务器端。" % (getCurrentStrTime(), self.transport.getPeer()))
        self.sendLine("$99'".encode('utf-8'))

    def connectionLost(self, reason):
        """Remove this connection's registration and log the disconnect."""
        entry = self.factory.clients.get(self.id)
        # Only drop the entry if it still refers to this connection: a device
        # that reconnected and re-registered under the same ID must not be
        # evicted when its old socket finally closes.
        if entry is not None and entry['handle'] is self:
            del self.factory.clients[self.id]
            print("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
        print("[%s]: %s断开了连接,原因是%s" % (getCurrentStrTime(), self.transport.getPeer(), reason))

    def insert_iot_record_cb(self, result):
        """Callback for the history insert: restart the interval timer."""
        print(result)
        self.last_iot_time = datetime.now()
        self.fisrt_recv_flag = False

    def insert_iot_record_eb(self, failure):
        """Errback for the history insert: log instead of leaving it unhandled."""
        print("[%s]: 写入历史数据失败: %s" % (getCurrentStrTime(), failure.getErrorMessage()))

    def lineReceived(self, data):
        """Handle one received line: registration or a 'D' data frame.

        :param data: the line as bytes, without the delimiter.
        """
        data = data.decode('utf-8')
        print("[%s]: 收到数据 %s" % (getCurrentStrTime(), data))

        if REGISTER_RE.match(data):
            self.id = str(data[1:3])
            self.factory.clients[self.id] = {
                'handle': self,
                'timestamp': time.time(),
            }
            print("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))

        if not (DATA_RE.match(data) and data[3] == 'D'):
            return

        print(u'接收到1条命令!\r\n')
        # Three 4-digit hex fields: voltage (/1000), temperature (/10),
        # humidity (/10).
        reading = {
            'collecttime': getCurrentStrTime(),
            'voltage': int(data[4:8], 16) / 1000.0,
            'temperature': int(data[8:12], 16) / 10.0,
            'humidity': int(data[12:16], 16) / 10.0,
            'id': self.id,
        }
        payload = json.dumps(reading)
        r.set(self.id, payload)
        print(r.get(self.id))

        # total_seconds() covers the whole interval; timedelta.seconds alone
        # drops whole days and wraps after 24 hours.
        elapsed = (datetime.now() - self.last_iot_time).total_seconds()
        if self.fisrt_recv_flag or elapsed >= HISTORY_INTERVAL_SECONDS:
            d = insert_iot_record(payload)
            d.addCallback(self.insert_iot_record_cb)
            d.addErrback(self.insert_iot_record_eb)


class GasServerFactory(Factory):
    """Creates protocols, polls devices and relays queued commands.

    Commands are popped from the redis list 'front_tcp' and forwarded to the
    registered device whose ID equals the command's ``addr`` field.
    """

    def __init__(self):
        # Maps device ID -> {'handle': GasServerProtocol, 'timestamp': float}.
        self.clients = {}
        self._lc = task.LoopingCall(self.send_to_clients)
        self._lc.start(60, now=False)

    def buildProtocol(self, addr):
        """Return a new protocol instance bound to this factory."""
        return GasServerProtocol(self)

    def send_to_clients(self):
        """Send the line '@12d000000' to each registered device."""
        for entry in self.clients.values():
            entry['handle'].sendLine('@12d000000'.encode('utf-8'))

    @defer.inlineCallbacks
    def receive_from_mq(self):
        """Pop at most one queued command from redis and forward it."""
        try:
            data = yield r.rpop('front_tcp')
            if data:
                print(u'接收到1条命令!%s\r\n' % (data))
                self.process_data_from_mq(data)
        except Exception as exc:
            # This runs under a LoopingCall, which stops for good if the call
            # fails, so errors are reported here rather than propagated.
            print("[%s]: 处理队列命令失败: %r" % (getCurrentStrTime(), exc))

    def process_data_from_mq(self, data):
        """Build a command line from a queued JSON message and send it.

        :param data: JSON text with the fields ``addr``, ``command`` and
            ``data``; the line sent is '@' + addr + command + data + '01'.
        """
        loads_data = json.loads(data)
        command = loads_data.get('command')
        addr = loads_data.get('addr')
        mydata = loads_data.get('data')
        client = self.clients.get(addr)
        if client is None:
            # The target device is not registered: report it and drop the
            # command instead of raising KeyError.
            print("[%s]: 设备 %s 不在线,命令被丢弃" % (getCurrentStrTime(), addr))
            return
        send_data = ('@%s%s%s01' % (addr, command, mydata)).encode('utf-8')
        client['handle'].sendLine(send_data)


def startMoniter():
    """Start the queue poller, listen on TCP port 8234 and run the reactor."""
    print("[%s]启动监控服务" % getCurrentStrTime())
    cf = GasServerFactory()
    # Check the redis command queue every 0.1 seconds.
    task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
    task_receive_data_from_mq.start(0.1, now=False)
    reactor.listenTCP(8234, cf)
    reactor.run()


def stopMoniter():
    """Stop the reactor; return a Deferred reporting success or failure."""
    print("[%s]停止监控服务" % getCurrentStrTime())
    try:
        reactor.crash()
    except RuntimeError:
        return defer.fail()
    else:
        return defer.succeed(None)


startMoniter()





