@richey
2020-08-24T06:24:36.000000Z
字数 8082
阅读 2061
python redis twisted我们将json格式的字符串作为value存入redis
# coding=utf8import reimport datetimeimport timeimport jsonimport redisfrom twisted.internet import reactor, task, protocolfrom twisted.internet import defer, threadsfrom twisted.internet.protocol import Protocol, error, Factoryfrom twisted.protocols.basic import LineReceiverr = redis.Redis(host='127.0.0.1',port=6379,db=0)def getCurrentStrTime():return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())class GasServerProtocol(LineReceiver):def __init__(self, factory):self.id = ''self.factory = factorydef connectionMade(self):print("[%s]: %s连接到服务器端。" %(getCurrentStrTime(), self.transport.getPeer()))self.sendLine("$99'")def connectionLost(self, reason):if self.id in self.factory.clients:del self.factory.clients[self.id]print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)print("[%s]: %s断开了连接,原因是%s" %(getCurrentStrTime(),self.transport.getPeer(),reason))def lineReceived(self, data):tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)print tmpstrif(re.match("\$\d{2}\$", data)):self.id = str(data[1:3])dicttmp = {}dicttmp['handle'] = selfdicttmp['timestamp'] = time.time()self.factory.clients[self.id] = dicttmpprint "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)if(re.match('@\d{2}\D\w{4}\d{2}',data)):if(data[3] == 'D'):print u'接收到1条命令!\r\n'volttmp = (int(data[4:8],16)/1000.0)tmpstr = "当前电压值=%.3f" % volttmpprint tmpstrdicttmp = {}dicttmp['collecttime'] = getCurrentStrTime()dicttmp['volt'] = volttmpdicttmp['id'] = self.idr.set(self.id, json.dumps(dicttmp))print r.get(self.id)class GasServerFactory(Factory):def __init__(self):self.clients = {} # maps clients ids to GasServerProtocol instancesself._lc = task.LoopingCall(self.send_to_clients)self._lc.start(3,now=False)def buildProtocol(self, addr):return GasServerProtocol(self)def send_to_clients(self):for client_addr in self.clients :self.clients[client_addr]['handle'].sendLine('hello!')def startMoniter():print "[%s]启动监控服务" % getCurrentStrTime()cf = GasServerFactory()reactor.listenTCP(8234, cf)reactor.run()def stopMoniter():print "[%s]停止监控服务" % getCurrentStrTime()try:reactor.crash()except RuntimeError:return defer.fail()else:return defer.succeed(None)startMoniter()

# coding=utf-8from flask import render_template, request, redirectfrom flask import abort,jsonify,url_for,render_template, request, redirectfrom flask_cors import CORSimport jsonimport redisfrom flask import Flaskimport datetimefrom flask_cors import CORSapp = Flask(__name__)CORS(app)r = redis.Redis(host='127.0.0.1', port=6379, db=0)@app.route('/')def hello_world():return '你好!'@app.route('/voltage')def get_voltage():id = request.args.get('id', '')if id:json_data = r.get(id)if json_data:return jsonify(json.loads(json_data))return 'no data!'return 'no id!'if __name__ == '__main__':app.run(host='0.0.0.0', port=int("5000"))



# Host: localhost (Version: 5.5.47)# Date: 2017-12-08 13:55:35# Generator: MySQL-Front 5.3 (Build 4.234)/*!40101 SET NAMES utf8 */;## Structure for table "device"#CREATE TABLE `device` (`Id` int(11) NOT NULL AUTO_INCREMENT,`device_id` varchar(64) DEFAULT NULL,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`Id`),KEY `device_id` (`device_id`)) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;## Data for table "device"#INSERT INTO `device` VALUES (1,'12','科大气象站');## Structure for table "iot_history_data"#CREATE TABLE `iot_history_data` (`Id` int(11) NOT NULL AUTO_INCREMENT,`device_id` varchar(64) DEFAULT NULL,`collect_time` datetime DEFAULT NULL,`temperature` float DEFAULT NULL,`humidity` float DEFAULT NULL,`voltage` float DEFAULT NULL,PRIMARY KEY (`Id`),UNIQUE KEY `device_id` (`device_id`,`collect_time`)) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;## Data for table "iot_history_data"### Structure for table "iot_realtime_data"#CREATE TABLE `iot_realtime_data` (`Id` int(11) NOT NULL AUTO_INCREMENT,`device_id` varchar(64) DEFAULT NULL,`collect_time` datetime DEFAULT NULL,`temperature` float DEFAULT NULL,`humidity` float DEFAULT NULL,`voltage` float DEFAULT NULL,PRIMARY KEY (`Id`)) ENGINE=MyISAM DEFAULT CHARSET=utf8;## Data for table "iot_realtime_data"#
-增加功能
在现有功能基础上,每5分钟将数据存入iot的iot_history_data表中
# coding=utf8#testNet.pyimport reimport timeimport jsonimport redisfrom datetime import datetimefrom twisted.internet import reactor, task, protocolfrom twisted.internet import defer, threadsfrom twisted.internet.protocol import Protocol, error, Factoryfrom twisted.protocols.basic import LineReceiverfrom dbhelper import insert_iot_recordr = redis.Redis(host='127.0.0.1',port=6379,db=0)def getCurrentStrTime():return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())class GasServerProtocol(LineReceiver):def __init__(self, factory):self.id = ''self.factory = factoryself.last_iot_time = datetime.now()self.fisrt_recv_flag = Truedef connectionMade(self):print("[%s]: %s连接到服务器端。" %(getCurrentStrTime(), self.transport.getPeer()))self.sendLine("$99'")def connectionLost(self, reason):if self.id in self.factory.clients:del self.factory.clients[self.id]print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)print("[%s]: %s断开了连接,原因是%s" %(getCurrentStrTime(),self.transport.getPeer(),reason))def insert_iot_record_cb(self, result):print resultself.last_iot_time = datetime.now()self.fisrt_recv_flag = Falsedef lineReceived(self, data):tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)print tmpstrif(re.match("\$\d{2}\$", data)):self.id = str(data[1:3])dicttmp = {}dicttmp['handle'] = selfdicttmp['timestamp'] = time.time()self.factory.clients[self.id] = dicttmpprint "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)if(re.match('@\d{2}\D\w{12}\d{2}',data)):if(data[3] == 'D'):print u'接收到1条命令!\r\n'volttmp = (int(data[4:8],16)/1000.0)temperature = (int(data[8:12],16)/10.0)humidity = (int(data[12:16],16)/10.0)dicttmp = {}dicttmp['collecttime'] = getCurrentStrTime()dicttmp['voltage'] = volttmpdicttmp['temperature'] = temperaturedicttmp['humidity'] = humiditydicttmp['id'] = self.idr.set(self.id, json.dumps(dicttmp))print r.get(self.id)current_time = datetime.now()if((current_time - self.last_iot_time).seconds >= 5 * 60 or self.fisrt_recv_flag):insert_iot_record( r.get(self.id)).addCallback(self.insert_iot_record_cb)class GasServerFactory(Factory):def __init__(self):self.clients = {} # maps clients ids to GasServerProtocol instancesself._lc = task.LoopingCall(self.send_to_clients)self._lc.start(3,now=False)def buildProtocol(self, addr):return GasServerProtocol(self)def send_to_clients(self):for client_addr in self.clients :self.clients[client_addr]['handle'].sendLine('hello!')def startMoniter():print "[%s]启动监控服务" % getCurrentStrTime()cf = GasServerFactory()reactor.listenTCP(8234, cf)reactor.run()def stopMoniter():print "[%s]停止监控服务" % getCurrentStrTime()try:reactor.crash()except RuntimeError:return defer.fail()else:return defer.succeed(None)startMoniter()
# coding=utf-8# dbhelper.pyfrom flask_sqlalchemy import SQLAlchemyfrom twisted.enterprise import adbapiimport binasciiimport decimalimport datetimeimport MySQLdbimport jsondb_conn = 'MySQLdb'db_user = 'root'db_pass = 'root'db_host = '127.0.0.1'db_database = 'iot'dbpool = adbapi.ConnectionPool(db_conn,host=db_host,user=db_user,passwd=db_pass,db=db_database,charset='utf8',cp_reconnect=True)def _insert_iot_record(trans, record):iot_dict = json.loads(record) # 根据字符串书写格式,将字符串自动转换成 字典类型print iot_dicttry:sql = 'insert into iot_history_data (device_id,voltage,humidity,temperature,collect_time)'+\' values(\'%s\',\'%6.2f\',\'%6.2f\',\'%6.2f\',\'%s\')' % (iot_dict['id'], iot_dict['voltage'], iot_dict['humidity'], iot_dict['temperature'], iot_dict['collecttime'])print sqlresult = trans.execute(sql)return 'True'except:return 'False'def insert_iot_record(record):return dbpool.runInteraction(_insert_iot_record, record)