springboot集成websocket
1. 前言
这里我们使用springboot搭建一个轻量级的websocket服务,同时提供4个入参。使用websocket服务可以轻松地和微信小程序、支付宝小程序、网页进行双向通讯,非常实用方便。
wss地址
这里是我们自己搭建的中转服务websocket地址。
wss://xxxx.cn/netgate/auth/pid/sn/openid
参数说明
地址中 /netgate/ 之后的四段路径依次对应 auth、pid、sn、openid 四个入参;其中 sn 为设备序列号,openid 为用户唯一标识。
2. 引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
3. 配置文件
新增配置类WebSocketConfig.java,用来初始化websocket。
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer{ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(webSocketHandler(),"/netgate/{auth}/{pid}/{sn}/{openid}") //注册Handler .addInterceptors(new WebSocketHandshakeInterceptor()) //注册Interceptor .setAllowedOrigins("*"); //注册Interceptor // //2.注册SockJS,提供SockJS支持(主要是兼容ie8) // String sockjs_url = "/sockjs/socketServer.do"; //设置sockjs的地址 // registry.addHandler(netgateHandler, sockjs_url) //注册Handler // .addInterceptors(new WebSocketHandshakeInterceptor()) //注册Interceptor // .withSockJS(); //支持sockjs协议 } @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); container.setMaxTextMessageBufferSize(2*1024*1024);//8192*1024 1024*1024*1024 container.setMaxBinaryMessageBufferSize(2*1024*1024); container.setAsyncSendTimeout(55000l); container.setMaxSessionIdleTimeout(55000l);//心跳 return container; } @Bean public TextWebSocketHandler webSocketHandler() { return new NetgateHandler(); } }
4. Websocket握手拦截器
拦截器的作用主要是做连接接入的鉴权和参数解析。
新增WebSocketHandshakeInterceptor.java文件
/** * Describes: WebSocket握手拦截器 * Auth: Eric */ public class WebSocketHandshakeInterceptor implements HandshakeInterceptor { private final static Logger LOGGER = LoggerFactory.getLogger(WebSocketHandshakeInterceptor.class); public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { String path = request.getURI().getPath(); if(requestIsValid(path)){ String[] params = getParams(path); attributes.put("WEBSOCKET_AUTH", params[0]); attributes.put("WEBSOCKET_PID", params[1]); attributes.put("WEBSOCKET_SN", params[2]); attributes.put("WEBSOCKET_OPENID", params[3]); attributes.put("WEBSOCKET_FIRSTONE","yes"); } } System.out.println("================Before Handshake================"); return true; } public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) { System.out.println("================After Handshake================"); if(e!=null) e.printStackTrace(); System.out.println("================After Handshake================"); } private boolean requestIsValid(String url){ //在这里可以写上具体的鉴权逻辑 boolean isvalid = false; if(StringUtils.isNotEmpty(url) && url.startsWith("/netgate/") && url.split("/").length==6){ isvalid = true; } return isvalid; } private String[] getParams(String url){ url = url.replace("/netgate/",""); return url.split("/"); } }
5. Websocket处理器
在这里可以做消息的接收和发送。
这里MqttGateway是springboot整合MQTT客户端的服务类,具体可以参考下一篇的《springboot集成mqtt》。
新建NetgateHandler.java文件
/** * Websocket处理器 */ @Component public class NetgateHandler extends TextWebSocketHandler { @Autowired private MqttGateway mqttGateway; /* * 网关连接集合 * 第一级:设备序列号 sn * 第二级:用户唯一标识 openid */ private static ConcurrentHashMap<String,ConcurrentHashMap<String,WebSocketSession>> netgates = new ConcurrentHashMap<String,ConcurrentHashMap<String,WebSocketSession>>(); /** * 处理前端发送的文本信息 * js调用websocket.send时候,会调用该方法 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { if(!session.isOpen()) { System.out.println("连接已关闭,不再处理该连接的消息!"); return; } String mes = ObjectUtils.toString(message.getPayload(),""); String pid = session.getAttributes().get("WEBSOCKET_PID").toString(); String sn = session.getAttributes().get("WEBSOCKET_SN").toString(); if(message==null || "".equals(mes)){ System.out.println(getSysDate()+"============接收到空消息,不予处理。"); }else if(mes.length()==1){ //心跳消息过滤掉 return; }else { //转发成mqtt消息 String topic = "pay/"+pid+"/server/"+sn; System.out.println(topic); System.out.println(getSysDate()+"============消息处理完成:"+mes); mqttGateway.sendToMqtt(topic,mes); } } /** * 当新连接建立的时候,被调用 * 连接成功时候,会触发页面上onOpen方法 * * @param session * @throws Exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println(getSysDate()+"============正在初始化连接:"+session.getId()); try { //初始化连接,把session存储起来 this.initUsers(session); } catch (Exception e) { System.out.println(getSysDate()+"============初始化连接异常-开始:"+e.getMessage()); e.printStackTrace(); System.out.println(getSysDate()+"============初始化连接异常-结束:"+e.getMessage()); } System.out.println(getSysDate()+"============初始化连接完成:"+session.getId()); } /** * 当连接关闭时被调用 * * @param session * @param status * @throws Exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { 
System.out.println(getSysDate()+"============正在关闭连接:"+session.getId()+",isOpen:"+session.isOpen()+";code:"+status.getCode()); try { System.out.println("断开连接状态值"+status.getCode()); this.removeSession(session); } catch (Exception e) { System.out.println(getSysDate()+"============关闭连接异常-开始:"+e.getMessage()); e.printStackTrace(); System.out.println(getSysDate()+"============关闭连接异常-结束:"+e.getMessage()); } System.out.println(getSysDate()+"============正在关闭完成:"+session.getId()+",isOpen:"+session.isOpen()+";code:"+status.getCode()); } /** * 传输错误时调用 * * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { System.out.println(getSysDate()+"============发生传输错误:"+session.getId()+";session.isOpen():"+session.isOpen()+";exception:"+exception.getMessage()); exception.printStackTrace(); if (session.isOpen()) { //try { session.close(); } catch (Exception e) {e.printStackTrace();} }else { try { this.removeSession(session); } catch (Exception e) { System.out.println(getSysDate()+"============传输错误处理异常-开始:"+e.getMessage()); e.printStackTrace(); System.out.println(getSysDate()+"============传输错误处理异常-结束:"+e.getMessage()); } } System.out.println(getSysDate()+"============错误处理结束:"+session.getId()+";session.isOpen():"+session.isOpen()+";exception:"+exception.getMessage()); } public synchronized void sendMsgToNetgateSn(String sn, String msg) { if(netgates.size()>0 && netgates.containsKey(sn) && !netgates.get(sn).isEmpty()){ //获取EID对应的后台管理连接 多个 for (WebSocketSession ws: netgates.get(sn).values()){ System.out.println("对网关指令开始发送啦:sn="+sn+"消息内容"+msg); try {ws.sendMessage(new TextMessage(msg));} catch (IOException e) {System.out.println(getSysDate()+"发生了异常:"+e.getMessage());e.printStackTrace();continue;} } } } //连接接入的处理方法 private synchronized void initUsers(WebSocketSession session){ String pid = (String) session.getAttributes().get("WEBSOCKET_PID"); String sn = (String) 
session.getAttributes().get("WEBSOCKET_SN"); String openid = (String) session.getAttributes().get("WEBSOCKET_OPENID"); if(StringUtils.isNotEmpty(pid) && StringUtils.isNotEmpty(sn) && StringUtils.isNotEmpty(openid)){ ConcurrentHashMap<String,WebSocketSession> netgate = netgates.get(sn); if(netgate == null){ netgate = new ConcurrentHashMap<String,WebSocketSession>(); } WebSocketSession session_exist = netgate.get(sn); if(session_exist!=null) { System.out.println("检测到相同SN重复连接,SN:"+sn+",连接ID:"+session_exist.getId()+",准备清理失效的连接。。。"); try {session_exist.close();} catch (IOException e) {e.printStackTrace();} } netgate.putIfAbsent(openid, session); netgates.put(sn,netgate); } } //连接被关闭时处理集合 private synchronized void removeSession(WebSocketSession session){ String sn = (String) session.getAttributes().get("WEBSOCKET_SN"); String openid = (String) session.getAttributes().get("WEBSOCKET_OPENID"); if(netgates.get(sn).containsKey(openid)) { WebSocketSession exist_session = netgates.get(sn).get(openid); //确保是同一个session 不是同一个session则不应该进行下一步的处理 if(exist_session.getId()!=null && exist_session.getId().equals(session.getId())) { netgates.get(sn).remove(openid); System.out.println("有一网关连接关闭!SN:"+sn+",当前在线数量为"+netgates.get(sn).keySet().size()); }else { System.out.println("检测到关闭session异常,程序中断处理,关闭sessionId:"+session.getId()+",当前实际sessionId:"+exist_session.getId()); } }else { System.out.println("检测到关闭session异常,程序中断处理,系统中未找到对应的session,Sn="+sn+"openid="+openid); } } private String getSysDate() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 return df.format(new Date()); } }