@liruiyi962464
2025-07-26T07:31:41.000000Z
字数 8824
阅读 74
代码
测试使用版本:jeecg-boot 3.7.2
import asyncio
import cv2
import numpy as np
import mss
import time
import websockets
# WebSocket 服务器地址
URI = "ws://192.168.10.167:8080/jeecg-boot/video-stream"
async def send_frame(websocket):
with mss.mss() as sct:
monitor = sct.monitors[1]
width, height = int(monitor["width"] * 0.5), int(monitor["height"] * 0.5)
# 使用 H.264 编码器(需安装支持的 OpenCV)
# 如果报错,尝试 cv2.VideoWriter_fourcc(*'XVID')
fourcc = cv2.VideoWriter_fourcc(*'X264') # 或 'H264', 'XVID'
fps = 5
frame_interval = 1.0 / fps
last_time = time.time()
print("开始实时录屏并上传...")
while True:
current_time = time.time()
if current_time - last_time < frame_interval:
continue
# 截图并处理
screenshot = sct.grab(monitor)
frame = np.array(screenshot)[:, :, :3] # 去掉 alpha 通道
frame = cv2.resize(frame, (width, height))
# 编码为 H.264 帧
ret, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
if not ret:
continue
await websocket.send(buffer.tobytes())
last_time = current_time
async def main():
async with websockets.connect(URI) as websocket:
await send_frame(websocket)
if __name__ == "__main__":
asyncio.run(main())
放开接口,因为没有token,
ShiroConfig
增加下列代码
filterChainDefinitionMap.put("/video-stream", "anon"); // 放行 视频播放路径
增加注解
@EnableWebSocket
WebSocketConfig
实现WebSocketConfigurer
接口
package org.jeecg.config;
import org.jeecg.VideoStreamHandler;
import org.jeecg.config.filter.WebsocketFilter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Description: WebSocketConfig
* @author: jeecg-boot
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public WebsocketFilter websocketFilter(){
return new WebsocketFilter();
}
@Bean
public FilterRegistrationBean getFilterRegistrationBean() {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(websocketFilter());
bean.addUrlPatterns(
"/taskCountSocket/*",
"/websocket/*",
"/eoaSocket/*",
"/eoaNewChatSocket/*",
"/newsWebsocket/*",
"/dragChannelSocket/*",
"/vxeSocket/*"
);
return bean;
}
// 以下为新增代码
@Autowired
private VideoStreamHandler videoStreamHandler;
// 实现的接口
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
System.out.println("======== 正在注册 WebSocket 端点: /video-stream ========");
registry.addHandler(videoStreamHandler, "/video-stream")
.setAllowedOrigins("*");
}
}
package org.jeecg;
import lombok.Data;
/**
* 视频流消息实体类
*/
@Data
public class VideoStreamMessage {
private String type; // 消息类型:"params"(参数消息)或 "data"(数据消息)或 "stop"(停止消息)
private String fileName; // 视频文件名(如未提供,服务端自动生成)
private String sessionId; // 会话ID(通常由WebSocket自动生成)
private String from; // 发送方标识(如设备ID、客户端名称)
private String content; // 消息内容(参数消息中为元数据,数据消息中为描述)
private long timestamp; // 时间戳(毫秒级,用于同步或日志)
}
实现接受视频流
支持接收参数
package org.jeecg;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.BinaryWebSocketHandler;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 视频流WebSocket处理器
*/
@Component
public class VideoStreamHandler extends BinaryWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(VideoStreamHandler.class);
private static final String OUTPUT_DIR = "D:\\opt\\upFiles\\"; // 视频文件输出目录
private final ObjectMapper objectMapper = new ObjectMapper(); // JSON解析器
// 存储会话状态
private final Map<String, FileOutputStream> sessionOutputs = new ConcurrentHashMap<>(); // 文件输出流
private final Map<String, VideoStreamParams> sessionParams = new ConcurrentHashMap<>(); // 会话参数
private final Map<String, AtomicBoolean> writingFlags = new ConcurrentHashMap<>(); // 写入状态标志
/**
* 会话参数内部类
*/
@Data
private static class VideoStreamParams {
private String fileName; // 文件名
private String from; // 发送方
private String content; // 内容描述
private long timestamp; // 时间戳
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("===== 新WebSocket连接 =====");
logger.info("Session ID: {}", session.getId());
logger.info("URI: {}", session.getUri());
writingFlags.put(session.getId(), new AtomicBoolean(true)); // 初始化写入标志
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
logger.info("收到文本消息: {}", payload);
try {
VideoStreamMessage msg = objectMapper.readValue(payload, VideoStreamMessage.class);
if ("params".equals(msg.getType())) {
// 处理参数消息:初始化会话参数和文件输出流
VideoStreamParams params = new VideoStreamParams();
params.setFileName(msg.getFileName());
params.setFrom(msg.getFrom());
params.setContent(msg.getContent());
params.setTimestamp(msg.getTimestamp());
sessionParams.put(session.getId(), params);
// === 新增:输出解析后的参数 ===
logger.info("===== 视频流参数 =====");
logger.info("文件名: {}", msg.getFileName());
logger.info("来源: {}", msg.getFrom());
logger.info("内容: {}", msg.getContent());
logger.info("时间戳: {}", msg.getTimestamp());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formattedTime = sdf.format(new Date(msg.getTimestamp()));
logger.info("格式化时间: {}", formattedTime);
// 创建输出文件(使用会话ID作为默认文件名)
String outputFile = OUTPUT_DIR + (msg.getFileName() != null ?
msg.getFileName() : "video_" + session.getId() + ".mp4");
sessionOutputs.put(session.getId(), new FileOutputStream(outputFile));
logger.info("视频流保存路径: {}", outputFile);
} else if ("stop".equalsIgnoreCase(msg.getType())) {
// 处理停止消息
closeSession(session);
}
} catch (Exception e) {
logger.warn("文本消息解析失败: {}", e.getMessage());
}
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
// 检查会话是否有效
AtomicBoolean flag = writingFlags.get(session.getId());
if (flag == null || !flag.get()) return;
// 写入二进制数据到文件
FileOutputStream fos = sessionOutputs.get(session.getId());
if (fos != null) {
try {
ByteBuffer buffer = message.getPayload();
fos.write(buffer.array());
} catch (IOException e) {
logger.error("二进制写入失败: {}", e.getMessage());
closeSession(session);
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) {
closeSession(session); // 清理资源
}
/**
* 关闭会话并释放资源
*/
private void closeSession(WebSocketSession session) {
String sessionId = session.getId();
AtomicBoolean flag = writingFlags.getOrDefault(sessionId, new AtomicBoolean(false));
flag.set(false); // 禁止继续写入
try {
FileOutputStream fos = sessionOutputs.get(sessionId);
if (fos != null) {
fos.close();
logger.info("会话结束,文件已保存: {}", sessionParams.get(sessionId).getFileName());
}
} catch (IOException e) {
logger.error("文件关闭失败: {}", e.getMessage());
} finally {
sessionOutputs.remove(sessionId);
sessionParams.remove(sessionId);
writingFlags.remove(sessionId);
}
}
}
开始推送
{
"type": "params",
"fileName": "test_video.mp4",
"from": "camera_01",
"content": "分辨率=1080p, 编码格式=h264, 帧率=30fps",
"timestamp": 1630000000000
}
结束推送
{
"type": "stop"
}
结果
2025-07-25 17:45:55.356 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:49 - ===== 新WebSocket连接 =====
2025-07-25 17:45:55.357 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:50 - Session ID: b5391ec8-5bf2-b9d3-8def-8d7ae0668940
2025-07-25 17:45:55.357 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:51 - URI: ws://192.168.10.167:8080/jeecg-boot/video-stream
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:58 - 收到文本消息: {
"type": "params",
"fileName": "test_video.mp4",
"from": "camera_01",
"content": "分辨率=1080p, 编码格式=h264, 帧率=30fps",
"timestamp": 1630000000000
}
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:73 - ===== 视频流参数 =====
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:74 - 文件名: test_video.mp4
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:75 - 来源: camera_01
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:76 - 内容: 分辨率=1080p, 编码格式=h264, 帧率=30fps
2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:77 - 时间戳: 1630000000000
2025-07-25 17:46:00.643 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:80 - 格式化时间: 2021-08-27 01:46:40
2025-07-25 17:46:00.643 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:86 - 视频流保存路径: D:\opt\upFiles\test_video.mp4
2025-07-25 17:46:13.486 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:58 - 收到文本消息: {
"type": "stop"
}
2025-07-25 17:46:15.381 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:133 - 会话结束,文件已保存: test_video.mp4