@richey
2021-09-30T03:14:46.000000Z
字数 24113
阅读 5327
物联网创新设计实践
讲义
如图,证明python3.7已经安装完成。
- 下载并安装pycharm 社区版
https://www.jetbrains.com/pycharm/download/#section=windows
- 最简单的python程序
# coding=utf8
print "hello world!\r\n"
def fun1():
print("你好!\r\n")
fun1()
-list列表
def fun1():
print ("你好!\r\n")
fun1()
l=[1,2,5,6,8]
print(l)
print (l[3])
for item in l:
print (item)
-元组
ython的元组与列表类似,不同之处在于元组的元素不能修改。元组使用小括号,列表使用方括号。
tup1 = ('physics', 'chemistry', 1997, 2000);
tup2 = (1, 2, 3, 4, 5 );
tup3 = "a", "b", "c", "d";
print (tup1)
print (tup2)
print (tup3)
-字典dict
bdict = {'name':'allen', 'age':'40'}
for key in bdict.keys():
print (key)
print (bdict[key])
参考:http://blog.csdn.net/hanhuili/article/details/9389433
Twisted是用Python实现的基于事件驱动的网络引擎框架。Twisted支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。Twisted对于其支持的所有协议都带有客户端和服务器实现,同时附带有基于命令行的工具,使得配置和部署产品级的Twisted应用变得非常方便。
Reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。Reactor可以感知网络、文件系统以及定时器事件。它等待然后处理这些事件,从特定于平台的行为中抽象出来,并提供统一的接口,使得在网络协议栈的任何位置对事件做出响应都变得简单。
关于reactor有下面几个结论:
Twisted的reactor只有通过调用reactor.run()来启动。
reactor循环是在其开始的进程中运行,也就是运行在主进程中。
一旦启动,就会一直运行下去。reactor就会在程序的控制下(或者具体在一个启动它的线程的控制下)。reactor循环并不会消耗任何CPU的资源。并不需要显式的创建reactor,只需要引入就OK了。
Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransports接口,它包含如下的方法:
write 以非阻塞的方式按顺序依次将数据写到物理连接上
writeSequence 将一个字符串列表写到物理连接上
loseConnection 将所有挂起的数据写入,然后关闭连接
getPeer 取得连接中对端的地址信息
getHost 取得连接中本端的地址信息
将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:
makeConnection 在transport对象和服务器之间建立一条连接
connectionMade 连接建立起来后调用
dataReceived 接收数据时调用
connectionLost 关闭连接时调用
切确的说,它取名不太好,应该叫做FactoryOfProtocals,即协议工厂(也就是工厂模式),用来管理协议对象实例的。
# coding=utf8
import sys
import re
if sys.platform == 'win32':
from twisted.internet import win32eventreactor
win32eventreactor.install()
from twisted.internet import defer, threads
from twisted.internet.serialport import SerialPort
from twisted.internet import protocol,task, reactor, error
from twisted.protocols.basic import LineReceiver
from twisted.python import log, usage
class GasSim(LineReceiver):
def __init__(self):
self._lc1 = task.LoopingCall(self.sendCmd)
self._lc1.start(3,False)
def sendCmd(self):
self.sendLine("@01c100001".encode('utf-8'))
def lineReceived(self, line):
line = line.decode('utf-8')
print(line)
if(re.match('@\d{2}\D\w{4}\d{2}',line)):
if(line[3] == 'D'):
print (u'接收到1条命令!\r\n')
tmpstr = "当前电压值=%.3f" % (int(line[4:8],16)/1000.0)
print (tmpstr)
if __name__ == '__main__':
s = SerialPort(GasSim(), 'COM5', reactor, baudrate=9600)
reactor.run()
本小节已经合并到下一小节
安装redis,直接解压运行即可。
我们将json格式的字符串作为value存入redis
# coding: utf-8
import re
import datetime
import time
import json
import redis
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
r = redis.Redis(host='127.0.0.1',port=6379,db=0)
def getCurrentStrTime():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
class GasServerProtocol(LineReceiver):
def __init__(self, factory):
self.id = ''
self.factory = factory
def connectionMade(self):
print((
"[%s]: %s连接到服务器端。" %
(getCurrentStrTime(), self.transport.getPeer())))
self.sendLine("$99'".encode('utf-8'))
def connectionLost(self, reason):
if self.id in self.factory.clients:
del self.factory.clients[self.id]
print("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
print((
"[%s]: %s断开了连接,原因是%s" %
(getCurrentStrTime(),
self.transport.getPeer(),
reason)))
def lineReceived(self, data):
data = data.decode('utf-8')
tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
print(tmpstr)
if(re.match("\$\d{2}\$", data)):
self.id = str(data[1:3])
dicttmp = {}
dicttmp['handle'] = self
dicttmp['timestamp'] = time.time()
self.factory.clients[self.id] = dicttmp
print("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))
if(re.match('@\d{2}\D\w{4}\d{2}',data)):
if(data[3] == 'D'):
print('接收到1条命令!\r\n')
volttmp = (int(data[4:8],16)/1000.0)
tmpstr = "当前电压值=%.3f" % volttmp
print(tmpstr)
dicttmp = {}
dicttmp['collecttime'] = getCurrentStrTime()
dicttmp['volt'] = volttmp
dicttmp['id'] = self.id
r.set(self.id, json.dumps(dicttmp))
print(r.get(self.id))
class GasServerFactory(Factory):
def __init__(self):
self.clients = {} # maps clients ids to GasServerProtocol instances
self._lc = task.LoopingCall(self.send_to_clients)
self._lc.start(3,now=False)
def buildProtocol(self, addr):
return GasServerProtocol(self)
def send_to_clients(self):
for client_addr in self.clients :
self.clients[client_addr]['handle'].sendLine('hello!'.encode('utf-8'))
def startMoniter():
print("[%s]启动监控服务" % getCurrentStrTime())
cf = GasServerFactory()
reactor.listenTCP(8234, cf)
reactor.run()
def stopMoniter():
print("[%s]停止监控服务" % getCurrentStrTime())
try:
reactor.crash()
except RuntimeError:
return defer.fail()
else:
return defer.succeed(None)
startMoniter()
# coding=utf-8
from flask import render_template, request, redirect
from flask import abort,jsonify,url_for,render_template, request, redirect
from flask_cors import CORS
import json
import redis
from flask import Flask
import datetime
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
@app.route('/')
def hello_world():
return '你好!'
@app.route('/voltage')
def get_voltage():
id = request.args.get('id', '')
if id:
json_data = r.get(id)
if json_data:
return jsonify(json.loads(json_data))
return 'no data!'
return 'no id!'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int("5000"))
在iot数据库中,新建device、iot_realtime_data,iot_history_data,共三张表。
或执行如下脚本创建三张表
CREATE TABLE `device` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`Id`),
KEY `device_id` (`device_id`)
) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
CREATE TABLE `iot_history_data` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`collect_time` datetime DEFAULT NULL,
`temperature` float DEFAULT NULL,
`humidity` float DEFAULT NULL,
`voltage` float DEFAULT NULL,
PRIMARY KEY (`Id`),
UNIQUE KEY `device_id` (`device_id`,`collect_time`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
#
CREATE TABLE `iot_realtime_data` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(64) DEFAULT NULL,
`collect_time` datetime DEFAULT NULL,
`temperature` float DEFAULT NULL,
`humidity` float DEFAULT NULL,
`voltage` float DEFAULT NULL,
PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
CREATE TABLE `position` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`device_id` varchar(255) DEFAULT NULL,
`longitude` varchar(255) DEFAULT NULL,
`latitude` varchar(255) DEFAULT NULL,
`collect_time` datetime DEFAULT NULL,
PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
-增加功能
在现有功能基础上,每5分钟将数据存入iot的iot_history_data表中
# coding=utf8
#testNet.py
import re
import time
import json
import redis
from datetime import datetime
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
from dbhelper import insert_iot_record
r = redis.Redis(host='127.0.0.1',port=6379,db=0)
def getCurrentStrTime():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
class GasServerProtocol(LineReceiver):
def __init__(self, factory):
self.id = ''
self.factory = factory
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = True
def connectionMade(self):
print(
"[%s]: %s连接到服务器端。" %
(getCurrentStrTime(), self.transport.getPeer()))
self.sendLine("$99'".encode('utf-8'))
def connectionLost(self, reason):
if self.id in self.factory.clients:
del self.factory.clients[self.id]
print ("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
print(
"[%s]: %s断开了连接,原因是%s" %
(getCurrentStrTime(),
self.transport.getPeer(),
reason))
def insert_iot_record_cb(self, result):
print (result)
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = False
def lineReceived(self, data):
data = data.decode('utf-8')
tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
print (tmpstr)
if(re.match("\$\d{2}\$", data)):
self.id = str(data[1:3])
dicttmp = {}
dicttmp['handle'] = self
dicttmp['timestamp'] = time.time()
self.factory.clients[self.id] = dicttmp
print ("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))
if(re.match('@\d{2}\D\w{12}\d{2}',data)):
if(data[3] == 'D'):
print (u'接收到1条命令!\r\n')
volttmp = (int(data[4:8],16)/1000.0)
temperature = (int(data[8:12],16)/10.0)
humidity = (int(data[12:16],16)/10.0)
dicttmp = {}
dicttmp['collecttime'] = getCurrentStrTime()
dicttmp['voltage'] = volttmp
dicttmp['temperature'] = temperature
dicttmp['humidity'] = humidity
dicttmp['id'] = self.id
r.set(self.id, json.dumps(dicttmp))
print (r.get(self.id))
current_time = datetime.now()
if((current_time - self.last_iot_time).seconds >= 5 * 60 or self.fisrt_recv_flag):
insert_iot_record( r.get(self.id)).addCallback(
self.insert_iot_record_cb)
class GasServerFactory(Factory):
def __init__(self):
self.clients = {} # maps clients ids to GasServerProtocol instances
self._lc = task.LoopingCall(self.send_to_clients)
self._lc.start(3,now=False)
def buildProtocol(self, addr):
return GasServerProtocol(self)
def send_to_clients(self):
for client_addr in self.clients :
self.clients[client_addr]['handle'].sendLine('hello!'.encode('utf-8'))
def startMoniter():
print ("[%s]启动监控服务" % getCurrentStrTime())
cf = GasServerFactory()
reactor.listenTCP(8235, cf)
reactor.run()
def stopMoniter():
print ("[%s]停止监控服务" % getCurrentStrTime())
try:
reactor.crash()
except RuntimeError:
return defer.fail()
else:
return defer.succeed(None)
startMoniter()
# coding=utf-8
# dbhelper.py
from flask_sqlalchemy import SQLAlchemy
from twisted.enterprise import adbapi
import binascii
import decimal
import datetime
import MySQLdb
import json
db_conn = 'MySQLdb'
db_user = 'root'
db_pass = 'root'
db_host = '127.0.0.1'
db_database = 'iot'
dbpool = adbapi.ConnectionPool(
db_conn,
host=db_host,
user=db_user,
passwd=db_pass,
db=db_database,
charset='utf8',
cp_reconnect=True)
def _insert_iot_record(trans, record):
iot_dict = json.loads(record) # 根据字符串书写格式,将字符串自动转换成 字典类型
print (iot_dict)
try:
sql = 'insert into iot_history_data (device_id,voltage,humidity,temperature,collect_time)'+\
' values(\'%s\',\'%6.2f\',\'%6.2f\',\'%6.2f\',\'%s\')' % (
iot_dict['id'], iot_dict['voltage'], iot_dict['humidity'], iot_dict['temperature'], iot_dict['collecttime'])
print (sql)
result = trans.execute(sql)
return 'True'
except:
return 'False'
def insert_iot_record(record):
return dbpool.runInteraction(_insert_iot_record, record)
用sublime打开工程文件夹,讲解文件结构
// The Vue build version to load with the `import` command
// (runtime-only or standalone) has been set in webpack.base.conf with an alias.
import Vue from 'vue'
import FastClick from 'fastclick'
import VueRouter from 'vue-router'
import App from './App'
import Home from './components/HelloFromVux'
import { AjaxPlugin } from 'vux'
Vue.use(AjaxPlugin)
//this.$http.defaults.baseURL = '127.0.0.1:5000';
Vue.use(VueRouter)
const routes = [{
path: '/',
component: Home
}]
const router = new VueRouter({
routes
})
FastClick.attach(document.body)
Vue.config.productionTip = false
/* eslint-disable no-new */
new Vue({
router,
render: h => h(App)
}).$mount('#app-box')
<template>
<div>
<x-header class="header" :left-options="{showBack: false}">
物联网应用系统
</x-header>
<divider>
实时数据
</divider>
<grid>
<grid-item >
<ul class="grid">
<li
<span class="grid-iot-icon" slot="icon" ></span>
</li>
<li class = "grid-text1">气温(℃)</li>
<li class = "grid-text2"> {{my_data.temperature}}</li>
</ul>
</grid-item>
<grid-item >
<ul class="grid">
<li
<span class="grid-iot-icon" slot="icon" ></span>
</li>
<li class = "grid-text1">湿度(%)</li>
<li class = "grid-text2"> {{my_data.humidity}}</li>
</ul>
</grid-item>
<grid-item >
<ul class="grid">
<li
<span class="grid-iot-icon" slot="icon" ></span>
</li>
<li class = "grid-text1">电压(V)</li>
<li class = "grid-text2"> {{my_data.voltage}}</li>
</ul>
</grid-item>
</grid>
</div>
</template>
<script>
import { Group, Cell,Divider,XHeader,Flexbox,FlexboxItem,Grid, GridItem } from 'vux'
export default {
components: {
Group,
Cell,
Divider,
XHeader,
Flexbox,
FlexboxItem,
Grid,
GridItem
},
data () {
return {
// note: changing this line won't causes changes
// with hot-reload because the reloaded component
// preserves its current state and we are modifying
// its initial state.
msg: 'Hello World!',
my_data:[],
}
},
mounted() {
this.getData();
this.timerGetData = setInterval(this.getData, 3000);
},
beforeDestroy() {
clearInterval(this.timerGetData)
},
methods: {
getData() {
this.$http({
url: 'http://127.0.0.1:5000/voltage?id=12',
method: 'GET',
})
.then((res) => {
this.my_data = res.data;
console.log('success ');
}, (res) => {
console.log('error ');
});
},
}
}
</script>
<style>
.vux-demo {
text-align: center;
}
.logo {
width: 100px;
height: 100px
}
.grid-iot-icon {
font-family: 'iot';
font-size: 32px;
color: @theme-color;
text-align: center;
}
.grid-text1 {
text-align: center;
color: grey;
list-style-type:none;
font-size: 18px;
}
.grid-text2 {
text-align: center;
color: black;
list-style-type:none;
font-size: 22px;
}
.grid {
font-size: 22px;
text-align: center;
background-color: white;
list-style-type:none;
}
.header {
background-color: @theme-color;
}
</style>
<template>
<div>
<tabbar style="position:fixed" slot="bottom">
<tabbar-item link="/">
<span class="iot-icon-title" slot="icon"></span>
<span class="iot-text-title" slot="label">首页</span>
</tabbar-item>
<tabbar-item link="/map">
<span class="iot-icon-title" slot="icon"></span>
<span class="iot-text-title" slot="label">监控</span>
</tabbar-item>
<tabbar-item link="home">
<span class="iot-icon-title1" slot="icon"></span>
<span class="iot-text-title" slot="label">查询</span>
</tabbar-item>
<tabbar-item >
<span class="iot-icon-title" slot="icon"></span>
<span class="iot-text-title" slot="label">设置</span>
</tabbar-item>
</tabbar>
</div>
</template>
<script>
import { Tabbar, TabbarItem } from 'vux'
export default {
ready () {
document.querySelector('body').style.height = '100%'
document.querySelector('html').style.height = '100%'
document.querySelector('#app').style.height = '100%'
},
components: {
Tabbar,
TabbarItem
}
}
</script>
<style lang="less">
.iot-tabbar {
position: fixed;
}
.iot-icon-title {
font-family: 'iot';
font-size: 32px;
line-height: 30px;
}
.iot-icon-title1 {
font-family: 'iot';
font-size: 28px;
line-height: 32px;
}
.iot-text-title {
font-size: 14px;
line-height: 28px;
}
</style>
<template>
<div>
<div style="height: 10%; overflow: auto;">
<h3>Simple map</h3>
Marker is placed at {{ marker.lat }}, {{ marker.lng }}
</div>
<v-map style="height: 90%" :zoom="zoom" :center="center">
<v-tilelayer :url="url" :attribution="attribution"></v-tilelayer>
<v-marker :lat-lng="marker"></v-marker>
</v-map>
</div>
</template>
<script>
import Vue2Leaflet from 'vue2-leaflet';
export default {
components: {
'v-map': Vue2Leaflet.Map,
'v-tilelayer' :Vue2Leaflet.TileLayer,
'v-marker': Vue2Leaflet.Marker
},
data () {
return {
zoom:13,
center: L.latLng(47.413220, -1.219482),
url:'http://{s}.tile.osm.org/{z}/{x}/{y}.png',
attribution:'© <a href="http://osm.org/copyright">OpenStreetMap</a> contributors',
marker: L.latLng(47.413220, -1.219482),
}
}
}
</script>
// The Vue build version to load with the `import` command
// (runtime-only or standalone) has been set in webpack.base.conf with an alias.
import Vue from 'vue'
import FastClick from 'fastclick'
import VueRouter from 'vue-router'
import App from './App'
import Home from './components/HelloFromVux'
import Hello from './components/Hello'
import Map from './components/BMap'
import BaiduMap from 'vue-baidu-map'
Vue.use(BaiduMap, {
// ak 是在百度地图开发者平台申请的密钥 详见 http://lbsyun.baidu.com/apiconsole/key */
ak: 'YOUR_APP_KEY'
})
import { AjaxPlugin } from 'vux'
Vue.use(AjaxPlugin)
//this.$http.defaults.baseURL = '127.0.0.1:5000';
Vue.use(VueRouter)
const routes = [{
path: '/',
name: 'default',
component: Home
},{
path: '/map',
component: Map
},{
path: '*',
redirect: { name: 'default' },
}
]
const router = new VueRouter({
hashbang: false,
linkActiveClass: 'active',
mode: 'history',
base: __dirname,
routes
})
FastClick.attach(document.body)
/* eslint-disable no-new */
new Vue({
router,
render: h => h(App)
}).$mount('#app-box')
getData() {
this.$http({
url: 'http://192.168.0.120:5000/voltage',
method: 'GET',
params: {
id: "12"
},
#testFlask.py
# coding=utf-8
from flask import render_template, request, redirect
from flask import abort,jsonify,url_for,render_template, request, redirect
from flask_cors import CORS, cross_origin
from flask_restless import APIManager
import json
import redis
from flask import Flask
import datetime
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from database import db
from application import app
from models import IotHistoryData,Position
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
apimanager = APIManager(app, flask_sqlalchemy_db=db)
apimanager.create_api(IotHistoryData,
methods=['GET'],
# url_prefix='/api/v1',
# preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
results_per_page=100,
url_prefix='',
collection_name='history',)
apimanager.create_api(Position,
methods=['GET','PUT','PATCH','POST'],
# url_prefix='/api/v1',
# preprocessors=dict(GET_SINGLE=[auth_func], GET_MANY=[auth_func]),
results_per_page=100,
url_prefix='',
include_columns = ['Id', 'latitude', 'longitude','collect_time'],
collection_name='position',)
@app.route('/')
def hello_world():
return '你好!'
@app.route('/voltage')
def get_voltage():
id = request.args.get('id', '')
if id:
json_data = r.get(id)
if json_data:
return jsonify(json.loads(json_data))
return 'no data!'
return 'no id!'
@app.route('/relay', methods=['POST'])
def set_relay():
json_data = request.get_data()
py_data = json.loads(json_data)
r.lpush('front_tcp', (json.dumps(py_data)).encode('utf-8'))
return jsonify(py_data)
if __name__ == '__main__':
app.run(host='127.0.0.1', port=int("5000"))
#testNetRedis.py
# coding=utf8
import re
import time
import json
import redis
from datetime import datetime
from twisted.internet import reactor, task, protocol
from twisted.internet import defer, threads
from twisted.internet.protocol import Protocol, error, Factory
from twisted.protocols.basic import LineReceiver
from dbhelper import insert_iot_record
r = redis.Redis(host='127.0.0.1',port=6379,db=0)
def getCurrentStrTime():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
class GasServerProtocol(LineReceiver):
def __init__(self, factory):
self.id = ''
self.factory = factory
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = True
def connectionMade(self):
print(
"[%s]: %s连接到服务器端。" %
(getCurrentStrTime(), self.transport.getPeer()))
self.sendLine("$99'".encode('utf-8'))
def connectionLost(self, reason):
if self.id in self.factory.clients:
del self.factory.clients[self.id]
print("[%s]:ID %s 下线" % (getCurrentStrTime(), self.id))
print(
"[%s]: %s断开了连接,原因是%s" %
(getCurrentStrTime(),
self.transport.getPeer(),
reason))
def insert_iot_record_cb(self, result):
print(result)
self.last_iot_time = datetime.now()
self.fisrt_recv_flag = False
def lineReceived(self, data):
data = data.decode('utf-8')
tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
print(tmpstr)
if(re.match("\$\d{2}\$", data)):
self.id = str(data[1:3])
dicttmp = {}
dicttmp['handle'] = self
dicttmp['timestamp'] = time.time()
self.factory.clients[self.id] = dicttmp
print("[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id))
if(re.match('@\d{2}\D\w{12}\d{2}',data)):
if(data[3] == 'D'):
print(u'接收到1条命令!\r\n')
volttmp = (int(data[4:8],16)/1000.0)
temperature = (int(data[8:12],16)/10.0)
humidity = (int(data[12:16],16)/10.0)
dicttmp = {}
dicttmp['collecttime'] = getCurrentStrTime()
dicttmp['voltage'] = volttmp
dicttmp['temperature'] = temperature
dicttmp['humidity'] = humidity
dicttmp['id'] = self.id
r.set(self.id, json.dumps(dicttmp))
print(r.get(self.id))
current_time = datetime.now()
if((current_time - self.last_iot_time).seconds >= 5 * 60 or self.fisrt_recv_flag):
insert_iot_record( r.get(self.id)).addCallback(
self.insert_iot_record_cb)
class GasServerFactory(Factory):
def __init__(self):
self.clients = {} # maps clients ids to GasServerProtocol instances
self._lc = task.LoopingCall(self.send_to_clients)
self._lc.start(60,now=False)
def buildProtocol(self, addr):
return GasServerProtocol(self)
def send_to_clients(self):
for client_addr in self.clients :
self.clients[client_addr]['handle'].sendLine('@12d000000'.encode('utf-8'))
@defer.inlineCallbacks
def receive_from_mq(self):
try:
data = yield r.rpop('front_tcp')
if data and data != '':
print(u'接收到1条命令!%s\r\n' % (data))
self.process_data_from_mq(data)
except:
pass
def process_data_from_mq(self, data):
loads_data = (json.loads(data))
command = loads_data.get('command')
addr = (loads_data.get('addr'))
mydata = loads_data.get('data')
send_data = '@%s%s%s01' % (addr,command,mydata)
send_data = send_data.encode('utf-8')
self.clients[addr]['handle'].sendLine(send_data)
def startMoniter():
print("[%s]启动监控服务" % getCurrentStrTime())
cf = GasServerFactory()
task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)
reactor.listenTCP(8234, cf)
reactor.run()
def stopMoniter():
print("[%s]停止监控服务" % getCurrentStrTime())
try:
reactor.crash()
except RuntimeError:
return defer.fail()
else:
return defer.succeed(None)
startMoniter()