[关闭]
@liruiyi962464 2025-12-12T08:34:05.000000Z 字数 10435 阅读 124

LTE新设备python文件

Linux服务器部署


不用管 测试用

  1. minicom -D /dev/ttyUSB0
  2. ps | grep python
  3. cd /
  4. vi serial_util.py
  5. python3 start.py
  6. rm *.py
  7. cat /etc/init.d/Start
  8. rm /etc/init.d/Start /etc/rc.d/S92Start
  9. vi /etc/init.d/Start
  10. chmod +x /etc/init.d/Start
  11. /etc/init.d/Start enable
  12. /etc/init.d/Start start
  13. /etc/init.d/Start stop
  14. /etc/init.d/Start stop
  15. rm /at.py
  16. vi /at.py
  17. /etc/init.d/Start stop
  18. /etc/init.d/Start start
  19. reboot

Start 配置自启服务

  1. #!/bin/sh /etc/rc.common
  # Run this service under procd supervision (see start_service below).
  USE_PROCD=1
  # Boot-order index; matches the S92Start link removed in the notes above.
  START=92
  # Shutdown-order index (OpenWrt rc.common convention).
  STOP=15
  start_service() {
      # Register one procd instance that runs the Python supervisor script.
      procd_open_instance
      # NOTE(review): start.py is a relative path; presumably resolved from /,
      # where the notes place the scripts -- confirm procd's working directory.
      procd_set_param command python3 start.py
      # Have procd relaunch the process when it exits.
      procd_set_param respawn
      # Forward the process's stdout and stderr to procd.
      procd_set_param stdout 1
      procd_set_param stderr 1
      procd_close_instance
  }
  stop_service() {
      # Kills every python3 process on the system, which also takes down the
      # child scripts that start.py spawned.
      killall python3
  }
  restart() {
      # Full stop followed by a fresh start.
      stop
      start
  }

vi serial_util.py

  1. import serial
  2. import time
  3. import fcntl
  # Serial device node passed to serial.Serial() by open_serial().
  tty_com = "/dev/ttyUSB1"
  # Line speed used when opening tty_com.
  baudrate = 115200
  # Address of the backend: at.py pings it and posts status reports to it.
  ip_addr = "172.0.0.11"
  # This device's own address: reported in payloads and used as the Flask bind host.
  deviceIp = '172.0.0.161'
  def open_serial():
      """Open the AT serial port and take an exclusive advisory lock on it.

      Returns:
          The opened ``serial.Serial`` object, or ``None`` when the port could
          not be opened or locked (the error is printed).
      """
      ser = None
      try:
          ser = serial.Serial(
              tty_com, baudrate,
              bytesize=serial.EIGHTBITS,
              parity=serial.PARITY_NONE,
              stopbits=serial.STOPBITS_ONE,
              timeout=3600
          )
          # LOCK_EX without LOCK_NB blocks until any other holder releases the
          # lock, so concurrent scripts take turns on the port.
          fcntl.flock(ser.fileno(), fcntl.LOCK_EX)
          return ser
      except Exception as e:
          print(f"fail to open ser:{str(e)}")
          # The port may already be open if only the lock step failed; close it
          # so the file descriptor is not leaked.
          if ser is not None:
              try:
                  ser.close()
              except Exception as close_err:
                  print(f"fail to close ser:{str(close_err)}")
          return None
  def send_AT(ser, cmd):
      """Write one AT command string and return whatever the module answers.

      Args:
          ser: Serial port object as returned by ``open_serial``; it is
              reopened first if it is currently closed.
          cmd: Command text including its line terminator, e.g. ``"AT\\r\\n"``.

      Returns:
          The decoded reply text, or the string ``"error"`` if anything raised.
      """
      try:
          if not ser.is_open:
              ser.open()
          ser.write(cmd.encode())
          # Block (up to the port's read timeout) for the first reply byte,
          # then give the rest of the reply a moment to arrive.
          data = ser.read(1)
          time.sleep(0.1)
          data += ser.read(ser.in_waiting)
          return data.decode()
      except Exception as e:
          print(f"AT fail: {str(e)}")
          return "error"

vi api.py

  1. from flask import Flask, request
  2. import os,time
  3. import subprocess
  4. from serial_util import open_serial, send_AT,deviceIp
  # Flask application object that the @app.route handlers below attach to.
  app = Flask(__name__)
  # NOTE(review): hard-coded signing key committed in source; nothing in this
  # listing uses sessions, so confirm whether it is needed at all.
  app.secret_key = 'asdfghjkl'
  @app.route('/ping/<ip>/<num>', methods=['POST', 'GET'])
  def ping(ip, num):
      """Ping ``ip`` ``num`` times and return the command's stdout lines.

      Args:
          ip: Target host taken from the URL.
          num: Echo-request count taken from the URL; must be all digits.

      Returns:
          The list of stdout lines on success, ``("Invalid parameter", 400)``
          for rejected input, or the string ``"error"`` if running ping failed
          (including the 10-second timeout).
      """
      # Both values come straight from the URL. They are passed as argv items
      # (no shell), but a value starting with '-' would still be parsed by ping
      # as an option, and a non-numeric count is never valid.
      if not num.isdigit() or ip.startswith('-'):
          return "Invalid parameter", 400
      try:
          result = subprocess.run(
              ['ping', '-c', num, ip],
              capture_output=True, text=True, timeout=10
          )
          return result.stdout.split('\n')
      except Exception as e:
          print(f"fail to Ping: {str(e)}")
          return "error"
  @app.route('/init', methods=['POST', 'GET'])
  def init():
      """Send the AT command sequence selected by the ``state`` query argument.

      The background ``at.py`` process is killed first, then relaunched once
      the request finishes, whatever the outcome.

      Returns:
          The ``state`` value on success (also when it is numeric but has no
          command mapped), ``("Invalid state", 400)`` for non-numeric input,
          ``("ser can not use", 500)`` if the port cannot be opened, or
          ``("error", 500)`` on any other failure.
      """
      state = request.args.get('state', '')
      if not state.isdigit():
          return "Invalid state", 400
      # Bound before the try so the finally clause can always test it.
      ser = None
      try:
          os.system("kill $(pgrep -f 'python3 at.py')")
          ser = open_serial()
          if not ser:
              return "ser can not use", 500
          commands = {
              '0': "AT+CFUN=1\r\n",
              '1': "AT+CFUN=0\r\nAT+ZEMCI=0\r\n",
              '2': "AT+CFUN=1\r\n",
              '3': "AT+CGATT=0\r\nAT+CGATT=1\r\n",
              '4': "AT+CEREG=1\r\nAT+CGDCONT=1,\"IP\"\r\nAT+CGACT=1,1\r\nAT+ZGACT=1,1\r\n"
          }
          if state in commands:
              send_AT(ser, commands[state])
              time.sleep(1)
          return state
      except Exception as e:
          print(f"fail to init: {str(e)}")
          return "error", 500
      finally:
          # Release the port before relaunching at.py so the new process does
          # not start out waiting on our serial lock.
          if ser:
              ser.close()
          os.system("nohup python3 at.py >> /dev/null 2>&1 &")
  if __name__ == '__main__':
      # Debug mode is off on purpose: this runs as an unattended boot service
      # bound to the device's network address, and Flask's debug mode exposes
      # an interactive debugger that can execute arbitrary code.
      app.run(host=deviceIp, port=3652, debug=False)

vi at.py

  1. import os
  2. import re
  3. import json
  4. import subprocess
  5. import requests
  6. import time
  7. from serial_util import open_serial, send_AT, ip_addr, deviceIp
  def zemci():
      """Query the module with ``AT+ZEMCI=1`` and parse the cell information.

      Returns:
          A dict with ``deviceIp``, ``cellId``, ``tac``, ``band1``, ``rsrp``,
          ``rsrq``, ``pci``, ``earfcn`` and ``csq`` on success;
          ``"no response"`` if the port cannot be opened or the reply carries
          no data line; ``"error"`` if parsing raised.
      """
      # Bound before the try so the finally clause can test it directly,
      # matching cfun() and activate_radio().
      ser = None
      try:
          ser = open_serial()
          if not ser:
              return "no response"
          # The command is sent twice; only the second reply is parsed.
          send_AT(ser, "AT+ZEMCI=1\r\n")
          time.sleep(1.2)
          datas = send_AT(ser, "AT+ZEMCI=1\r\n").split('\r\n')
          if len(datas) < 2 or 'ERROR' in datas[1] or datas[1] == 'OK':
              return "no response"
          # Second line has the form "<prefix>:<comma-separated fields>".
          # NOTE(review): field meanings follow the key names used below;
          # confirm against the module's AT manual.
          zemci_data = datas[1].split(':')[1].split(',')
          parameter = {
              "deviceIp": deviceIp,
              "cellId": hex(int(zemci_data[0]))[2:],
              "tac": zemci_data[2],
              "band1": zemci_data[6],
              "rsrp": int(zemci_data[10]) - 140,
              "rsrq": int(zemci_data[11]) * 0.5 - 20,
              # Field 14 is split: first 4 characters, then a hex remainder.
              "pci": zemci_data[14][:4],
              "earfcn": int(zemci_data[14][4:], 16),
              # Same raw field as rsrp, without the offset applied.
              "csq": zemci_data[10]
          }
          print("Cell info:", parameter)
          return parameter
      except Exception as e:
          print("fail to get zemci:", str(e))
          return "error"
      finally:
          if ser:
              ser.close()
  def loop():
      """Fetch cell info via ``zemci()`` and upload it to the backend.

      Returns:
          Whatever ``zemci()`` returned (a dict, or a status string); the
          upload is only attempted for a dict carrying this device's address.
      """
      cell = zemci()
      # Status strings (and anything not tagged with our address) pass through.
      if not isinstance(cell, dict) or cell.get("deviceIp") != deviceIp:
          return cell
      try:
          reply = requests.post(
              f"http://{ip_addr}:8091/jeecg-boot/onlineStateDetection/deviceNetworkstatus/add",
              json=cell,
              headers={"Content-Type": "application/json"},
              timeout=5
          )
          print("Upload cell quality success, code:", reply.status_code)
      except Exception as err:
          # Upload is best-effort; the caller still gets the cell data.
          print("fail to update network status:", str(err))
      return cell
  def attach():
      """Send ``AT+ZGACT=1,1`` to the module if the port can be opened.

      Returns:
          Always ``0``.
      """
      port = open_serial()
      if not port:
          return 0
      send_AT(port, "AT+ZGACT=1,1\r\n")
      time.sleep(1)
      port.close()
      return 0
  def state():
      """Probe the module with AT, AT+CFUN? and AT+CPIN? and summarise it.

      Returns:
          A dict with ``ip``, ``deviceIp``, ``moduleResponding``,
          ``moudelState`` and ``simState``. The three flags are all 0 when the
          port cannot be opened, the module does not answer ``AT`` with OK, or
          an exception occurs.
      """
      def _summary(responding, radio_on, sim_ready):
          # Single place that builds the report shape used by every exit path.
          return {
              "ip": deviceIp,
              "deviceIp": deviceIp,
              "moduleResponding": responding,
              "moudelState": radio_on,
              "simState": sim_ready
          }

      port = open_serial()
      if not port:
          return _summary(0, 0, 0)
      try:
          alive = 1 if "OK" in send_AT(port, "AT\r\n") else 0
          if not alive:
              return _summary(0, 0, 0)
          cfun_reply = send_AT(port, "AT+CFUN?\r\n")
          radio_on = 1 if "+CFUN: 1" in cfun_reply else 0
          cpin_reply = send_AT(port, "AT+CPIN?\r\n")
          sim_ready = 1 if "+CPIN: READY" in cpin_reply else 0
          return _summary(alive, radio_on, sim_ready)
      except Exception as err:
          print("state error:", err)
          return _summary(0, 0, 0)
      finally:
          if port and port.is_open:
              port.close()
  def cfun():
      """Restart the module by replaying a fixed AT command sequence.

      Returns:
          ``0`` when the sequence was sent, ``-1`` if the port could not be
          opened or an exception occurred.
      """
      # (command, seconds to wait afterwards); 0 means continue immediately.
      steps = (
          ("AT+CFUN=0\r\n", 2),
          ("AT+ZEMCI=0\r\n", 1),
          ("AT+CFUN=1\r\n", 5),
          ("AT+CGACT=1,1\r\n", 3),
          ("AT+ZGACT=1,1\r\n", 3),
          ("AT+ZEMCI=1\r\n", 0),
          ("AT+CGPADDR=1\r\n", 0),
      )
      port = None
      try:
          port = open_serial()
          if not port:
              print("Serial open failed")
              return -1
          print("Restarting module...")
          for command, pause in steps:
              send_AT(port, command)
              if pause:
                  time.sleep(pause)
          print("Module restart completed")
          return 0
      except Exception as err:
          print("cfun failed:", err)
          return -1
      finally:
          if port:
              port.close()
  def activate_radio():
      """Send ``AT+CFUN=1`` and wait five seconds.

      Returns:
          ``0`` once the command has been sent, ``-1`` if the port could not
          be opened or an exception occurred.
      """
      port = None
      try:
          port = open_serial()
          if port:
              print("Activating radio (AT+CFUN=1)...")
              send_AT(port, "AT+CFUN=1\r\n")
              time.sleep(5)
              print("Radio activation sent, waiting for network registration...")
              return 0
          print("Serial open failed, cannot activate radio")
          return -1
      except Exception as err:
          print("Radio activation failed:", err)
          return -1
      finally:
          if port:
              port.close()
  if __name__ == "__main__":
      # Supervisor loop: restart the module once, then keep checking the link
      # and the module, restarting or re-enabling the radio as needed and
      # reporting the state to the backend.
      MAX_RESTARTS = 5
      COOLDOWN_TIME = 600

      def _status_url(report):
          # Status-report URL built from a dict with ip/moudelState/simState.
          return f"http://{ip_addr}:8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip={report['ip']}&moudelState={report['moudelState']}&simState={report['simState']}"

      print("Initializing module...")
      cfun()
      consecutive_restarts = 0
      while True:
          try:
              data = state()
          except Exception as e:
              print("Get state exception:", e)
              data = None
          ping_ok = os.system(f'ping -c 1 {ip_addr} > /dev/null 2>&1') == 0
          # Treat a missing or failed state probe as "everything down".
          snapshot = data if isinstance(data, dict) else {}
          module_responding = snapshot.get('moduleResponding', 0)
          module_ok = snapshot.get('moudelState', 0)
          sim_ok = snapshot.get('simState', 0)
          print("Status check: ping=", ping_ok, ", moduleResponding=", module_responding, ", module=", module_ok, ", sim=", sim_ok)

          if not (ping_ok and module_responding):
              print("Module unresponsive or network down, preparing to report...")
              death_data = {
                  "ip": deviceIp,
                  "deviceIp": deviceIp,
                  "moudelState": 0,
                  "simState": 0
              }
              try:
                  response = requests.get(_status_url(death_data), timeout=5)
                  print("Report module dead status success, code:", response.status_code)
              except Exception as e:
                  print("Report dead status failed:", e)
              if consecutive_restarts < MAX_RESTARTS:
                  print("Performing full restart... (attempt", consecutive_restarts + 1, ")")
                  cfun()
                  consecutive_restarts += 1
                  time.sleep(15)
              else:
                  # Too many restarts in a row: back off before trying again.
                  print("Max restarts reached, cooling down for", COOLDOWN_TIME//60, "minutes...")
                  time.sleep(COOLDOWN_TIME)
                  consecutive_restarts = 0
              continue

          if not module_ok:
              print("Radio off, activating radio...")
              activate_radio()
              time.sleep(8)
              continue

          try:
              if consecutive_restarts > 0:
                  print("Device recovered, reset restart counter from", consecutive_restarts)
                  consecutive_restarts = 0
              cell_data = loop()
              if isinstance(cell_data, str):
                  print("Cell quality get failed:", cell_data)
              time.sleep(2)
              if isinstance(data, dict):
                  report_data = data
              else:
                  report_data = {
                      "ip": deviceIp,
                      "deviceIp": deviceIp,
                      "moudelState": 1,
                      "simState": 0
                  }
              response = requests.get(_status_url(report_data), timeout=5)
              print("Report normal status success, code:", response.status_code)
              time.sleep(5)
          except Exception as e:
              print("Report process error:", e)
              time.sleep(5)

vi attach_api.py

  1. from flask import Flask, jsonify
  2. from serial_util import open_serial, send_AT,deviceIp
  3. import time
  # Flask application object that the /attach route below is registered on.
  app = Flask(__name__)
  def cfun():
      """Restart the module by replaying a fixed AT command sequence.

      Returns:
          ``0`` when the sequence was sent, ``-1`` if the serial port could
          not be opened.
      """
      ser = open_serial()
      if not ser:
          # open_serial() returns None on failure; without this guard the
          # final close() would raise AttributeError.
          print("Serial open failed")
          return -1
      try:
          send_AT(ser,"AT+CFUN=0\r\n")
          send_AT(ser,"AT+ZEMCI=0\r\n")
          time.sleep(0.5)
          # NOTE(review): the '&' separators look unintended and the same
          # commands are re-sent individually below; kept as-is pending
          # confirmation against the module's behaviour.
          send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
          time.sleep(2)
          send_AT(ser,"AT+CGACT=1,1\r\n")
          # NOTE(review): leftover debug marker; output kept unchanged.
          print("dadsqweqweqfas")
          time.sleep(2)
          send_AT(ser,"AT+ZGACT=1,1\r\n")
          time.sleep(2)
          send_AT(ser,"AT+ZEMCI=1\r\n")
          send_AT(ser,"AT+CGPADDR=1\r\n")
          return 0
      finally:
          # Always release the port, even if a step above raised.
          ser.close()
  @app.route('/attach/<num>', methods=['POST', 'GET'])
  def attach(num):
      """Detach and re-attach the module ``num`` times, timing each attach.

      Args:
          num: Repeat count taken from the URL; must be all digits.

      Returns:
          A JSON list with one entry per round: the elapsed milliseconds of
          the ``AT+CGATT=1`` exchange when its reply contains ``OK``, otherwise
          the string ``'error'``. A JSON error object with status 500 is
          returned for invalid input, an unavailable port, or an unexpected
          failure. The module is reset through ``cfun()`` afterwards in every
          case.
      """
      if not num.isdigit():
          return jsonify({"error": "Invalid parameter"}), 500
      result = []
      # Bound before the try so the finally clause can always test it.
      ser = None
      try:
          ser = open_serial()
          if not ser:
              return jsonify({"error": "ser can not use"}), 500
          for _ in range(int(num)):
              send_AT(ser, "AT+CGATT=0\r\n")
              time.sleep(1)
              # Monotonic clock: unaffected by wall-clock adjustments.
              start_time = time.monotonic()
              response = send_AT(ser, "AT+CGATT=1\r\n")
              elapsed_ms = int((time.monotonic() - start_time) * 1000)
              # Record a timing only for a confirmed attach. The previous
              # comparison against lowercase 'ok' was always true, so failures
              # were reported as timings.
              if "OK" in response:
                  result.append(elapsed_ms)
              else:
                  result.append('error')
          return jsonify(result)
      except Exception as e:
          print(f"fail to attach: {str(e)}")
          return jsonify({"error": "Internal error"}), 500
      finally:
          if ser:
              ser.close()
          # Best-effort module reset; a failure here must not replace the
          # response already chosen above.
          try:
              cfun()
          except Exception as e:
              print(f"fail to reset module: {str(e)}")
  if __name__ == '__main__':
      # Debug mode is off on purpose: this runs as an unattended boot service
      # bound to the device's network address, and Flask's debug mode exposes
      # an interactive debugger that can execute arbitrary code.
      app.run(host=deviceIp, port=3653, debug=False)

vi start.py

cd /
rm -rf service_runner.log
rm -rf start.py
vi start.py

  import os
  import subprocess
  import sys
  import threading
  import time
  from typing import Callable, Dict
  class ServiceManager:
      """Launch the service scripts as child processes and keep them running.

      Each script is run by its own daemon thread (``run_script``).
      ``monitor`` checks every 5 seconds and relaunches any script whose
      thread has ended.
      """

      def __init__(self):
          self.services = ["at.py", "api.py", "attach_api.py"]
          # Script name -> thread currently running that script.
          self.threads: Dict[str, threading.Thread] = {}
          # Script name -> live child process.
          self.processes: Dict[str, subprocess.Popen] = {}
          # Guards ``threads`` and ``processes``. This is a plain,
          # non-reentrant lock: never call a lock-taking method while holding it.
          self.lock = threading.Lock()

      def run_script(self, script_name: str) -> None:
          """Run ``script_name`` with the current interpreter until it exits.

          The child's combined stdout/stderr is read and discarded so it can
          never block on a full pipe.
          """
          try:
              process = subprocess.Popen(
                  [sys.executable, script_name],
                  stdout=subprocess.PIPE,
                  stderr=subprocess.STDOUT,
                  text=True
              )
              with self.lock:
                  self.processes[script_name] = process
              # Iteration ends at EOF, i.e. once the child closed its output.
              for _ in process.stdout:  # type: ignore[union-attr]
                  pass
              process.wait()
          except Exception as e:
              # Report instead of silently swallowing; monitor() will retry.
              print(f"Failed to run {script_name}: {e}", file=sys.stderr)
          finally:
              with self.lock:
                  self.processes.pop(script_name, None)

      def create_daemon_thread(self, script_name: str) -> threading.Thread:
          """Return an unstarted daemon thread that will run ``script_name``."""
          return threading.Thread(
              target=self.run_script, args=(script_name,), daemon=True
          )

      def restart_service(self, script_name: str):
          """Start ``script_name`` unless a live thread is already running it."""
          with self.lock:
              current = self.threads.get(script_name)
              if current is not None and current.is_alive():
                  return
              thread = self.create_daemon_thread(script_name)
              thread.start()
              self.threads[script_name] = thread

      def monitor(self):
          """Relaunch dead services every 5 seconds until interrupted."""
          try:
              while True:
                  # Do not hold self.lock here: restart_service() takes it
                  # itself, and taking a non-reentrant lock twice on one
                  # thread deadlocks. It also does the liveness check.
                  for script in list(self.services):
                      self.restart_service(script)
                  time.sleep(5)
          except KeyboardInterrupt:
              sys.exit(0)
  if __name__ == "__main__":
      # Start every configured service once, then keep the process alive in a
      # non-daemon monitor thread that relaunches whatever dies.
      manager = ServiceManager()
      for name in manager.services:
          manager.restart_service(name)
      watcher = threading.Thread(target=manager.monitor)
      watcher.start()
      watcher.join()

/etc/init.d/Start start
ps | grep python3

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注