[关闭]
@liruiyi962464 2025-07-26T07:31:41.000000Z 字数 8824 阅读 74

Python实时传送自建视频流,Java WebSocket接收

代码


测试使用版本:jeecg-boot 3.7.2

Python Demo

  1. import asyncio
  2. import cv2
  3. import numpy as np
  4. import mss
  5. import time
  6. import websockets
  7. # WebSocket 服务器地址
  8. URI = "ws://192.168.10.167:8080/jeecg-boot/video-stream"
  9. async def send_frame(websocket):
  10. with mss.mss() as sct:
  11. monitor = sct.monitors[1]
  12. width, height = int(monitor["width"] * 0.5), int(monitor["height"] * 0.5)
  13. # 使用 H.264 编码器(需安装支持的 OpenCV)
  14. # 如果报错,尝试 cv2.VideoWriter_fourcc(*'XVID')
  15. fourcc = cv2.VideoWriter_fourcc(*'X264') # 或 'H264', 'XVID'
  16. fps = 5
  17. frame_interval = 1.0 / fps
  18. last_time = time.time()
  19. print("开始实时录屏并上传...")
  20. while True:
  21. current_time = time.time()
  22. if current_time - last_time < frame_interval:
  23. continue
  24. # 截图并处理
  25. screenshot = sct.grab(monitor)
  26. frame = np.array(screenshot)[:, :, :3] # 去掉 alpha 通道
  27. frame = cv2.resize(frame, (width, height))
  28. # 编码为 H.264 帧
  29. ret, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
  30. if not ret:
  31. continue
  32. await websocket.send(buffer.tobytes())
  33. last_time = current_time
  34. async def main():
  35. async with websockets.connect(URI) as websocket:
  36. await send_frame(websocket)
  37. if __name__ == "__main__":
  38. asyncio.run(main())

ShiroConfig.java

由于推流客户端不携带 token,需要在 ShiroConfig 中增加下列代码,放行该接口:

  1. filterChainDefinitionMap.put("/video-stream", "anon"); // 放行 视频播放路径

WebSocketConfig.java

增加注解@EnableWebSocket
WebSocketConfig实现WebSocketConfigurer接口

  1. package org.jeecg.config;
  2. import org.jeecg.VideoStreamHandler;
  3. import org.jeecg.config.filter.WebsocketFilter;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.web.servlet.FilterRegistrationBean;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  9. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  10. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  11. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  12. /**
  13. * @Description: WebSocketConfig
  14. * @author: jeecg-boot
  15. */
  16. @Configuration
  17. @EnableWebSocket
  18. public class WebSocketConfig implements WebSocketConfigurer {
  19. /**
  20. * 注入ServerEndpointExporter,
  21. * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
  22. */
  23. @Bean
  24. public ServerEndpointExporter serverEndpointExporter() {
  25. return new ServerEndpointExporter();
  26. }
  27. @Bean
  28. public WebsocketFilter websocketFilter(){
  29. return new WebsocketFilter();
  30. }
  31. @Bean
  32. public FilterRegistrationBean getFilterRegistrationBean() {
  33. FilterRegistrationBean bean = new FilterRegistrationBean();
  34. bean.setFilter(websocketFilter());
  35. bean.addUrlPatterns(
  36. "/taskCountSocket/*",
  37. "/websocket/*",
  38. "/eoaSocket/*",
  39. "/eoaNewChatSocket/*",
  40. "/newsWebsocket/*",
  41. "/dragChannelSocket/*",
  42. "/vxeSocket/*"
  43. );
  44. return bean;
  45. }
  46. // 以下为新增代码
  47. @Autowired
  48. private VideoStreamHandler videoStreamHandler;
  49. // 实现的接口
  50. @Override
  51. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  52. System.out.println("======== 正在注册 WebSocket 端点: /video-stream ========");
  53. registry.addHandler(videoStreamHandler, "/video-stream")
  54. .setAllowedOrigins("*");
  55. }
  56. }

VideoStreamMessage.java

  1. package org.jeecg;
  2. import lombok.Data;
  3. /**
  4. * 视频流消息实体类
  5. */
  6. @Data
  7. public class VideoStreamMessage {
  8. private String type; // 消息类型:"params"(参数消息)或 "data"(数据消息)或 "stop"(停止消息)
  9. private String fileName; // 视频文件名(如未提供,服务端自动生成)
  10. private String sessionId; // 会话ID(通常由WebSocket自动生成)
  11. private String from; // 发送方标识(如设备ID、客户端名称)
  12. private String content; // 消息内容(参数消息中为元数据,数据消息中为描述)
  13. private long timestamp; // 时间戳(毫秒级,用于同步或日志)
  14. }

VideoStreamHandler.java

实现接收视频流
支持接收参数

  1. package org.jeecg;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import lombok.Data;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.web.socket.BinaryMessage;
  8. import org.springframework.web.socket.TextMessage;
  9. import org.springframework.web.socket.WebSocketSession;
  10. import org.springframework.web.socket.handler.BinaryWebSocketHandler;
  11. import java.io.FileOutputStream;
  12. import java.io.IOException;
  13. import java.nio.ByteBuffer;
  14. import java.text.SimpleDateFormat;
  15. import java.util.Date;
  16. import java.util.Map;
  17. import java.util.concurrent.ConcurrentHashMap;
  18. import java.util.concurrent.atomic.AtomicBoolean;
  19. /**
  20. * 视频流WebSocket处理器
  21. */
  22. @Component
  23. public class VideoStreamHandler extends BinaryWebSocketHandler {
  24. private static final Logger logger = LoggerFactory.getLogger(VideoStreamHandler.class);
  25. private static final String OUTPUT_DIR = "D:\\opt\\upFiles\\"; // 视频文件输出目录
  26. private final ObjectMapper objectMapper = new ObjectMapper(); // JSON解析器
  27. // 存储会话状态
  28. private final Map<String, FileOutputStream> sessionOutputs = new ConcurrentHashMap<>(); // 文件输出流
  29. private final Map<String, VideoStreamParams> sessionParams = new ConcurrentHashMap<>(); // 会话参数
  30. private final Map<String, AtomicBoolean> writingFlags = new ConcurrentHashMap<>(); // 写入状态标志
  31. /**
  32. * 会话参数内部类
  33. */
  34. @Data
  35. private static class VideoStreamParams {
  36. private String fileName; // 文件名
  37. private String from; // 发送方
  38. private String content; // 内容描述
  39. private long timestamp; // 时间戳
  40. }
  41. @Override
  42. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  43. logger.info("===== 新WebSocket连接 =====");
  44. logger.info("Session ID: {}", session.getId());
  45. logger.info("URI: {}", session.getUri());
  46. writingFlags.put(session.getId(), new AtomicBoolean(true)); // 初始化写入标志
  47. }
  48. @Override
  49. protected void handleTextMessage(WebSocketSession session, TextMessage message) {
  50. String payload = message.getPayload();
  51. logger.info("收到文本消息: {}", payload);
  52. try {
  53. VideoStreamMessage msg = objectMapper.readValue(payload, VideoStreamMessage.class);
  54. if ("params".equals(msg.getType())) {
  55. // 处理参数消息:初始化会话参数和文件输出流
  56. VideoStreamParams params = new VideoStreamParams();
  57. params.setFileName(msg.getFileName());
  58. params.setFrom(msg.getFrom());
  59. params.setContent(msg.getContent());
  60. params.setTimestamp(msg.getTimestamp());
  61. sessionParams.put(session.getId(), params);
  62. // === 新增:输出解析后的参数 ===
  63. logger.info("===== 视频流参数 =====");
  64. logger.info("文件名: {}", msg.getFileName());
  65. logger.info("来源: {}", msg.getFrom());
  66. logger.info("内容: {}", msg.getContent());
  67. logger.info("时间戳: {}", msg.getTimestamp());
  68. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  69. String formattedTime = sdf.format(new Date(msg.getTimestamp()));
  70. logger.info("格式化时间: {}", formattedTime);
  71. // 创建输出文件(使用会话ID作为默认文件名)
  72. String outputFile = OUTPUT_DIR + (msg.getFileName() != null ?
  73. msg.getFileName() : "video_" + session.getId() + ".mp4");
  74. sessionOutputs.put(session.getId(), new FileOutputStream(outputFile));
  75. logger.info("视频流保存路径: {}", outputFile);
  76. } else if ("stop".equalsIgnoreCase(msg.getType())) {
  77. // 处理停止消息
  78. closeSession(session);
  79. }
  80. } catch (Exception e) {
  81. logger.warn("文本消息解析失败: {}", e.getMessage());
  82. }
  83. }
  84. @Override
  85. protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
  86. // 检查会话是否有效
  87. AtomicBoolean flag = writingFlags.get(session.getId());
  88. if (flag == null || !flag.get()) return;
  89. // 写入二进制数据到文件
  90. FileOutputStream fos = sessionOutputs.get(session.getId());
  91. if (fos != null) {
  92. try {
  93. ByteBuffer buffer = message.getPayload();
  94. fos.write(buffer.array());
  95. } catch (IOException e) {
  96. logger.error("二进制写入失败: {}", e.getMessage());
  97. closeSession(session);
  98. }
  99. }
  100. }
  101. @Override
  102. public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) {
  103. closeSession(session); // 清理资源
  104. }
  105. /**
  106. * 关闭会话并释放资源
  107. */
  108. private void closeSession(WebSocketSession session) {
  109. String sessionId = session.getId();
  110. AtomicBoolean flag = writingFlags.getOrDefault(sessionId, new AtomicBoolean(false));
  111. flag.set(false); // 禁止继续写入
  112. try {
  113. FileOutputStream fos = sessionOutputs.get(sessionId);
  114. if (fos != null) {
  115. fos.close();
  116. logger.info("会话结束,文件已保存: {}", sessionParams.get(sessionId).getFileName());
  117. }
  118. } catch (IOException e) {
  119. logger.error("文件关闭失败: {}", e.getMessage());
  120. } finally {
  121. sessionOutputs.remove(sessionId);
  122. sessionParams.remove(sessionId);
  123. writingFlags.remove(sessionId);
  124. }
  125. }
  126. }

测试

开始推送

  1. {
  2. "type": "params",
  3. "fileName": "test_video.mp4",
  4. "from": "camera_01",
  5. "content": "分辨率=1080p, 编码格式=h264, 帧率=30fps",
  6. "timestamp": 1630000000000
  7. }

结束推送

  1. {
  2. "type": "stop"
  3. }

结果

  1. 2025-07-25 17:45:55.356 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:49 - ===== WebSocket连接 =====
  2. 2025-07-25 17:45:55.357 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:50 - Session ID: b5391ec8-5bf2-b9d3-8def-8d7ae0668940
  3. 2025-07-25 17:45:55.357 [XNIO-1 task-2] INFO org.jeecg.VideoStreamHandler:51 - URI: ws://192.168.10.167:8080/jeecg-boot/video-stream
  4. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:58 - 收到文本消息: {
  5. "type": "params",
  6. "fileName": "test_video.mp4",
  7. "from": "camera_01",
  8. "content": "分辨率=1080p, 编码格式=h264, 帧率=30fps",
  9. "timestamp": 1630000000000
  10. }
  11. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:73 - ===== 视频流参数 =====
  12. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:74 - 文件名: test_video.mp4
  13. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:75 - 来源: camera_01
  14. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:76 - 内容: 分辨率=1080p, 编码格式=h264, 帧率=30fps
  15. 2025-07-25 17:46:00.642 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:77 - 时间戳: 1630000000000
  16. 2025-07-25 17:46:00.643 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:80 - 格式化时间: 2021-08-27 01:46:40
  17. 2025-07-25 17:46:00.643 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:86 - 视频流保存路径: D:\opt\upFiles\test_video.mp4
  18. 2025-07-25 17:46:13.486 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:58 - 收到文本消息: {
  19. "type": "stop"
  20. }
  21. 2025-07-25 17:46:15.381 [XNIO-1 I/O-11] INFO org.jeecg.VideoStreamHandler:133 - 会话结束,文件已保存: test_video.mp4
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注