@liruiyi962464
2025-04-22T15:43:13.000000Z
字数 8104
阅读 11
Linux服务器部署
# Shell commands used while deploying; they read as session notes rather than
# one script meant to be run top to bottom.
# List running processes whose line mentions python.
ps | grep python
# Work from the filesystem root (the later commands use /at.py etc.).
cd /
# Edit the serial helper module.
vi serial_util.py
# Run the supervisor script in the foreground.
python3 start.py
# Delete every .py file in the current directory.
rm *.py
# Show the current init script.
cat /etc/init.d/Start
# Remove the init script and /etc/rc.d/S92Start (presumably the link created
# by "enable" below -- confirm).
rm /etc/init.d/Start /etc/rc.d/S92Start
# Re-create the init script, make it executable, enable it and start it.
vi /etc/init.d/Start
chmod +x /etc/init.d/Start
/etc/init.d/Start enable
/etc/init.d/Start start
# Stop the service (the command appears twice in the original notes).
/etc/init.d/Start stop
/etc/init.d/Start stop
# Replace /at.py with a new version, then start the service again.
rm /at.py
vi /at.py
/etc/init.d/Start start
#!/bin/sh /etc/rc.common
# Init script (presumably the content of /etc/init.d/Start edited above).
# Let procd supervise the service instance declared in start_service().
USE_PROCD=1
# Start-order number; 92 matches the S92Start name seen in /etc/rc.d.
START=92
# Stop-order number.
STOP=15
# Declare one procd instance that runs "python3 start.py".
# The script path is relative, so this relies on the service's working
# directory containing start.py (assumed to be / -- TODO confirm).
start_service() {
procd_open_instance
procd_set_param command python3 start.py
# Ask procd to respawn the command when it exits.
procd_set_param respawn
# Forward the command's stdout and stderr (value 1 enables forwarding).
procd_set_param stdout 1
procd_set_param stderr 1
procd_close_instance
}
# Stop hook: kill every process named python3.
# NOTE(review): this is not limited to processes started by this service;
# any other python3 process on the system is killed as well.
stop_service() {
killall python3
}
# Restart by running the stop action followed by the start action.
restart() {
stop
start
}
import serial
import time
import fcntl
# Serial device node opened by open_serial().
tty_com = "/dev/ttyUSB1"
# Serial line speed passed to serial.Serial().
baudrate = 115200
# Address of the backend server that the monitoring script pings and sends
# HTTP requests to (port 8091).
ip_addr = "172.0.0.11"
# Address of this device: the Flask services bind to it and it is included
# in the reports sent to the backend.
deviceIp = '172.0.0.163'
def open_serial():
    """Open the modem serial port and take an exclusive lock on it.

    The port ``tty_com`` is opened at ``baudrate`` with 8 data bits, no
    parity, one stop bit and a 3600 second read timeout.  An exclusive
    ``flock`` is then requested on the port's file descriptor so that the
    scripts sharing the port do not talk to it at the same time.

    Returns:
        The open ``serial.Serial`` object, or ``None`` when opening or
        locking failed (the error is printed).
    """
    ser = None
    try:
        ser = serial.Serial(
            tty_com,
            baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=3600,
        )
        fcntl.flock(ser.fileno(), fcntl.LOCK_EX)
        return ser
    except Exception as e:
        print(f"fail to open ser:{str(e)}")
        if ser is not None:
            # The port was opened but locking failed: close it instead of
            # leaking an open descriptor to the caller-less object.
            try:
                ser.close()
            except Exception as close_err:
                print(f"fail to close ser:{str(close_err)}")
        return None
def send_AT(ser, cmd):
    """Send one AT command string and return the decoded reply.

    The port is re-opened first if it is closed.  After writing, one byte is
    read (blocking up to the port's read timeout), then after a 0.1 second
    pause everything that is already buffered is read as well.

    Args:
        ser: serial port object as returned by ``open_serial``.
        cmd: command text, including its line terminator.

    Returns:
        The reply decoded to ``str``, or the string ``"error"`` if anything
        raised (the error is printed).
    """
    try:
        port_closed = not ser.is_open
        if port_closed:
            ser.open()
        payload = cmd.encode()
        ser.write(payload)
        first_byte = ser.read(1)
        # Give the modem a moment to deliver the rest of the reply.
        time.sleep(0.1)
        remainder = ser.read(ser.inWaiting())
        return (first_byte + remainder).decode()
    except Exception as e:
        print(f"AT fail: {str(e)}")
        return "error"
from flask import Flask, request
import os,time
import subprocess
import re
from serial_util import open_serial, send_AT,deviceIp
# Flask application that serves the /ping and /init routes below.
app = Flask(__name__)
# NOTE(review): hard-coded secret key committed with the source; if sessions
# or signed cookies are ever used this should come from configuration.
app.secret_key = 'asdfghjkl'
@app.route('/ping/<ip>/<num>', methods=['POST', 'GET'])
def ping(ip, num):
    """Run ``ping -c <num> <ip>`` and return its standard output.

    Args:
        ip: target host taken from the URL.
        num: number of echo requests, taken from the URL as text.

    Returns:
        The command's stdout with carriage returns removed, the string
        ``"error"`` if running it failed or exceeded the 10 second limit,
        or ``("Invalid parameter", 400)`` when the URL values are rejected.
    """
    # Both values come straight from the request URL.  The command is run
    # without a shell, but a value starting with "-" would still be parsed by
    # ping as an option, and the count must be a plain decimal number.
    if not (num.isascii() and num.isdigit()) or ip.startswith('-'):
        return "Invalid parameter", 400
    try:
        result = subprocess.run(
            ['ping', '-c', num, ip],
            capture_output=True, text=True, timeout=10
        )
        return result.stdout.replace('\r', '')
    except Exception as e:
        print(f"fail to Ping: {str(e)}")
        return "error"
@app.route('/init', methods=['POST', 'GET'])
def init():
    """Send the AT command sequence selected by the ``state`` query argument.

    The ``at.py`` process is killed first so the serial port is free, the
    sequence for ``state`` (keys ``'0'`` to ``'4'``) is written to the modem,
    and ``at.py`` is started again in the background whatever the outcome.
    A numeric ``state`` with no entry in the table sends nothing.

    Returns:
        ``state`` on success, ``("Invalid state", 400)`` when ``state`` is not
        numeric, ``("ser can not use", 500)`` when the port cannot be opened,
        or ``("error", 500)`` on any other failure.
    """
    state = request.args.get('state', '')
    if not state.isdigit():
        return "Invalid state", 400
    commands = {
        '0': "AT+CFUN=1\r\n",
        '1': "AT+CFUN=0\r\nAT+ZEMCI=0\r\n",
        '2': "AT+CFUN=1\r\n",
        '3': "AT+CGATT=0\r\nAT+CGATT=1\r\n",
        '4': "AT+CEREG=1\r\nAT+CGDCONT=1,\"IP\"\r\nAT+CGACT=1,1\r\nAT+ZGACT=1,1\r\n"
    }
    # Bound before the try block so the finally clause can always test it.
    ser = None
    try:
        os.system("kill $(pgrep -f 'python3 at.py')")
        ser = open_serial()
        if not ser:
            return "ser can not use", 500
        if state in commands:
            send_AT(ser, commands[state])
            time.sleep(1)
        return state
    except Exception as e:
        print(f"fail to init: {str(e)}")
        return "error", 500
    finally:
        # NOTE(review): if a supervisor also restarts at.py when it exits,
        # this may leave two copies running -- confirm which one should own
        # the restart.
        os.system("nohup python3 at.py >> /dev/null 2>&1 &")
        if ser:
            ser.close()
if __name__ == '__main__':
    # Debug mode is off: with debug=True Flask enables the interactive
    # debugger, which allows code execution from the browser and must not be
    # reachable on a network-facing address such as deviceIp.
    app.run(host=deviceIp, port=3652, debug=False)
import os
import re
import json
import subprocess
import requests
import time
from serial_util import open_serial, send_AT,ip_addr,deviceIp
def zemci():
    """Query the modem with ``AT+ZEMCI=1`` and build a cell-info report.

    The command is sent twice, 1.2 seconds apart; the second line of the
    second reply is split on ``:`` and ``,`` and selected fields are copied
    into the report.

    Returns:
        A dict with the keys ``deviceIp``, ``cellId``, ``tac``, ``band1``,
        ``rsrp``, ``rsrq``, ``pci``, ``earfcn`` and ``csq``; the string
        ``"yizhuxiao"`` when the port cannot be opened or the reply has no
        usable data line; or the string ``"error"`` when anything raised.
    """
    try:
        ser = open_serial()
        if not ser:
            return "yizhuxiao"

        send_AT(ser, "AT+ZEMCI=1\r\n")
        time.sleep(1.2)
        reply = send_AT(ser, "AT+ZEMCI=1\r\n")
        lines = reply.split('\r\n')

        if len(lines) < 2:
            return "yizhuxiao"
        answer = lines[1]
        if 'ERROR' in answer or answer == 'OK':
            return "yizhuxiao"

        # Text after the first ':' holds the comma separated values.
        fields = answer.split(':')[1].split(',')
        parameter = {
            "deviceIp": deviceIp,
            "cellId": hex(int(fields[0]))[2:],
            "tac": fields[2],
            "band1": fields[6],
            "rsrp": int(fields[10]) - 140,
            "rsrq": int(fields[11]) * 0.5 - 20,
            # Field 14 carries two values: first four characters, then a
            # hexadecimal number.
            "pci": fields[14][:4],
            "earfcn": int(fields[14][4:], 16),
            "csq": fields[10]
        }
        print(parameter)
        return parameter
    except Exception as e:
        print(f"fail to get ipconfig: {str(e)}")
        return "error"
    finally:
        if ser:
            ser.close()
def loop():
    """Take one cell-info sample and post it to the backend.

    Returns:
        Whatever ``zemci()`` returned: the report dict, or one of its
        sentinel strings (``"yizhuxiao"`` / ``"error"``).
    """
    data = zemci()
    # zemci() returns a dict only on success.  Checking the type (instead of
    # comparing with one sentinel) keeps the "error" string from reaching
    # .get(), which used to raise AttributeError and abort the caller.
    if isinstance(data, dict) and data.get("deviceIp") == deviceIp:
        try:
            response = requests.post(
                f"http://{ip_addr}:8091/jeecg-boot/onlineStateDetection/deviceNetworkstatus/add",
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=5
            )
            print(f"code:{response.status_code}")
        except Exception as e:
            print(f"fail to update:{str(e)}")
    return data
def attach():
    """Send ``AT+ZGACT=1,1`` to the modem once.

    Returns:
        ``0`` after the command was sent, or the string ``"yizhuxiao"`` (the
        sentinel the sibling functions use) when the serial port cannot be
        opened.  Previously a missing port raised ``AttributeError`` on
        ``close()``.
    """
    ser = open_serial()
    if not ser:
        return "yizhuxiao"
    try:
        send_AT(ser, "AT+ZGACT=1,1\r\n")
        time.sleep(1)
    finally:
        # Always release the port, even if the wait is interrupted.
        ser.close()
    return 0
def state():
    """Read the module and SIM status from the modem.

    ``AT+ZEMCI=1`` is sent first (its reply is ignored), then ``AT+CPIN?``
    and ``AT+CFUN?``; the second line of each reply is inspected.

    Returns:
        A dict with ``ip`` (``deviceIp``), ``moudelState`` (0 when the
        ``AT+CFUN?`` line is ``'+CFUN: 0'``, else 1) and ``simState`` (0 when
        the ``AT+CPIN?`` line is ``'+CME ERROR: 10'``, else 1), or the string
        ``"yizhuxiao"`` when the serial port cannot be opened.

    Raises:
        IndexError: if a reply has no second line (for example when
            ``send_AT`` returned ``"error"``).
    """
    ser = open_serial()
    if not ser:
        return "yizhuxiao"
    try:
        send_AT(ser, "AT+ZEMCI=1\r\n")
        card_status = send_AT(ser, "AT+CPIN?\r\n").split('\r\n')[1]
        fun = send_AT(ser, "AT+CFUN?\r\n").split('\r\n')[1]
    finally:
        # The port was previously never closed here; release it on success
        # and on parse failure alike.
        ser.close()
    return {
        "ip": deviceIp,
        "moudelState": 0 if fun == '+CFUN: 0' else 1,
        "simState": 0 if card_status == '+CME ERROR: 10' else 1,
    }
def cfun():
    """Run the fixed start-up AT command sequence on the modem.

    Sends, in order: ``AT+CFUN=0``, ``AT+ZEMCI=0``, a combined
    ``AT+CFUN=1`` / ``AT+CGACT=1,1`` / ``AT+ZGACT=1,1`` string, then
    ``AT+CGACT=1,1``, ``AT+ZGACT=1,1``, ``AT+ZEMCI=1`` and ``AT+CGPADDR=1``,
    with pauses in between, and closes the port.

    Returns:
        Always ``0``.
    """
    ser = open_serial()

    for command in ("AT+CFUN=0\r\n", "AT+ZEMCI=0\r\n"):
        send_AT(ser, command)
    time.sleep(0.5)

    # The '&' characters are part of the string and are sent as written.
    send_AT(ser, "AT+CFUN=1\r\n&AT+CGACT=1,1\r\n&AT+ZGACT=1,1\r\n")
    time.sleep(2)

    send_AT(ser, "AT+CGACT=1,1\r\n")
    print("dadsqweqweqfas")
    time.sleep(2)

    send_AT(ser, "AT+ZGACT=1,1\r\n")
    time.sleep(2)

    for command in ("AT+ZEMCI=1\r\n", "AT+CGPADDR=1\r\n"):
        send_AT(ser, command)

    ser.close()
    return 0
if __name__ == "__main__":
    # Endpoint that receives the SIM / module state of this device.
    status_url = (
        "http://" + ip_addr
        + ":8091/jeecg-boot/system/deviceInfo/updateDeviceSIMStateAndMoudleState"
    )
    num = 0
    # Bound up front so a failing first state() call cannot leave it undefined.
    data = None
    cfun()
    while True:
        try:
            data = state()
        except Exception:
            # Keep the previous reading; state() raises when a reply cannot
            # be parsed.
            print('error')
        # Only report while the backend answers a single ping.
        result = os.system('ping -c 1 ' + ip_addr + ' >>/dev/null')
        print(result)
        if result:
            time.sleep(10)
            print('error1')
            continue
        try:
            # num alternates between 0 and 1; the two passes differ only in
            # the marker that is printed.
            if num == 0:
                num, marker = 1, 2
            else:
                num, marker = 0, 200
            loop()
            print(marker)
            time.sleep(5)
            try:
                # data is None or a sentinel string when no reading is
                # available; indexing it then raises and is reported below.
                query = {
                    "ip": data["ip"],
                    "moudelState": str(data["moudelState"]),
                    "simState": str(data["simState"]),
                }
                # A timeout keeps a stalled backend from blocking the loop
                # forever (same limit as the POST in loop()).
                requests.get(status_url, params=query, timeout=5)
            except Exception:
                print('request error')
        except KeyboardInterrupt:
            break
# Stop the service, replace /attach_api.py with a new version, start again.
/etc/init.d/Start stop
rm /attach_api.py
vi /attach_api.py
/etc/init.d/Start start
from flask import Flask, jsonify
from serial_util import open_serial, send_AT,deviceIp
import time
# Flask application that serves the /attach route below.
app = Flask(__name__)
@app.route('/attach/<num>', methods=['POST', 'GET'])
def attach(num):
    """Detach and re-attach ``num`` times and report how long each took.

    For every round ``AT+CGATT=0`` is sent, then ``AT+CGATT=1`` is sent and
    timed.  A round counts as successful when the second line of the
    ``AT+CGATT=1`` reply is ``OK``.

    Args:
        num: number of rounds, taken from the URL as text.

    Returns:
        A JSON list with one entry per round: the elapsed time in
        milliseconds for a successful attach, or the string ``"error"``
        otherwise.  A JSON error object with status 500 is returned when
        ``num`` is not numeric, the port cannot be opened, or something
        unexpected raised.
    """
    if not num.isdigit():
        # NOTE(review): 500 kept for existing clients; 400 would describe a
        # bad parameter more accurately.
        return jsonify({"error": "Invalid parameter"}), 500
    result = []
    # Bound before the try block so the finally clause can always test it.
    ser = None
    try:
        ser = open_serial()
        if not ser:
            return jsonify({"error": "ser can not use"}), 500
        for _ in range(int(num)):
            send_AT(ser, "AT+CGATT=0\r\n")
            start_time = time.time()
            response = send_AT(ser, "AT+CGATT=1\r\n")
            elapsed_ms = int((time.time() - start_time) * 1000)
            lines = response.split('\r\n')
            # The old test was `!= 'ok'`, which is true for every reply (the
            # modem answers in upper case), so failures were reported as
            # timings.  A reply without a second line now counts as a failed
            # round instead of aborting the whole request.
            if len(lines) > 1 and lines[1] == 'OK':
                result.append(elapsed_ms)
            else:
                result.append('error')
        return jsonify(result)
    except Exception as e:
        print(f"fail to attach: {str(e)}")
        return jsonify({"error": "Internal error"}), 500
    finally:
        if ser:
            ser.close()
if __name__ == '__main__':
    # Debug mode is off: with debug=True Flask enables the interactive
    # debugger, which allows code execution from the browser and must not be
    # reachable on a network-facing address such as deviceIp.
    app.run(host=deviceIp, port=3653, debug=False)
import threading
import os
import sys
import subprocess
import logging
from typing import Callable, Dict
import time
# Send INFO and higher records both to the file service_runner.log (path is
# relative to the working directory) and to the console stream.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("service_runner.log"),
logging.StreamHandler()
]
)
# Module-level logger used by ServiceManager.
logger = logging.getLogger(__name__)
class ServiceManager:
    """Run the service scripts as child processes and restart any that exit.

    Every script listed in ``services`` is executed with the current Python
    interpreter inside its own daemon thread, which forwards the child's
    output to the module logger.  ``monitor`` checks every five seconds
    whether each thread is still alive and starts a new one if not.
    """

    def __init__(self):
        self.services = ["at.py", "api.py", "attach_api.py"]
        # script name -> thread currently running run_script() for it
        self.threads: Dict[str, threading.Thread] = {}
        # script name -> child process that is currently running
        self.processes: Dict[str, subprocess.Popen] = {}
        # Guards ``threads`` and ``processes``.  It is a plain (non
        # re-entrant) lock: never call a method that acquires it while
        # already holding it.
        self.lock = threading.Lock()

    def run_script(self, script_name: str) -> None:
        """Run one script to completion, logging every line it prints.

        Blocks until the child's output pipe reaches end-of-file and the
        child has exited.  A non-zero exit code and any failure to start the
        child are logged, never raised.
        """
        try:
            logger.info(f"Starting {script_name}...")
            process = subprocess.Popen(
                [sys.executable, script_name],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True
            )
            with self.lock:
                self.processes[script_name] = process
            # Iterating the pipe blocks until a line arrives and ends at
            # end-of-file, so there is no polling loop that could spin if the
            # pipe closes before the child exits.
            for output in process.stdout:  # type: ignore
                logger.info(f"[{script_name}] {output.strip()}")
            exit_code = process.wait()
            if exit_code != 0:
                logger.error(f"{script_name} exited with code {exit_code}")
        except Exception as e:
            logger.error(f"Failed to start {script_name}: {str(e)}")
        finally:
            with self.lock:
                self.processes.pop(script_name, None)

    def create_daemon_thread(self, script_name: str) -> threading.Thread:
        """Return a not-yet-started daemon thread that runs ``script_name``."""
        return threading.Thread(
            target=self.run_script, args=(script_name,), daemon=True
        )

    def restart_service(self, script_name: str):
        """Start ``script_name`` unless a thread for it is still alive."""
        with self.lock:
            current = self.threads.get(script_name)
            if current is not None and current.is_alive():
                return
            logger.info(f"Restarting {script_name}...")
            thread = self.create_daemon_thread(script_name)
            thread.start()
            self.threads[script_name] = thread

    def monitor(self):
        """Check all services every five seconds and restart dead ones.

        Runs until interrupted; on ``KeyboardInterrupt`` it logs a message
        and calls ``sys.exit(0)``.
        """
        try:
            while True:
                for script in self.services.copy():
                    with self.lock:
                        thread = self.threads.get(script)
                        dead = thread is None or not thread.is_alive()
                    if dead:
                        logger.warning(f"Service {script} died, restarting...")
                        # Called with the lock released: restart_service()
                        # acquires it itself, and acquiring a plain Lock
                        # twice from one thread blocks forever.
                        self.restart_service(script)
                time.sleep(5)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, shutting down...")
            sys.exit(0)
if __name__ == "__main__":
    manager = ServiceManager()
    for script in manager.services:
        manager.restart_service(script)
    # Run the monitor loop in the main thread.  KeyboardInterrupt is only
    # raised in the main thread, so when monitor() ran in a worker thread its
    # shutdown handler could never fire and Ctrl-C left the process hanging
    # on the non-daemon monitor thread.
    manager.monitor()
# Run the supervisor script directly (foreground).
python3 start.py