@liruiyi962464
2025-12-12T08:34:05.000000Z
字数 10435
阅读 124
Linux服务器部署
minicom -D /dev/ttyUSB0ps | grep pythoncd /vi serial_util.pypython3 start.pyrm *.pycat /etc/init.d/Startrm /etc/init.d/Start /etc/rc.d/S92Startvi /etc/init.d/Startchmod +x /etc/init.d/Start/etc/init.d/Start enable/etc/init.d/Start start/etc/init.d/Start stop/etc/init.d/Start stoprm /at.pyvi /at.py/etc/init.d/Start stop/etc/init.d/Start startreboot
#!/bin/sh /etc/rc.commonUSE_PROCD=1START=92STOP=15start_service() {procd_open_instanceprocd_set_param command python3 start.pyprocd_set_param respawnprocd_set_param stdout 1procd_set_param stderr 1procd_close_instance}stop_service() {killall python3}restart() {stopstart}
import serialimport timeimport fcntltty_com = "/dev/ttyUSB1"baudrate = 115200ip_addr = "172.0.0.11"deviceIp = '172.0.0.161'def open_serial():try:ser = serial.Serial(tty_com, baudrate,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,timeout=3600)fcntl.flock(ser.fileno(),fcntl.LOCK_EX)return serexcept Exception as e:print(f"fail to open ser:{str(e)}")return Nonedef send_AT(ser, cmd):try:if not ser.is_open:ser.open()ser.write(cmd.encode())data = ser.read(1)time.sleep(0.1)data += ser.read(ser.inWaiting())return data.decode()except Exception as e:::print(f"AT fail: {str(e)}")return "error"
from flask import Flask, requestimport os,timeimport subprocessfrom serial_util import open_serial, send_AT,deviceIpapp = Flask(__name__)app.secret_key = 'asdfghjkl'@app.route('/ping/<ip>/<num>', methods=['POST', 'GET'])def ping(ip, num):"""test for ping"""try:result = subprocess.run(['ping', '-c', num, ip],capture_output=True, text=True, timeout=10)return result.stdout.split('\n')except Exception as e:print(f"fail to Ping: {str(e)}")return "error"@app.route('/init', methods=['POST', 'GET'])def init():state = request.args.get('state', '')if not state.isdigit():return "Invalid state", 400try:os.system("kill $(pgrep -f 'python3 at.py')")ser = open_serial()if not ser:return "ser can not use", 500commands = {'0': "AT+CFUN=1\r\n",'1': "AT+CFUN=0\r\nAT+ZEMCI=0\r\n",'2': "AT+CFUN=1\r\n",'3': "AT+CGATT=0\r\nAT+CGATT=1\r\n",'4': "AT+CEREG=1\r\nAT+CGDCONT=1,\"IP\"\r\nAT+CGACT=1,1\r\nAT+ZGACT=1,1\r\n"}if state in commands:send_AT(ser, commands[state])time.sleep(1)return stateexcept Exception as e:print(f"fail to init: {str(e)}")return "error", 500finally:os.system("nohup python3 at.py >> /dev/null 2>&1 &")if ser:ser.close()if __name__ == '__main__':app.run(host=deviceIp, port=3652, debug=True)
import osimport reimport jsonimport subprocessimport requestsimport timefrom serial_util import open_serial, send_AT, ip_addr, deviceIpdef zemci():try:ser = open_serial()if not ser:return "no response"send_AT(ser, "AT+ZEMCI=1\r\n")time.sleep(1.2)datas = send_AT(ser, "AT+ZEMCI=1\r\n").split('\r\n')if len(datas) < 2 or 'ERROR' in datas[1] or datas[1] == 'OK':return "no response"zemci_data = datas[1].split(':')[1].split(',')parameter = {"deviceIp": deviceIp,"cellId": hex(int(zemci_data[0]))[2:],"tac": zemci_data[2],"band1": zemci_data[6],"rsrp": int(zemci_data[10]) - 140,"rsrq": int(zemci_data[11]) * 0.5 - 20,"pci": zemci_data[14][:4],"earfcn": int(zemci_data[14][4:], 16),"csq": zemci_data[10]}print("Cell info:", parameter)return parameterexcept Exception as e:print("fail to get zemci:", str(e))return "error"finally:if 'ser' in locals() and ser:ser.close()def loop():data = zemci()if isinstance(data, dict) and data.get("deviceIp") == deviceIp:try:response = requests.post(f"http://{ip_addr}:8091/jeecg-boot/onlineStateDetection/deviceNetworkstatus/add",json=data,headers={"Content-Type": "application/json"},timeout=5)print("Upload cell quality success, code:", response.status_code)except Exception as e:print("fail to update network status:", str(e))return datadef attach():ser = open_serial()if ser:send_AT(ser, "AT+ZGACT=1,1\r\n")time.sleep(1)ser.close()return 0def state():ser = open_serial()if not ser:return {"ip": deviceIp,"deviceIp": deviceIp,"moduleResponding": 0,"moudelState": 0,"simState": 0}try:resp = send_AT(ser, "AT\r\n")module_responding = 1 if "OK" in resp else 0if not module_responding:return {"ip": deviceIp,"deviceIp": deviceIp,"moduleResponding": 0,"moudelState": 0,"simState": 0}resp = send_AT(ser, "AT+CFUN?\r\n")moudelState = 1 if "+CFUN: 1" in resp else 0resp = send_AT(ser, "AT+CPIN?\r\n")simState = 1 if "+CPIN: READY" in resp else 0return {"ip": deviceIp,"deviceIp": deviceIp,"moduleResponding": module_responding,"moudelState": moudelState,"simState": simState}except Exception as e:print("state error:", e)return {"ip": deviceIp,"deviceIp": deviceIp,"moduleResponding": 0,"moudelState": 0,"simState": 0}finally:if ser and ser.is_open:ser.close()def cfun():ser = Nonetry:ser = open_serial()if not ser:print("Serial open failed")return -1print("Restarting module...")send_AT(ser, "AT+CFUN=0\r\n")time.sleep(2)send_AT(ser, "AT+ZEMCI=0\r\n")time.sleep(1)send_AT(ser, "AT+CFUN=1\r\n")time.sleep(5)send_AT(ser, "AT+CGACT=1,1\r\n")time.sleep(3)send_AT(ser, "AT+ZGACT=1,1\r\n")time.sleep(3)send_AT(ser, "AT+ZEMCI=1\r\n")send_AT(ser, "AT+CGPADDR=1\r\n")print("Module restart completed")return 0except Exception as e:print("cfun failed:", e)return -1finally:if ser:ser.close()def activate_radio():ser = Nonetry:ser = open_serial()if not ser:print("Serial open failed, cannot activate radio")return -1print("Activating radio (AT+CFUN=1)...")send_AT(ser, "AT+CFUN=1\r\n")time.sleep(5)print("Radio activation sent, waiting for network registration...")return 0except Exception as e:print("Radio activation failed:", e)return -1finally:if ser:ser.close()if __name__ == "__main__":print("Initializing module...")cfun()consecutive_restarts = 0MAX_RESTARTS = 5COOLDOWN_TIME = 600while True:try:data = state()except Exception as e:print("Get state exception:", e)data = Noneping_ok = os.system(f'ping -c 1 {ip_addr} > /dev/null 2>&1') == 0module_responding = data.get('moduleResponding', 0) if isinstance(data, dict) else 0module_ok = data.get('moudelState', 0) if isinstance(data, dict) else 0sim_ok = data.get('simState', 0) if isinstance(data, dict) else 0print("Status check: ping=", ping_ok, ", moduleResponding=", module_responding, ", module=", module_ok, ", sim=", sim_ok)if not ping_ok or not module_responding:print("Module unresponsive or network down, preparing to report...")death_data = {"ip": deviceIp,"deviceIp": deviceIp,"moudelState": 0,"simState": 0}try:url = f"http://{ip_addr}:8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip={death_data['ip']}&moudelState={death_data['moudelState']}&simState={death_data['simState']}"response = requests.get(url, timeout=5)print("Report module dead status success, code:", response.status_code)except Exception as e:print("Report dead status failed:", e)if consecutive_restarts >= MAX_RESTARTS:print("Max restarts reached, cooling down for", COOLDOWN_TIME//60, "minutes...")time.sleep(COOLDOWN_TIME)consecutive_restarts = 0continueprint("Performing full restart... (attempt", consecutive_restarts + 1, ")")cfun()consecutive_restarts += 1time.sleep(15)continueif not module_ok:print("Radio off, activating radio...")activate_radio()time.sleep(8)continuetry:if consecutive_restarts > 0:print("Device recovered, reset restart counter from", consecutive_restarts)consecutive_restarts = 0cell_data = loop()if isinstance(cell_data, str):print("Cell quality get failed:", cell_data)time.sleep(2)fallback_data = {"ip": deviceIp,"deviceIp": deviceIp,"moudelState": 1,"simState": 0}report_data = data if isinstance(data, dict) else fallback_dataurl = f"http://{ip_addr}:8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip={report_data['ip']}&moudelState={report_data['moudelState']}&simState={report_data['simState']}"response = requests.get(url, timeout=5)print("Report normal status success, code:", response.status_code)time.sleep(5)except Exception as e:print("Report process error:", e)time.sleep(5)
from flask import Flask, jsonifyfrom serial_util import open_serial, send_AT,deviceIpimport timeapp = Flask(__name__)def cfun():ser = open_serial()send_AT(ser,"AT+CFUN=0\r\n")send_AT(ser,"AT+ZEMCI=0\r\n")time.sleep(0.5)send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")time.sleep(2)send_AT(ser,"AT+CGACT=1,1\r\n")print("dadsqweqweqfas")time.sleep(2)send_AT(ser,"AT+ZGACT=1,1\r\n")time.sleep(2)send_AT(ser,"AT+ZEMCI=1\r\n")send_AT(ser,"AT+CGPADDR=1\r\n")ser.close()return 0@app.route('/attach/<num>', methods=['POST', 'GET'])def attach(num):if not num.isdigit():return jsonify({"error": "Invalid parameter"}), 500result = []try:ser = open_serial()if not ser:return jsonify({"error": "ser can not use"}), 500for _ in range(int(num)):send_AT(ser, "AT+CGATT=0\r\n")time.sleep(1)start_time = time.time()response = send_AT(ser, "AT+CGATT=1\r\n")datas=response.split('\r\n')[1]if datas != 'ok':result.append(int((time.time() - start_time) * 1000))else:result.append('error')return jsonify(result)except Exception as e:print(f"fail to attach: {str(e)}")return jsonify({"error": "Internal error"}), 500finally:if ser:ser.close()cfun()if __name__ == '__main__':app.run(host=deviceIp, port=3653, debug=True)
cd /
rm -rf service_runner.log
rm -rf start.py
vi start.py
iimport threadingimport osimport sysimport subprocessfrom typing import Callable, Dictimport timeclass ServiceManager:def __init__(self):self.services = ["at.py", "api.py", "attach_api.py"]self.threads: Dict[str, threading.Thread] = {}self.processes: Dict[str, subprocess.Popen] = {}self.lock = threading.Lock()def run_script(self, script_name: str) -> None:try:process = subprocess.Popen([sys.executable, script_name],stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True)with self.lock:self.processes[script_name] = processwhile True:output = process.stdout.readline() # type: ignoreif output == '' and process.poll() is not None:break# if output:# print(f"[{script_name}] {output.strip()}")exit_code = process.poll()# if exit_code != 0:# print(f"Error: {script_name} exited with code {exit_code}")except Exception as e:# print(f"Failed to start {script_name}: {str(e)}")passfinally:with self.lock:if script_name in self.processes:del self.processes[script_name]def create_daemon_thread(self, script_name: str) -> threading.Thread:thread = threading.Thread(target=lambda: self.run_script(script_name))thread.daemon = Truereturn threaddef restart_service(self, script_name: str):with self.lock:if script_name in self.threads and self.threads[script_name].is_alive():return# print(f"Restarting {script_name}...")thread = self.create_daemon_thread(script_name)thread.start()self.threads[script_name] = threaddef monitor(self):try:while True:with self.lock:for script in self.services.copy():thread = self.threads.get(script)if not thread or not thread.is_alive():# print(f"Warning: Service {script} died, restarting...")self.restart_service(script)time.sleep(5)except KeyboardInterrupt:# print("Received keyboard interrupt, shutting down...")sys.exit(0)if __name__ == "__main__":manager = ServiceManager()for script in manager.services:manager.restart_service(script)monitor_thread = threading.Thread(target=manager.monitor)monitor_thread.start()monitor_thread.join()
/etc/init.d/Start start
ps | grep python3