[关闭]
@TedZhou 2020-06-28T13:56:53.000000Z 字数 3574 阅读 548

Springboot Websocket & SseEmitter

spring boot websocket eventsource


现代浏览器已经普遍支持WebSocket和EventSource,可以用它们实现与服务器的实时通信。
WebSocket复杂些,但是双工的;EventSource相对简单且能自动重连,但仅支持服务端推。

WebSocket 配置

Springboot加入下面的依赖即可使用WebSocket

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

WebSocketConfig.class

注册 Websocket Handler & Interceptor

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Bean
  5. public TextWebSocketHandler myWebSocketHandler() {
  6. return new MyWebSocketHandler();
  7. }
  8. @Override
  9. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  10. registry.addHandler(myWebSocketHandler(), "/myweb/socket").addInterceptors(new WebSocketInterceptor()).setAllowedOrigins("*");//https://www.cnblogs.com/exmyth/p/11720371.html
  11. //registry.addHandler(myWebSocketHandler(), "/myweb/sockjs").addInterceptors(new WebSocketInterceptor()).withSockJS();
  12. }
  13. @Bean
  14. public TaskScheduler taskScheduler() {//避免找不到TaskScheduler Bean
  15. ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
  16. taskScheduler.setPoolSize(10);
  17. taskScheduler.initialize();
  18. return taskScheduler;
  19. }
  20. }

WebSocketInterceptor.class

  1. public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
  2. @Override
  3. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
  4. String channel = ((ServletServerHttpRequest)request).getServletRequest().getParameter("ch");
  5. attributes.put("channel", channel);//传参
  6. return super.beforeHandshake(request, response, wsHandler, attributes);
  7. }
  8. @Override
  9. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
  10. super.afterHandshake(request, response, wsHandler, ex);
  11. }
  12. }

MyWebSocketHandler.class

  1. @Slf4j
  2. public class MyWebSocketHandler extends TextWebSocketHandler{
  3. @Autowired MyWebSocketService myWebSocketService;//注入需要的Service
  4. @Override
  5. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  6. String channel = (String)session.getAttributes().get("channel");//获取参数
  7. //记下session和参数用于下一步发消息...
  8. }
  9. @Override
  10. public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
  11. String channel = (String)session.getAttributes().get("channel");
  12. //做会话关闭后的处理...
  13. }
  14. @Override
  15. protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  16. log.debug("receive text message: " + message.getPayload());
  17. //收到消息的处理...
  18. }
  19. public void send(WebSocketSession session, String text) {
  20. try {
  21. TextMessage message = new TextMessage(text);
  22. session.sendMessage(message);//发送消息的方法
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

SseEmitter

Controller方法返回SseEmitter对象即可为客户端提供EventSource

  1. private static Set<SseEmitter> emitters = new HashSet<>();
  2. @RequestMapping("/myweb/eventsource")
  3. @ResponseBody
  4. SseEmitter eventSource(String ch) {
  5. SseEmitter emitter = new SseEmitter(0L);
  6. emitters.put(emitter);//记下emitter用于之后发送数据
  7. emitter.onCompletion(() -> {
  8. emitters.remove(emitter);//做连接关闭后的处理(ch, emitter)...
  9. });
  10. emitter.onTimeout(() -> {
  11. emitter.complete();
  12. });
  13. emitter.onError((e) -> {
  14. emitter.completeWithError(e);
  15. });
  16. return emitter;
  17. }

向所有的emitters发送数据text

  1. SseEventBuilder builder = SseEmitter.event().data(text);
  2. emitters.forEach(emitter -> {
  3. try {
  4. emitter.send(builder);
  5. } catch (Exception e) {
  6. errors.add(emitter);
  7. }
  8. });

客户端连接

前端js对象WebSocket和EventSource分别用于连接这两种服务。
具体用法略。

Nginx需要的额外配置

EventSource

  1. proxy_http_version 1.1;
  2. proxy_set_header Connection '';
  3. proxy_buffering off;
  4. proxy_cache off;
  5. gzip off;
  6. chunked_transfer_encoding off;

WebSocket

  1. proxy_http_version 1.1;
  2. proxy_set_header Upgrade $http_upgrade;
  3. proxy_set_header Connection "Upgrade";

已知问题

  1. 火狐下EventSource中断之后不会自动重连。
  2. IE系列浏览器都不支持EventSource。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注