[关闭]
@liruiyi962464 2025-05-15T01:14:17.000000Z 字数 8414 阅读 100

LTE新设备python文件

Linux服务器部署


不用管 测试用

  1. ps | grep python
  2. cd /
  3. vi serial_util.py
  4. python3 start.py
  5. rm *.py
  6. cat /etc/init.d/Start
  7. rm /etc/init.d/Start /etc/rc.d/S92Start
  8. vi /etc/init.d/Start
  9. chmod +x /etc/init.d/Start
  10. /etc/init.d/Start enable
  11. /etc/init.d/Start start
  12. /etc/init.d/Start stop
  13. /etc/init.d/Start stop
  14. rm /at.py
  15. vi /at.py
  16. /etc/init.d/Start stop
  17. /etc/init.d/Start start
  18. reboot

Start 配置自启服务

  1. #!/bin/sh /etc/rc.common
  2. USE_PROCD=1
  3. START=92
  4. STOP=15
  5. start_service() {
  6. procd_open_instance
  7. procd_set_param command python3 start.py
  8. procd_set_param respawn
  9. procd_set_param stdout 1
  10. procd_set_param stderr 1
  11. procd_close_instance
  12. }
  13. stop_service() {
  14. killall python3
  15. }
  16. restart() {
  17. stop
  18. start
  19. }

vi serial_util.py

  1. import serial
  2. import time
  3. import fcntl
  4. tty_com = "/dev/ttyUSB1"
  5. baudrate = 115200
  6. ip_addr = "172.0.0.11"
  7. deviceIp = '172.0.0.161'
  8. def open_serial():
  9. try:
  10. ser = serial.Serial(
  11. tty_com, baudrate,
  12. bytesize=serial.EIGHTBITS,
  13. parity=serial.PARITY_NONE,
  14. stopbits=serial.STOPBITS_ONE,
  15. timeout=3600
  16. )
  17. fcntl.flock(ser.fileno(),fcntl.LOCK_EX)
  18. return ser
  19. except Exception as e:
  20. print(f"fail to open ser:{str(e)}")
  21. return None
  22. def send_AT(ser, cmd):
  23. try:
  24. if not ser.is_open:
  25. ser.open()
  26. ser.write(cmd.encode())
  27. data = ser.read(1)
  28. time.sleep(0.1)
  29. data += ser.read(ser.inWaiting())
  30. return data.decode()
  31. except Exception as e:
  32. print(f"AT fail: {str(e)}")
  33. return "error"

vi api.py

  1. from flask import Flask, request
  2. import os,time
  3. import subprocess
  4. from serial_util import open_serial, send_AT,deviceIp
  5. app = Flask(__name__)
  6. app.secret_key = 'asdfghjkl'
  7. @app.route('/ping/<ip>/<num>', methods=['POST', 'GET'])
  8. def ping(ip, num):
  9. """test for ping"""
  10. try:
  11. result = subprocess.run(
  12. ['ping', '-c', num, ip],
  13. capture_output=True, text=True, timeout=10
  14. )
  15. return result.stdout.split('\n')
  16. except Exception as e:
  17. print(f"fail to Ping: {str(e)}")
  18. return "error"
  19. @app.route('/init', methods=['POST', 'GET'])
  20. def init():
  21. state = request.args.get('state', '')
  22. if not state.isdigit():
  23. return "Invalid state", 400
  24. try:
  25. os.system("kill $(pgrep -f 'python3 at.py')")
  26. ser = open_serial()
  27. if not ser:
  28. return "ser can not use", 500
  29. commands = {
  30. '0': "AT+CFUN=1\r\n",
  31. '1': "AT+CFUN=0\r\nAT+ZEMCI=0\r\n",
  32. '2': "AT+CFUN=1\r\n",
  33. '3': "AT+CGATT=0\r\nAT+CGATT=1\r\n",
  34. '4': "AT+CEREG=1\r\nAT+CGDCONT=1,\"IP\"\r\nAT+CGACT=1,1\r\nAT+ZGACT=1,1\r\n"
  35. }
  36. if state in commands:
  37. send_AT(ser, commands[state])
  38. time.sleep(1)
  39. return state
  40. except Exception as e:
  41. print(f"fail to init: {str(e)}")
  42. return "error", 500
  43. finally:
  44. os.system("nohup python3 at.py >> /dev/null 2>&1 &")
  45. if ser:
  46. ser.close()
  47. if __name__ == '__main__':
  48. app.run(host=deviceIp, port=3652, debug=True)

vi at.py

  1. import os
  2. import re
  3. import json
  4. import subprocess
  5. import requests
  6. import time
  7. from serial_util import open_serial, send_AT,ip_addr,deviceIp
  8. def zemci():
  9. try:
  10. ser = open_serial()
  11. if not ser:
  12. return "yizhuxiao"
  13. send_AT(ser, "AT+ZEMCI=1\r\n")
  14. time.sleep(1.2)
  15. datas = send_AT(ser, "AT+ZEMCI=1\r\n").split('\r\n')
  16. if len(datas) < 2 or 'ERROR' in datas[1] or datas[1] == 'OK':
  17. return "yizhuxiao"
  18. zemci_data = datas[1].split(':')[1].split(',')
  19. parameter = {
  20. "deviceIp": deviceIp,
  21. "cellId": hex(int(zemci_data[0]))[2:],
  22. "tac": zemci_data[2],
  23. "band1": zemci_data[6],
  24. "rsrp": int(zemci_data[10]) - 140,
  25. "rsrq": int(zemci_data[11]) * 0.5 - 20,
  26. "pci": zemci_data[14][:4],
  27. "earfcn": int(zemci_data[14][4:], 16),
  28. "csq": zemci_data[10]
  29. }
  30. print(parameter)
  31. return parameter
  32. except Exception as e:
  33. print(f"fail to get ipconfig: {str(e)}")
  34. return "error"
  35. finally:
  36. if ser:
  37. ser.close()
  38. def loop():
  39. data = zemci()
  40. if isinstance(data, dict) and data.get("deviceIp") == deviceIp:
  41. try:
  42. response = requests.post(
  43. f"http://{ip_addr}:8091/jeecg-boot/onlineStateDetection/deviceNetworkstatus/add",
  44. json=data,
  45. headers={"Content-Type": "application/json"},
  46. timeout=5
  47. )
  48. print(f"code:{response.status_code}")
  49. except Exception as e:
  50. print(f"fail to update:{str(e)}")
  51. return data
  52. def attach():
  53. ser = open_serial()
  54. send_AT(ser,"AT+ZGACT=1,1\r\n")
  55. time.sleep(1)
  56. ser.close()
  57. return 0
  58. def state():
  59. parameter = {}
  60. ser = open_serial()
  61. if not ser:
  62. return "yizhuxiao"
  63. send_AT(ser, "AT+ZEMCI=1\r\n")
  64. datas = send_AT(ser, "AT+CPIN?\r\n")
  65. card_status = datas.split('\r\n')[1]
  66. datas = send_AT(ser, "AT+CFUN?\r\n")
  67. fun = datas.split('\r\n')[1]
  68. if fun == '+CFUN: 0':
  69. moudelState = 0
  70. else:
  71. moudelState = 1
  72. if card_status == '+CME ERROR: 10':
  73. simState = 0
  74. else:
  75. simState = 1
  76. parameter["ip"] = deviceIp
  77. parameter['moudelState'] = moudelState
  78. parameter['simState'] = simState
  79. return parameter
  80. def cfun():
  81. ser = open_serial()
  82. send_AT(ser,"AT+CFUN=0\r\n")
  83. send_AT(ser,"AT+ZEMCI=0\r\n")
  84. time.sleep(0.5)
  85. send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
  86. time.sleep(2)
  87. send_AT(ser,"AT+CGACT=1,1\r\n")
  88. print("dadsqweqweqfas")
  89. time.sleep(2)
  90. send_AT(ser,"AT+ZGACT=1,1\r\n")
  91. time.sleep(2)
  92. send_AT(ser,"AT+ZEMCI=1\r\n")
  93. send_AT(ser,"AT+CGPADDR=1\r\n")
  94. ser.close()
  95. return 0
  96. if __name__ == "__main__":
  97. num = 0
  98. cfun()
  99. while True:
  100. try:
  101. data = state()
  102. except Exception as e:
  103. print('error')
  104. result = os.system('ping -c 1 ' + ip_addr + ' >>/dev/null')
  105. print(result)
  106. if result:
  107. time.sleep(10)
  108. print('error1')
  109. else:
  110. try:
  111. if num == 0:
  112. num = 1
  113. loop()
  114. print(2)
  115. time.sleep(5)
  116. try:
  117. url = "http://" + ip_addr + ":8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip=" + \
  118. data["ip"] + "&moudelState=" + str(data["moudelState"]) + "&simState=" + str(data["simState"])
  119. response = requests.get(url)
  120. except Exception:
  121. print('request error')
  122. elif num == 1:
  123. num = 0
  124. loop()
  125. print(200)
  126. time.sleep(5)
  127. try:
  128. url = "http://" + ip_addr + ":8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip=" + \
  129. data["ip"] + "&moudelState=" + str(data["moudelState"]) + "&simState=" + str(data["simState"])
  130. response = requests.get(url)
  131. except Exception:
  132. print('request error')
  133. except KeyboardInterrupt:
  134. break

vi attach_api.py

  1. from flask import Flask, jsonify
  2. from serial_util import open_serial, send_AT,deviceIp
  3. import time
  4. app = Flask(__name__)
  5. def cfun():
  6. ser = open_serial()
  7. send_AT(ser,"AT+CFUN=0\r\n")
  8. send_AT(ser,"AT+ZEMCI=0\r\n")
  9. time.sleep(0.5)
  10. send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
  11. time.sleep(2)
  12. send_AT(ser,"AT+CGACT=1,1\r\n")
  13. print("dadsqweqweqfas")
  14. time.sleep(2)
  15. send_AT(ser,"AT+ZGACT=1,1\r\n")
  16. time.sleep(2)
  17. send_AT(ser,"AT+ZEMCI=1\r\n")
  18. send_AT(ser,"AT+CGPADDR=1\r\n")
  19. ser.close()
  20. return 0
  21. @app.route('/attach/<num>', methods=['POST', 'GET'])
  22. def attach(num):
  23. if not num.isdigit():
  24. return jsonify({"error": "Invalid parameter"}), 500
  25. result = []
  26. try:
  27. ser = open_serial()
  28. if not ser:
  29. return jsonify({"error": "ser can not use"}), 500
  30. for _ in range(int(num)):
  31. send_AT(ser, "AT+CGATT=0\r\n")
  32. time.sleep(1)
  33. start_time = time.time()
  34. response = send_AT(ser, "AT+CGATT=1\r\n")
  35. datas=response.split('\r\n')[1]
  36. if datas != 'ok':
  37. result.append(int((time.time() - start_time) * 1000))
  38. else:
  39. result.append('error')
  40. return jsonify(result)
  41. except Exception as e:
  42. print(f"fail to attach: {str(e)}")
  43. return jsonify({"error": "Internal error"}), 500
  44. finally:
  45. if ser:
  46. ser.close()
  47. cfun()
  48. if __name__ == '__main__':
  49. app.run(host=deviceIp, port=3653, debug=True)

vi start.py

  1. import threading
  2. import os
  3. import sys
  4. import subprocess
  5. import logging
  6. from typing import Callable, Dict
  7. import time
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.FileHandler("service_runner.log"),
  13. logging.StreamHandler()
  14. ]
  15. )
  16. logger = logging.getLogger(__name__)
  17. class ServiceManager:
  18. def __init__(self):
  19. self.services = ["at.py", "api.py", "attach_api.py"]
  20. self.threads: Dict[str, threading.Thread] = {}
  21. self.processes: Dict[str, subprocess.Popen] = {}
  22. self.lock = threading.Lock()
  23. def run_script(self, script_name: str) -> None:
  24. try:
  25. logger.info(f"Starting {script_name}...")
  26. process = subprocess.Popen(
  27. [sys.executable, script_name],
  28. stdout=subprocess.PIPE,
  29. stderr=subprocess.STDOUT,
  30. text=True
  31. )
  32. with self.lock:
  33. self.processes[script_name] = process
  34. while True:
  35. output = process.stdout.readline() # type: ignore
  36. if output == '' and process.poll() is not None:
  37. break
  38. if output:
  39. logger.info(f"[{script_name}] {output.strip()}")
  40. exit_code = process.poll()
  41. if exit_code != 0:
  42. logger.error(f"{script_name} exited with code {exit_code}")
  43. except Exception as e:
  44. logger.error(f"Failed to start {script_name}: {str(e)}")
  45. finally:
  46. with self.lock:
  47. if script_name in self.processes:
  48. del self.processes[script_name]
  49. def create_daemon_thread(self, script_name: str) -> threading.Thread:
  50. thread = threading.Thread(target=lambda: self.run_script(script_name))
  51. thread.daemon = True
  52. return thread
  53. def restart_service(self, script_name: str):
  54. with self.lock:
  55. if script_name in self.threads and self.threads[script_name].is_alive():
  56. return
  57. logger.info(f"Restarting {script_name}...")
  58. thread = self.create_daemon_thread(script_name)
  59. thread.start()
  60. self.threads[script_name] = thread
  61. def monitor(self):
  62. try:
  63. while True:
  64. with self.lock:
  65. for script in self.services.copy():
  66. thread = self.threads.get(script)
  67. if not thread or not thread.is_alive():
  68. logger.warning(f"Service {script} died, restarting...")
  69. self.restart_service(script)
  70. time.sleep(5)
  71. except KeyboardInterrupt:
  72. logger.info("Received keyboard interrupt, shutting down...")
  73. sys.exit(0)
  74. if __name__ == "__main__":
  75. manager = ServiceManager()
  76. for script in manager.services:
  77. manager.restart_service(script)
  78. monitor_thread = threading.Thread(target=manager.monitor)
  79. monitor_thread.start()
  80. monitor_thread.join()
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注