[关闭]
@richey 2020-08-24T06:24:36.000000Z 字数 8082 阅读 1915

python讲义11:网络通信程序及数据库读写

python redis twisted

1.redis key/value数据库

1.1 启动redis服务(运行exe文件即可)

1.2 redis数据库讲解

1.3 要点讲解


  1. # coding=utf8
  2. import re
  3. import datetime
  4. import time
  5. import json
  6. import redis
  7. from twisted.internet import reactor, task, protocol
  8. from twisted.internet import defer, threads
  9. from twisted.internet.protocol import Protocol, error, Factory
  10. from twisted.protocols.basic import LineReceiver
  11. r = redis.Redis(host='127.0.0.1',port=6379,db=0)
  12. def getCurrentStrTime():
  13. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  14. class GasServerProtocol(LineReceiver):
  15. def __init__(self, factory):
  16. self.id = ''
  17. self.factory = factory
  18. def connectionMade(self):
  19. print(
  20. "[%s]: %s连接到服务器端。" %
  21. (getCurrentStrTime(), self.transport.getPeer()))
  22. self.sendLine("$99'")
  23. def connectionLost(self, reason):
  24. if self.id in self.factory.clients:
  25. del self.factory.clients[self.id]
  26. print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)
  27. print(
  28. "[%s]: %s断开了连接,原因是%s" %
  29. (getCurrentStrTime(),
  30. self.transport.getPeer(),
  31. reason))
  32. def lineReceived(self, data):
  33. tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
  34. print tmpstr
  35. if(re.match("\$\d{2}\$", data)):
  36. self.id = str(data[1:3])
  37. dicttmp = {}
  38. dicttmp['handle'] = self
  39. dicttmp['timestamp'] = time.time()
  40. self.factory.clients[self.id] = dicttmp
  41. print "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)
  42. if(re.match('@\d{2}\D\w{4}\d{2}',data)):
  43. if(data[3] == 'D'):
  44. print u'接收到1条命令!\r\n'
  45. volttmp = (int(data[4:8],16)/1000.0)
  46. tmpstr = "当前电压值=%.3f" % volttmp
  47. print tmpstr
  48. dicttmp = {}
  49. dicttmp['collecttime'] = getCurrentStrTime()
  50. dicttmp['volt'] = volttmp
  51. dicttmp['id'] = self.id
  52. r.set(self.id, json.dumps(dicttmp))
  53. print r.get(self.id)
  54. class GasServerFactory(Factory):
  55. def __init__(self):
  56. self.clients = {} # maps clients ids to GasServerProtocol instances
  57. self._lc = task.LoopingCall(self.send_to_clients)
  58. self._lc.start(3,now=False)
  59. def buildProtocol(self, addr):
  60. return GasServerProtocol(self)
  61. def send_to_clients(self):
  62. for client_addr in self.clients :
  63. self.clients[client_addr]['handle'].sendLine('hello!')
  64. def startMoniter():
  65. print "[%s]启动监控服务" % getCurrentStrTime()
  66. cf = GasServerFactory()
  67. reactor.listenTCP(8234, cf)
  68. reactor.run()
  69. def stopMoniter():
  70. print "[%s]停止监控服务" % getCurrentStrTime()
  71. try:
  72. reactor.crash()
  73. except RuntimeError:
  74. return defer.fail()
  75. else:
  76. return defer.succeed(None)
  77. startMoniter()

image_1butgll69iv91ega1ss81n1otp2p.png-21.6kB
image_1butgo936730t37dgg1en5gdj39.png-6.3kB
无标题.png-162.7kB
image_1butgg3mm5mn1gpcccu1esl1b0ps.png-42.1kB

2 RESTful API接口后端软件

  1. # coding=utf-8
  2. from flask import render_template, request, redirect
  3. from flask import abort,jsonify,url_for,render_template, request, redirect
  4. from flask_cors import CORS
  5. import json
  6. import redis
  7. from flask import Flask
  8. import datetime
  9. from flask_cors import CORS
  10. app = Flask(__name__)
  11. CORS(app)
  12. r = redis.Redis(host='127.0.0.1', port=6379, db=0)
  13. @app.route('/')
  14. def hello_world():
  15. return '你好!'
  16. @app.route('/voltage')
  17. def get_voltage():
  18. id = request.args.get('id', '')
  19. if id:
  20. json_data = r.get(id)
  21. if json_data:
  22. return jsonify(json.loads(json_data))
  23. return 'no data!'
  24. return 'no id!'
  25. if __name__ == '__main__':
  26. app.run(host='0.0.0.0', port=int("5000"))

111.png-198.9kB

3.mysql 关系数据库

3.1 新建数据库

3.2 新建表

  1. # Host: localhost (Version: 5.5.47)
  2. # Date: 2017-12-08 13:55:35
  3. # Generator: MySQL-Front 5.3 (Build 4.234)
  4. /*!40101 SET NAMES utf8 */;
  5. #
  6. # Structure for table "device"
  7. #
  8. CREATE TABLE `device` (
  9. `Id` int(11) NOT NULL AUTO_INCREMENT,
  10. `device_id` varchar(64) DEFAULT NULL,
  11. `name` varchar(255) DEFAULT NULL,
  12. PRIMARY KEY (`Id`),
  13. KEY `device_id` (`device_id`)
  14. ) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  15. #
  16. # Data for table "device"
  17. #
  18. INSERT INTO `device` VALUES (1,'12','科大气象站');
  19. #
  20. # Structure for table "iot_history_data"
  21. #
  22. CREATE TABLE `iot_history_data` (
  23. `Id` int(11) NOT NULL AUTO_INCREMENT,
  24. `device_id` varchar(64) DEFAULT NULL,
  25. `collect_time` datetime DEFAULT NULL,
  26. `temperature` float DEFAULT NULL,
  27. `humidity` float DEFAULT NULL,
  28. `voltage` float DEFAULT NULL,
  29. PRIMARY KEY (`Id`),
  30. UNIQUE KEY `device_id` (`device_id`,`collect_time`)
  31. ) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
  32. #
  33. # Data for table "iot_history_data"
  34. #
  35. #
  36. # Structure for table "iot_realtime_data"
  37. #
  38. CREATE TABLE `iot_realtime_data` (
  39. `Id` int(11) NOT NULL AUTO_INCREMENT,
  40. `device_id` varchar(64) DEFAULT NULL,
  41. `collect_time` datetime DEFAULT NULL,
  42. `temperature` float DEFAULT NULL,
  43. `humidity` float DEFAULT NULL,
  44. `voltage` float DEFAULT NULL,
  45. PRIMARY KEY (`Id`)
  46. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;
  47. #
  48. # Data for table "iot_realtime_data"
  49. #

3.3 操作mysql数据库

-增加功能
在现有功能基础上,每5分钟将数据存入iot的iot_history_data表中

  1. # coding=utf8
  2. #testNet.py
  3. import re
  4. import time
  5. import json
  6. import redis
  7. from datetime import datetime
  8. from twisted.internet import reactor, task, protocol
  9. from twisted.internet import defer, threads
  10. from twisted.internet.protocol import Protocol, error, Factory
  11. from twisted.protocols.basic import LineReceiver
  12. from dbhelper import insert_iot_record
  13. r = redis.Redis(host='127.0.0.1',port=6379,db=0)
  14. def getCurrentStrTime():
  15. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  16. class GasServerProtocol(LineReceiver):
  17. def __init__(self, factory):
  18. self.id = ''
  19. self.factory = factory
  20. self.last_iot_time = datetime.now()
  21. self.fisrt_recv_flag = True
  22. def connectionMade(self):
  23. print(
  24. "[%s]: %s连接到服务器端。" %
  25. (getCurrentStrTime(), self.transport.getPeer()))
  26. self.sendLine("$99'")
  27. def connectionLost(self, reason):
  28. if self.id in self.factory.clients:
  29. del self.factory.clients[self.id]
  30. print "[%s]:ID %s 下线" % (getCurrentStrTime(), self.id)
  31. print(
  32. "[%s]: %s断开了连接,原因是%s" %
  33. (getCurrentStrTime(),
  34. self.transport.getPeer(),
  35. reason))
  36. def insert_iot_record_cb(self, result):
  37. print result
  38. self.last_iot_time = datetime.now()
  39. self.fisrt_recv_flag = False
  40. def lineReceived(self, data):
  41. tmpstr = ("[%s]: 收到数据 %s") % (getCurrentStrTime(), data)
  42. print tmpstr
  43. if(re.match("\$\d{2}\$", data)):
  44. self.id = str(data[1:3])
  45. dicttmp = {}
  46. dicttmp['handle'] = self
  47. dicttmp['timestamp'] = time.time()
  48. self.factory.clients[self.id] = dicttmp
  49. print "[%s]:ID %s 注册成功" % (getCurrentStrTime(), self.id)
  50. if(re.match('@\d{2}\D\w{12}\d{2}',data)):
  51. if(data[3] == 'D'):
  52. print u'接收到1条命令!\r\n'
  53. volttmp = (int(data[4:8],16)/1000.0)
  54. temperature = (int(data[8:12],16)/10.0)
  55. humidity = (int(data[12:16],16)/10.0)
  56. dicttmp = {}
  57. dicttmp['collecttime'] = getCurrentStrTime()
  58. dicttmp['voltage'] = volttmp
  59. dicttmp['temperature'] = temperature
  60. dicttmp['humidity'] = humidity
  61. dicttmp['id'] = self.id
  62. r.set(self.id, json.dumps(dicttmp))
  63. print r.get(self.id)
  64. current_time = datetime.now()
  65. if((current_time - self.last_iot_time).seconds >= 5 * 60 or self.fisrt_recv_flag):
  66. insert_iot_record( r.get(self.id)).addCallback(
  67. self.insert_iot_record_cb)
  68. class GasServerFactory(Factory):
  69. def __init__(self):
  70. self.clients = {} # maps clients ids to GasServerProtocol instances
  71. self._lc = task.LoopingCall(self.send_to_clients)
  72. self._lc.start(3,now=False)
  73. def buildProtocol(self, addr):
  74. return GasServerProtocol(self)
  75. def send_to_clients(self):
  76. for client_addr in self.clients :
  77. self.clients[client_addr]['handle'].sendLine('hello!')
  78. def startMoniter():
  79. print "[%s]启动监控服务" % getCurrentStrTime()
  80. cf = GasServerFactory()
  81. reactor.listenTCP(8234, cf)
  82. reactor.run()
  83. def stopMoniter():
  84. print "[%s]停止监控服务" % getCurrentStrTime()
  85. try:
  86. reactor.crash()
  87. except RuntimeError:
  88. return defer.fail()
  89. else:
  90. return defer.succeed(None)
  91. startMoniter()
  1. # coding=utf-8
  2. # dbhelper.py
  3. from flask_sqlalchemy import SQLAlchemy
  4. from twisted.enterprise import adbapi
  5. import binascii
  6. import decimal
  7. import datetime
  8. import MySQLdb
  9. import json
  10. db_conn = 'MySQLdb'
  11. db_user = 'root'
  12. db_pass = 'root'
  13. db_host = '127.0.0.1'
  14. db_database = 'iot'
  15. dbpool = adbapi.ConnectionPool(
  16. db_conn,
  17. host=db_host,
  18. user=db_user,
  19. passwd=db_pass,
  20. db=db_database,
  21. charset='utf8',
  22. cp_reconnect=True)
  23. def _insert_iot_record(trans, record):
  24. iot_dict = json.loads(record) # 根据字符串书写格式,将字符串自动转换成 字典类型
  25. print iot_dict
  26. try:
  27. sql = 'insert into iot_history_data (device_id,voltage,humidity,temperature,collect_time)'+\
  28. ' values(\'%s\',\'%6.2f\',\'%6.2f\',\'%6.2f\',\'%s\')' % (
  29. iot_dict['id'], iot_dict['voltage'], iot_dict['humidity'], iot_dict['temperature'], iot_dict['collecttime'])
  30. print sql
  31. result = trans.execute(sql)
  32. return 'True'
  33. except:
  34. return 'False'
  35. def insert_iot_record(record):
  36. return dbpool.runInteraction(_insert_iot_record, record)
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注