@TedZhou
2020-06-28T13:56:53.000000Z
字数 3574
阅读 548
spring
boot
websocket
eventsource
现代浏览器已经普遍支持WebSocket和EventSource,可以用它们实现与服务器的实时通信。
WebSocket复杂些,但是双工的;EventSource相对简单且能自动重连,但仅支持服务端推。
Springboot加入下面的依赖即可使用WebSocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
注册 Websocket Handler & Interceptor
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Bean
public TextWebSocketHandler myWebSocketHandler() {
return new MyWebSocketHandler();
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler(), "/myweb/socket").addInterceptors(new WebSocketInterceptor()).setAllowedOrigins("*");//https://www.cnblogs.com/exmyth/p/11720371.html
//registry.addHandler(myWebSocketHandler(), "/myweb/sockjs").addInterceptors(new WebSocketInterceptor()).withSockJS();
}
@Bean
public TaskScheduler taskScheduler() {//避免找不到TaskScheduler Bean
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
return taskScheduler;
}
}
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String channel = ((ServletServerHttpRequest)request).getServletRequest().getParameter("ch");
attributes.put("channel", channel);//传参
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
}
@Slf4j
public class MyWebSocketHandler extends TextWebSocketHandler{
@Autowired MyWebSocketService myWebSocketService;//注入需要的Service
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String channel = (String)session.getAttributes().get("channel");//获取参数
//记下session和参数用于下一步发消息...
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String channel = (String)session.getAttributes().get("channel");
//做会话关闭后的处理...
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.debug("receive text message: " + message.getPayload());
//收到消息的处理...
}
public void send(WebSocketSession session, String text) {
try {
TextMessage message = new TextMessage(text);
session.sendMessage(message);//发送消息的方法
} catch (Exception e) {
e.printStackTrace();
}
}
}
Controller方法返回SseEmitter对象即可为客户端提供EventSource
private static Set<SseEmitter> emitters = new HashSet<>();
@RequestMapping("/myweb/eventsource")
@ResponseBody
SseEmitter eventSource(String ch) {
SseEmitter emitter = new SseEmitter(0L);
emitters.put(emitter);//记下emitter用于之后发送数据
emitter.onCompletion(() -> {
emitters.remove(emitter);//做连接关闭后的处理(ch, emitter)...
});
emitter.onTimeout(() -> {
emitter.complete();
});
emitter.onError((e) -> {
emitter.completeWithError(e);
});
return emitter;
}
向所有的emitters发送数据text
SseEventBuilder builder = SseEmitter.event().data(text);
emitters.forEach(emitter -> {
try {
emitter.send(builder);
} catch (Exception e) {
errors.add(emitter);
}
});
前端js对象WebSocket和EventSource分别用于连接这两种服务。
具体用法略。
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
gzip off;
chunked_transfer_encoding off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";