@TedZhou
2020-06-28T05:56:53.000000Z
字数 3574
阅读 780
spring boot websocket eventsource
现代浏览器已经普遍支持WebSocket和EventSource,可以用它们实现与服务器的实时通信。
WebSocket复杂些,但是双工的;EventSource相对简单且能自动重连,但仅支持服务端推。
Springboot加入下面的依赖即可使用WebSocket
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
注册 Websocket Handler & Interceptor
@Configuration@EnableWebSocketpublic class WebSocketConfig implements WebSocketConfigurer {@Beanpublic TextWebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myWebSocketHandler(), "/myweb/socket").addInterceptors(new WebSocketInterceptor()).setAllowedOrigins("*");//https://www.cnblogs.com/exmyth/p/11720371.html//registry.addHandler(myWebSocketHandler(), "/myweb/sockjs").addInterceptors(new WebSocketInterceptor()).withSockJS();}@Beanpublic TaskScheduler taskScheduler() {//避免找不到TaskScheduler BeanThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(10);taskScheduler.initialize();return taskScheduler;}}
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {String channel = ((ServletServerHttpRequest)request).getServletRequest().getParameter("ch");attributes.put("channel", channel);//传参return super.beforeHandshake(request, response, wsHandler, attributes);}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {super.afterHandshake(request, response, wsHandler, ex);}}
@Slf4jpublic class MyWebSocketHandler extends TextWebSocketHandler{@Autowired MyWebSocketService myWebSocketService;//注入需要的Service@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String channel = (String)session.getAttributes().get("channel");//获取参数//记下session和参数用于下一步发消息...}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {String channel = (String)session.getAttributes().get("channel");//做会话关闭后的处理...}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {log.debug("receive text message: " + message.getPayload());//收到消息的处理...}public void send(WebSocketSession session, String text) {try {TextMessage message = new TextMessage(text);session.sendMessage(message);//发送消息的方法} catch (Exception e) {e.printStackTrace();}}}
Controller方法返回SseEmitter对象即可为客户端提供EventSource
private static Set<SseEmitter> emitters = new HashSet<>();@RequestMapping("/myweb/eventsource")@ResponseBodySseEmitter eventSource(String ch) {SseEmitter emitter = new SseEmitter(0L);emitters.put(emitter);//记下emitter用于之后发送数据emitter.onCompletion(() -> {emitters.remove(emitter);//做连接关闭后的处理(ch, emitter)...});emitter.onTimeout(() -> {emitter.complete();});emitter.onError((e) -> {emitter.completeWithError(e);});return emitter;}
向所有的emitters发送数据text
SseEventBuilder builder = SseEmitter.event().data(text);emitters.forEach(emitter -> {try {emitter.send(builder);} catch (Exception e) {errors.add(emitter);}});
前端js对象WebSocket和EventSource分别用于连接这两种服务。
具体用法略。
proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;gzip off;chunked_transfer_encoding off;
proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";
