@liruiyi962464
2025-05-15T01:14:17.000000Z
字数 8414
阅读 100
Linux服务器部署
ps | grep pythoncd /vi serial_util.pypython3 start.pyrm *.pycat /etc/init.d/Startrm /etc/init.d/Start /etc/rc.d/S92Startvi /etc/init.d/Startchmod +x /etc/init.d/Start/etc/init.d/Start enable/etc/init.d/Start start/etc/init.d/Start stop/etc/init.d/Start stoprm /at.pyvi /at.py/etc/init.d/Start stop/etc/init.d/Start startreboot
#!/bin/sh /etc/rc.commonUSE_PROCD=1START=92STOP=15start_service() {procd_open_instanceprocd_set_param command python3 start.pyprocd_set_param respawnprocd_set_param stdout 1procd_set_param stderr 1procd_close_instance}stop_service() {killall python3}restart() {stopstart}
"""Shared serial-port helpers and network addresses for the modem scripts."""
import serial
import time
import fcntl

# Serial device and line speed used to talk to the modem.
tty_com = "/dev/ttyUSB1"
baudrate = 115200
# Address of the remote server the scripts ping and report to.
ip_addr = "172.0.0.11"
# Address this device reports as its own and binds its HTTP services to.
deviceIp = '172.0.0.161'


def open_serial():
    """Open the modem serial port and take an exclusive lock on it.

    The lock is requested without ``LOCK_NB``, so the call waits while
    another process holds the lock on the same device.

    Returns:
        The opened ``serial.Serial`` instance, or ``None`` when the port
        could not be opened or locked (the error is printed).
    """
    ser = None
    try:
        ser = serial.Serial(
            tty_com,
            baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=3600,
        )
        fcntl.flock(ser.fileno(), fcntl.LOCK_EX)
        return ser
    except Exception as e:
        print(f"fail to open ser:{str(e)}")
        # The port may already be open when locking fails; release it so the
        # descriptor is not leaked.
        if ser is not None:
            try:
                ser.close()
            except Exception as close_err:
                print(f"fail to open ser:{str(close_err)}")
        return None


def send_AT(ser, cmd):
    """Write an AT command and return whatever the modem answered.

    Args:
        ser: Serial object returned by ``open_serial``. Passing ``None`` is
            tolerated and reported as a failure.
        cmd: Command text, including its line terminator.

    Returns:
        The decoded reply, or the literal string ``"error"`` on any failure
        (the error is printed).
    """
    try:
        if not ser.is_open:
            ser.open()
        ser.write(cmd.encode())
        # Block for the first byte, give the rest of the reply a moment to
        # arrive, then drain whatever is buffered.
        data = ser.read(1)
        time.sleep(0.1)
        data += ser.read(ser.in_waiting)
        return data.decode()
    except Exception as e:
        print(f"AT fail: {str(e)}")
        return "error"
"""HTTP API exposing a ping test and modem initialisation commands."""
from flask import Flask, jsonify, request
import os,time
import subprocess
from serial_util import open_serial, send_AT,deviceIp

app = Flask(__name__)
# NOTE(review): hard-coded, guessable secret. Nothing in this module uses
# sessions; replace it with a random value if sessions are ever needed.
app.secret_key = 'asdfghjkl'


@app.route('/ping/<ip>/<num>', methods=['POST', 'GET'])
def ping(ip, num):
    """Ping ``ip`` ``num`` times and return the lines of ping's stdout.

    Args:
        ip: Target host taken from the URL.
        num: Packet count taken from the URL; must be decimal digits.

    Returns:
        A JSON array of output lines, ``("Invalid parameter", 400)`` for a
        rejected argument, or ``"error"`` if running ping failed.
    """
    # Both values come straight from the URL and end up in ping's argv:
    # reject anything that would be parsed as an option or a bad count.
    if not num.isdigit() or ip.startswith('-'):
        return "Invalid parameter", 400
    try:
        result = subprocess.run(
            ['ping', '-c', num, ip],
            capture_output=True,
            text=True,
            timeout=10,
        )
        # jsonify keeps the list response working on Flask versions that do
        # not serialise a bare list returned from a view.
        return jsonify(result.stdout.split('\n'))
    except Exception as e:
        print(f"fail to Ping: {str(e)}")
        return "error"


@app.route('/init', methods=['POST', 'GET'])
def init():
    """Send the AT command sequence selected by the ``state`` query argument.

    at.py is killed first and started again afterwards, whatever the outcome.

    Returns:
        The ``state`` value on success (also when it maps to no command),
        ``("Invalid state", 400)`` for a non-numeric value,
        ``("ser can not use", 500)`` when the port cannot be opened, or
        ``("error", 500)`` on any other failure.
    """
    state = request.args.get('state', '')
    if not state.isdigit():
        return "Invalid state", 400
    # Bound before the try so the finally clause can always test it.
    ser = None
    try:
        os.system("kill $(pgrep -f 'python3 at.py')")
        ser = open_serial()
        if not ser:
            return "ser can not use", 500
        commands = {
            '0': "AT+CFUN=1\r\n",
            '1': "AT+CFUN=0\r\nAT+ZEMCI=0\r\n",
            '2': "AT+CFUN=1\r\n",
            '3': "AT+CGATT=0\r\nAT+CGATT=1\r\n",
            '4': "AT+CEREG=1\r\nAT+CGDCONT=1,\"IP\"\r\nAT+CGACT=1,1\r\nAT+ZGACT=1,1\r\n",
        }
        if state in commands:
            send_AT(ser, commands[state])
            time.sleep(1)
        return state
    except Exception as e:
        print(f"fail to init: {str(e)}")
        return "error", 500
    finally:
        # Release the port before at.py is started again so it does not have
        # to wait for the lock held here.
        if ser:
            ser.close()
        os.system("nohup python3 at.py >> /dev/null 2>&1 &")


if __name__ == '__main__':
    # The interactive debugger allows code execution from the browser, so it
    # stays off for a server bound to a network address.
    app.run(host=deviceIp, port=3652, debug=False)
"""Polls the modem over AT commands and reports its status to the server."""
import os
import re
import json
import subprocess
import requests
import time
from serial_util import open_serial, send_AT,ip_addr,deviceIp


def zemci():
    """Query ``AT+ZEMCI=1`` and parse the reply into a report dictionary.

    Returns:
        A dict with the parsed fields, ``"yizhuxiao"`` when the port is
        unavailable or the reply carries no data line, or ``"error"`` when
        parsing fails (the error is printed).
    """
    # Bound before the try so the finally clause can always test it.
    ser = None
    try:
        ser = open_serial()
        if not ser:
            return "yizhuxiao"
        send_AT(ser, "AT+ZEMCI=1\r\n")
        time.sleep(1.2)
        datas = send_AT(ser, "AT+ZEMCI=1\r\n").split('\r\n')
        if len(datas) < 2 or 'ERROR' in datas[1] or datas[1] == 'OK':
            return "yizhuxiao"
        # The second line is "<prefix>:<comma separated fields>".
        zemci_data = datas[1].split(':')[1].split(',')
        parameter = {
            "deviceIp": deviceIp,
            "cellId": hex(int(zemci_data[0]))[2:],
            "tac": zemci_data[2],
            "band1": zemci_data[6],
            "rsrp": int(zemci_data[10]) - 140,
            "rsrq": int(zemci_data[11]) * 0.5 - 20,
            "pci": zemci_data[14][:4],
            "earfcn": int(zemci_data[14][4:], 16),
            "csq": zemci_data[10],
        }
        print(parameter)
        return parameter
    except Exception as e:
        print(f"fail to get ipconfig: {str(e)}")
        return "error"
    finally:
        if ser:
            ser.close()


def loop():
    """Collect one ``zemci()`` sample and post it to the server.

    Returns:
        Whatever ``zemci()`` returned; posting failures are only printed.
    """
    data = zemci()
    if isinstance(data, dict) and data.get("deviceIp") == deviceIp:
        try:
            response = requests.post(
                f"http://{ip_addr}:8091/jeecg-boot/onlineStateDetection/deviceNetworkstatus/add",
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=5,
            )
            print(f"code:{response.status_code}")
        except Exception as e:
            print(f"fail to update:{str(e)}")
    return data


def attach():
    """Send ``AT+ZGACT=1,1`` to the modem.

    Returns:
        ``0`` once the command has been sent.

    Raises:
        RuntimeError: If the serial port cannot be opened.
    """
    ser = open_serial()
    if not ser:
        raise RuntimeError("serial port unavailable")
    try:
        send_AT(ser,"AT+ZGACT=1,1\r\n")
        time.sleep(1)
    finally:
        ser.close()
    return 0


def state():
    """Read the SIM and module state from the modem.

    Returns:
        A dict with ``ip``, ``moudelState`` and ``simState`` (each state is
        0 or 1), or ``"yizhuxiao"`` when the serial port is unavailable.

    Raises:
        IndexError: If a reply has no second line.
    """
    ser = open_serial()
    if not ser:
        return "yizhuxiao"
    try:
        send_AT(ser, "AT+ZEMCI=1\r\n")
        card_status = send_AT(ser, "AT+CPIN?\r\n").split('\r\n')[1]
        fun = send_AT(ser, "AT+CFUN?\r\n").split('\r\n')[1]
    finally:
        # Always release the port: this runs on every main-loop iteration and
        # would otherwise leave the descriptor and its lock behind.
        ser.close()
    return {
        "ip": deviceIp,
        "moudelState": 0 if fun == '+CFUN: 0' else 1,
        "simState": 0 if card_status == '+CME ERROR: 10' else 1,
    }


def cfun():
    """Run the fixed AT command sequence used to bring the modem up.

    Returns:
        ``0`` once the sequence has been sent.

    Raises:
        RuntimeError: If the serial port cannot be opened.
    """
    ser = open_serial()
    if not ser:
        # Fail at once with a clear error instead of sending every command
        # to a missing port.
        raise RuntimeError("serial port unavailable")
    try:
        send_AT(ser,"AT+CFUN=0\r\n")
        send_AT(ser,"AT+ZEMCI=0\r\n")
        time.sleep(0.5)
        send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
        time.sleep(2)
        send_AT(ser,"AT+CGACT=1,1\r\n")
        print("dadsqweqweqfas")
        time.sleep(2)
        send_AT(ser,"AT+ZGACT=1,1\r\n")
        time.sleep(2)
        send_AT(ser,"AT+ZEMCI=1\r\n")
        send_AT(ser,"AT+CGPADDR=1\r\n")
    finally:
        ser.close()
    return 0


def _report_state(data):
    """Send the SIM/module state to the server; failures are only printed."""
    try:
        url = (
            "http://" + ip_addr
            + ":8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState?ip="
            + data["ip"]
            + "&moudelState=" + str(data["moudelState"])
            + "&simState=" + str(data["simState"])
        )
        # A timeout keeps the loop from hanging on an unresponsive server.
        requests.get(url, timeout=5)
    except Exception:
        print('request error')


def main():
    """Initialise the modem, then poll it and report until interrupted."""
    num = 0
    data = None
    cfun()
    try:
        while True:
            try:
                data = state()
            except Exception:
                print('error')
                # Do not report a value left over from an earlier iteration.
                data = None
            result = os.system('ping -c 1 ' + ip_addr + ' >>/dev/null')
            print(result)
            if result:
                # Server unreachable: wait and try again.
                time.sleep(10)
                print('error1')
                continue
            loop()
            # The printed marker alternates between 2 and 200 on each pass.
            print(2 if num == 0 else 200)
            num = 1 - num
            time.sleep(5)
            _report_state(data)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()
"""HTTP API that measures how long the modem takes to re-attach."""
from flask import Flask, jsonify
from serial_util import open_serial, send_AT,deviceIp
import time

app = Flask(__name__)


def cfun():
    """Run the fixed AT command sequence used to bring the modem back up.

    Returns:
        ``0`` once the sequence has been sent.

    Raises:
        RuntimeError: If the serial port cannot be opened.
    """
    ser = open_serial()
    if not ser:
        raise RuntimeError("serial port unavailable")
    try:
        send_AT(ser,"AT+CFUN=0\r\n")
        send_AT(ser,"AT+ZEMCI=0\r\n")
        time.sleep(0.5)
        send_AT(ser,"AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
        time.sleep(2)
        send_AT(ser,"AT+CGACT=1,1\r\n")
        print("dadsqweqweqfas")
        time.sleep(2)
        send_AT(ser,"AT+ZGACT=1,1\r\n")
        time.sleep(2)
        send_AT(ser,"AT+ZEMCI=1\r\n")
        send_AT(ser,"AT+CGPADDR=1\r\n")
    finally:
        ser.close()
    return 0


@app.route('/attach/<num>', methods=['POST', 'GET'])
def attach(num):
    """Detach and re-attach ``num`` times, timing each ``AT+CGATT=1`` call.

    Args:
        num: Number of rounds, taken from the URL; must be decimal digits.

    Returns:
        A JSON list with one entry per round: the elapsed milliseconds, or
        the string ``"error"`` when sending failed or the reply contained
        ``ERROR``. A JSON error object with status 500 is returned for an
        invalid parameter, an unavailable port or an unexpected failure.
    """
    if not num.isdigit():
        # NOTE(review): this is a client error; 500 is kept so existing
        # callers see the same status code.
        return jsonify({"error": "Invalid parameter"}), 500
    result = []
    # Bound before the try so the finally clause can always test it.
    ser = None
    try:
        ser = open_serial()
        if not ser:
            return jsonify({"error": "ser can not use"}), 500
        for _ in range(int(num)):
            send_AT(ser, "AT+CGATT=0\r\n")
            time.sleep(1)
            start_time = time.time()
            response = send_AT(ser, "AT+CGATT=1\r\n")
            elapsed_ms = int((time.time() - start_time) * 1000)
            # send_AT returns "error" when the exchange failed; the modem
            # marks a rejected command with ERROR in its reply.
            if response == "error" or "ERROR" in response:
                result.append('error')
            else:
                result.append(elapsed_ms)
        return jsonify(result)
    except Exception as e:
        print(f"fail to attach: {str(e)}")
        return jsonify({"error": "Internal error"}), 500
    finally:
        if ser:
            ser.close()
            # Only restore the modem when the port was usable; otherwise
            # cfun() would fail and replace the JSON error response.
            cfun()


if __name__ == '__main__':
    # The interactive debugger allows code execution from the browser, so it
    # stays off for a server bound to a network address.
    app.run(host=deviceIp, port=3653, debug=False)
"""Supervisor that runs the service scripts as child processes and restarts
any of them that stops."""
import threading
import os
import sys
import subprocess
import logging
from typing import Callable, Dict
import time

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("service_runner.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


class ServiceManager:
    """Runs each script in ``services`` in its own thread and child process."""

    def __init__(self):
        # Scripts to keep running, resolved against the working directory.
        self.services = ["at.py", "api.py", "attach_api.py"]
        # Runner thread and live child process for each script name.
        self.threads: Dict[str, threading.Thread] = {}
        self.processes: Dict[str, subprocess.Popen] = {}
        # Re-entrant because the monitor pass holds the lock while calling
        # restart_service(), which takes it again; a plain Lock would block
        # forever on the first restart.
        self.lock = threading.RLock()

    def run_script(self, script_name: str) -> None:
        """Run ``script_name`` to completion, logging every output line.

        Args:
            script_name: Path of the script, started with the interpreter
                that is running this module.
        """
        try:
            logger.info(f"Starting {script_name}...")
            process = subprocess.Popen(
                [sys.executable, script_name],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            with self.lock:
                self.processes[script_name] = process
            # Iterating ends at end-of-file; wait() then blocks until the
            # child has really exited instead of polling in a tight loop.
            for line in process.stdout:  # type: ignore
                logger.info(f"[{script_name}] {line.strip()}")
            exit_code = process.wait()
            if exit_code != 0:
                logger.error(f"{script_name} exited with code {exit_code}")
        except Exception as e:
            logger.error(f"Failed to start {script_name}: {str(e)}")
        finally:
            with self.lock:
                self.processes.pop(script_name, None)

    def create_daemon_thread(self, script_name: str) -> threading.Thread:
        """Return an unstarted daemon thread that runs ``script_name``."""
        return threading.Thread(
            target=self.run_script,
            args=(script_name,),
            daemon=True,
        )

    def restart_service(self, script_name: str):
        """Start a runner thread for ``script_name`` unless one is alive."""
        with self.lock:
            current = self.threads.get(script_name)
            if current is not None and current.is_alive():
                return
            logger.info(f"Restarting {script_name}...")
            thread = self.create_daemon_thread(script_name)
            thread.start()
            self.threads[script_name] = thread

    def _check_services(self) -> None:
        """Do one monitoring pass, restarting every script that is not running."""
        with self.lock:
            for script in list(self.services):
                thread = self.threads.get(script)
                if thread is None or not thread.is_alive():
                    logger.warning(f"Service {script} died, restarting...")
                    self.restart_service(script)

    def _terminate_all(self) -> None:
        """Ask every child process that is still registered to terminate."""
        with self.lock:
            for process in list(self.processes.values()):
                try:
                    process.terminate()
                except OSError as e:
                    logger.error(f"Failed to terminate process: {str(e)}")

    def monitor(self):
        """Check the services every five seconds until interrupted.

        On ``KeyboardInterrupt`` the children are terminated and the
        interpreter exits with status 0.
        """
        try:
            while True:
                self._check_services()
                time.sleep(5)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, shutting down...")
            self._terminate_all()
            sys.exit(0)


if __name__ == "__main__":
    manager = ServiceManager()
    for script in manager.services:
        manager.restart_service(script)
    # KeyboardInterrupt is only raised in the main thread, so the monitor
    # loop runs here rather than in a separate thread.
    manager.monitor()