@richey
2020-08-24T06:24:36.000000Z
字数 8082
阅读 1915
python
redis
twisted
我们将json格式的字符串作为value存入redis
# coding=utf8
import re
import datetime
import time
import json
import redis
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
r = redis.Redis(host='127.0.0.1',port=6379,db=0)
def getCurrentStrTime():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
class GasServerProtocol(LineReceiver):
def __init__(self, factory):
self.id = ''
self.factory = factory
def connectionMade(self):
print(
"[%s]: %s连接到服务器端。" %
(getCurrentStrTime(), self.transport.getPeer()))
self.sendLine("$99'")
def connectionLost(self, reason):
if self.id in self.factory.clients:
del self.factory.clients[self.id]
print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)
print(
"[%s]: %s断开了连接,原因是%s" %
(getCurrentStrTime(),
self.transport.getPeer(),
reason))
def lineReceived(self, data):
tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
print tmpstr
if(re.match("\$\d{2}\$", data)):
self.id = str(data[1:3])
dicttmp = {}
dicttmp['handle'] = self
dicttmp['timestamp'] = time.time()
self.factory.clients[self.id] = dicttmp
print "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)
if(re.match('@\d{2}\D\w{4}\d{2}',data)):
if(data[3] == 'D'):
print u'接收到1条命令!\r\n'
volttmp = (int(data[4:8],16)/1000.0)
tmpstr = "当前电压值=%.3f" % volttmp
print tmpstr
dicttmp = {}
dicttmp['collecttime'] = getCurrentStrTime()
dicttmp['volt'] = volttmp
dicttmp['id'] = self.id
r.set(self.id, json.dumps(dicttmp))
print r.get(self.id)
class GasServerFactory(Factory):
def __init__(self):
self.clients = {} # maps clients ids to GasServerProtocol instances
self._lc = task.LoopingCall(self.send_to_clients)
self._lc.start(3,now=False)
def buildProtocol(self, addr):
return GasServerProtocol(self)
def send_to_clients(self):
for client_addr in self.clients :
self.clients[client_addr]['handle'].sendLine('hello!')
def startMoniter():
print "[%s]启动监控服务" % getCurrentStrTime()
cf = GasServerFactory()
reactor.listenTCP(8234, cf)
reactor.run()
def stopMoniter():
print "[%s]停止监控服务" % getCurrentStrTime()
try:
reactor.crash()
except RuntimeError:
return defer.fail()
else:
return defer.succeed(None)
startMoniter()
# coding=utf-8
from flask import render_template, request, redirect
from flask import abort,jsonify,url_for,render_template, request, redirect
from flask_cors import CORS
import json
import redis
from flask import Flask
import datetime
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
@app.route('/')
def hello_world():
return '你好!'
@app.route('/voltage')
def get_voltage():
id = request.args.get('id', '')
if id:
json_data = r.get(id)
if json_data:
return jsonify(json.loads(json_data))
return 'no data!'
return 'no id!'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int("5000"))
# Host: localhost (Version: 5.5.47)
# Date: 2017-12-08 13:55:35
# Generator: MySQL-Front 5.3 (Build 4.234)
/*!40101 SET NAMES utf8 */;
#
# Structure for table "device"
#
CREATE TABLE `device` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`Id`),
KEY `device_id` (`device_id`)
) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#
# Data for table "device"
#
INSERT INTO `device` VALUES (1,'12','科大气象站');
#
# Structure for table "iot_history_data"
#
CREATE TABLE `iot_history_data` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`collect_time` datetime DEFAULT NULL,
`temperature` float DEFAULT NULL,
`humidity` float DEFAULT NULL,
`voltage` float DEFAULT NULL,
PRIMARY KEY (`Id`),
UNIQUE KEY `device_id` (`device_id`,`collect_time`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
#
# Data for table "iot_history_data"
#
#
# Structure for table "iot_realtime_data"
#
CREATE TABLE `iot_realtime_data` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`collect_time` datetime DEFAULT NULL,
`temperature` float DEFAULT NULL,
`humidity` float DEFAULT NULL,
`voltage` float DEFAULT NULL,
PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
#
# Data for table "iot_realtime_data"
#
-增加功能
在现有功能基础上,每5分钟将数据存入iot的iot_history_data表中
# coding=utf8
#testNet.py
import re
import time
import json
import redis
from datetime import datetime
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
from dbhelper import insert_iot_record
r = redis.Redis(host='127.0.0.1',port=6379,db=0)
def getCurrentStrTime():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
class GasServerProtocol(LineReceiver):
def __init__(self, factory):
self.id = ''
self.factory = factory
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = True
def connectionMade(self):
print(
"[%s]: %s连接到服务器端。" %
(getCurrentStrTime(), self.transport.getPeer()))
self.sendLine("$99'")
def connectionLost(self, reason):
if self.id in self.factory.clients:
del self.factory.clients[self.id]
print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)
print(
"[%s]: %s断开了连接,原因是%s" %
(getCurrentStrTime(),
self.transport.getPeer(),
reason))
def insert_iot_record_cb(self, result):
print result
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = False
def lineReceived(self, data):
tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
print tmpstr
if(re.match("\$\d{2}\$", data)):
self.id = str(data[1:3])
dicttmp = {}
dicttmp['handle'] = self
dicttmp['timestamp'] = time.time()
self.factory.clients[self.id] = dicttmp
print "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)
if(re.match('@\d{2}\D\w{12}\d{2}',data)):
if(data[3] == 'D'):
print u'接收到1条命令!\r\n'
volttmp = (int(data[4:8],16)/1000.0)
temperature = (int(data[8:12],16)/10.0)
humidity = (int(data[12:16],16)/10.0)
dicttmp = {}
dicttmp['collecttime'] = getCurrentStrTime()
dicttmp['voltage'] = volttmp
dicttmp['temperature'] = temperature
dicttmp['humidity'] = humidity
dicttmp['id'] = self.id
r.set(self.id, json.dumps(dicttmp))
print r.get(self.id)
current_time = datetime.now()
if((current_time - self.last_iot_time).seconds >= 5 * 60 or self.fisrt_recv_flag):
insert_iot_record( r.get(self.id)).addCallback(
self.insert_iot_record_cb)
class GasServerFactory(Factory):
def __init__(self):
self.clients = {} # maps clients ids to GasServerProtocol instances
self._lc = task.LoopingCall(self.send_to_clients)
self._lc.start(3,now=False)
def buildProtocol(self, addr):
return GasServerProtocol(self)
def send_to_clients(self):
for client_addr in self.clients :
self.clients[client_addr]['handle'].sendLine('hello!')
def startMoniter():
print "[%s]启动监控服务" % getCurrentStrTime()
cf = GasServerFactory()
reactor.listenTCP(8234, cf)
reactor.run()
def stopMoniter():
print "[%s]停止监控服务" % getCurrentStrTime()
try:
reactor.crash()
except RuntimeError:
return defer.fail()
else:
return defer.succeed(None)
startMoniter()
# coding=utf-8
# dbhelper.py
from flask_sqlalchemy import SQLAlchemy
from twisted.enterprise import adbapi
import binascii
import decimal
import datetime
import MySQLdb
import json
db_conn = 'MySQLdb'
db_user = 'root'
db_pass = 'root'
db_host = '127.0.0.1'
db_database = 'iot'
dbpool = adbapi.ConnectionPool(
db_conn,
host=db_host,
user=db_user,
passwd=db_pass,
db=db_database,
charset='utf8',
cp_reconnect=True)
def _insert_iot_record(trans, record):
iot_dict = json.loads(record) # 根据字符串书写格式,将字符串自动转换成 字典类型
print iot_dict
try:
sql = 'insert into iot_history_data (device_id,voltage,humidity,temperature,collect_time)'+\
' values(\'%s\',\'%6.2f\',\'%6.2f\',\'%6.2f\',\'%s\')' % (
iot_dict['id'], iot_dict['voltage'], iot_dict['humidity'], iot_dict['temperature'], iot_dict['collecttime'])
print sql
result = trans.execute(sql)
return 'True'
except:
return 'False'
def insert_iot_record(record):
return dbpool.runInteraction(_insert_iot_record, record)