由于http或https只支持客户端向服务端发送请求,不支持服务端主动推送消息到客户端,所以当客户端需要实时获取服务端的变化时,一般采用了客户端轮询的方式,这种方式不仅消耗性能,还容易导致宕机问题,这时候就需要websorket这种方式
原理
SpringBoot整合WebSocket推送消息的原理主要基于WebSocket协议的双向通信机制。以下是主要的步骤和过程:
建立连接: 当客户端启动应用程序并需要与服务器建立WebSocket连接时,它首先会在HTTP协议的基础上与服务器进行握手。这个过程中,客户端会向服务器传送WebSocket支持的版本号等信息,同时建立起TCP连接。
协议转换: 在这个阶段,协议从HTTP转换为WebSocket。在WebSocket协议中,服务器和客户端之间可以直接进行双向的数据传输,而不需要像HTTP协议那样,每次发送数据都要通过请求和响应的方式。
消息推送: 一旦WebSocket连接建立,服务器就可以随时向客户端发送消息。服务器将消息封装在WebSocket的帧(frame)中,然后通过已建立的TCP连接发送给客户端。客户端在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理。
消息接收: 客户端也可以随时向服务器发送消息。客户端将消息封装在WebSocket帧中,然后通过TCP连接发送给服务器。服务器在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理,然后通过WebSocket帧将处理结果返回给客户端。
总的来说,SpringBoot整合WebSocket推送消息的原理就是基于WebSocket协议的双向通信机制,通过建立TCP连接,实现服务器与客户端之间的全双工通信,从而使得数据的推送和接收更加高效和灵活。
实战
前端html代码
<!DOCTYPE HTML> <html> <head> <title>My WebSocket</title> </head> <body> Welcome<br/> <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> <div id="message"> </div> </body> <script type="text/javascript"> var websocket = null; //判断当前浏览器是否支持WebSocket if('WebSocket' in window){ websocket = new WebSocket("ws://127.0.0.1:8092/websocket/yuanrenjie"); } else{ alert('Not support websocket') } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回调方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //关闭连接 function closeWebSocket(){ websocket.close(); } //发送消息 function send(){ var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
方案
第一步引入pom
// 一般父级项目都是springboot,这里无需设置版本,如果实在需要可百度自行选取合适的版本 <!-- 引入websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
第二步:websorket配置
单链接版
@Component @Slf4j @ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8087/webSocket/userId; public class WebSocketServer { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /** * 用户ID */ private String userId; //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 // 注:底下WebSocket是当前类名 private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>(); // 用来存在线连接用户信息 private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>(); /** * 链接成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value="userId")String userId) { try { this.session = session; this.userId = userId; webSockets.add(this); sessionPool.put(userId, session); log.info("【websocket消息】有新的连接,总数为:"+webSockets.size()); } catch (Exception e) { } } /** * 链接关闭调用的方法 */ @OnClose public void onClose() { try { webSockets.remove(this); sessionPool.remove(this.userId); log.info("【websocket消息】连接断开,总数为:"+webSockets.size()); } catch (Exception e) { } } /** * 收到客户端消息后调用的方法 * * @param message * @param */ @OnMessage public void onMessage(String message) { log.info("【websocket消息】收到客户端消息:"+message); } /** 发送错误时的处理 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误,原因:"+error.getMessage()); error.printStackTrace(); } // 此为广播消息 public void sendAllMessage(String message) { log.info("【websocket消息】广播消息:"+message); for(WebSocketServer webSocket : webSockets) { try { if(webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } // 此为单点消息 public void sendOneMessage(String userId, String message) { Session session = sessionPool.get(userId); if (session != null&&session.isOpen()) { try { log.info("【websocket消息】 单点消息:"+message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } // 此为单点消息(多人) public void sendMoreMessage(String[] userIds, String message) { for(String userId:userIds) { Session session = sessionPool.get(userId); if (session != null&&session.isOpen()) { try { log.info("【websocket消息】 单点消息:"+message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } } }
多链接版
@Component @Slf4j @ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8087/webSocket/userId; public class WebSocketServer { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /** * 用户ID */ private String userId; // //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 // //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 // // 注:底下WebSocket是当前类名 // private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>(); // // 用来存在线连接用户信息 // private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>(); private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>> userwebSocketMap = new ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>>(); private static ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>(); /** * 链接成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value="userId") final String userId) { this.session = session; this.userId = userId; if (!exitUser(userId)) { initUserInfo(userId); } else { CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = getUserSocketSet(userId); WebSocketServerSet.add(this); userCountIncrease(userId); } System.out.println("有" + userId + "新连接加入!当前在线人数为" + getCurrUserCount(userId)); } /** * 链接关闭调用的方法 */ @OnClose public void onClose() { CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = userwebSocketMap.get(userId); //从set中删除 WebSocketServerSet.remove(this); //在线数减1 userCountDecrement(userId); System.out.println("有一连接关闭!当前在线人数为" + getCurrUserCount(userId)); } /** * 收到客户端消息后调用的方法 * * @param message * @param */ @OnMessage public void onMessage(String message) { onMessageMethod(userId, message); } /** 发送错误时的处理 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误,原因:"+error.getMessage()); error.printStackTrace(); } // 此为广播消息 public void sendAllMessage(String message) { log.info("【websocket消息】广播消息:"+message); for(WebSocketServer webSocket : userwebSocketMap.get(userId)) { try { if(webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } // 此为单点消息 public void sendOneMessage(String userId, String message) { onMessageMethod(userId, message); } private void onMessageMethod(String userId, String message) { CopyOnWriteArraySet<WebSocketServer> webSocketSet = userwebSocketMap.get(userId); if (webSocketSet!=null){ System.out.println("来自客户端" + userId + "的消息:" + message); //群发消息 for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } } } else { log.error("消息接收人为null"+message); } } // 此为单点消息(多人) public void sendMoreMessage(String[] userIds, String message) { for(String userId:userIds) { Session session = this.session; if (session != null&&session.isOpen()) { try { log.info("【websocket消息】 单点消息:"+message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } } /** * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 * * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } public boolean exitUser(String userId) { return userwebSocketMap.containsKey(userId); } public CopyOnWriteArraySet<WebSocketServer> getUserSocketSet(String userId) { return userwebSocketMap.get(userId); } public void userCountIncrease(String userId) { if (count.containsKey(userId)) { count.put(userId, count.get(userId) + 1); } } public void userCountDecrement(String userId) { if (count.containsKey(userId)) { count.put(userId, count.get(userId) - 1); } } public void removeUserConunt(String userId) { count.remove(userId); } public Integer getCurrUserCount(String userId) { return count.get(userId); } private void initUserInfo(String userId) { CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = new CopyOnWriteArraySet<WebSocketServer>(); WebSocketServerSet.add(this); userwebSocketMap.put(userId, WebSocketServerSet); count.put(userId, 1); } }
测试类
@RestController @RequestMapping("/test") public class TestController { @Resource private WebSocketServer webSocketServer; /** * 查询消息list * @param * @return */ @GetMapping("getTest") public void getMessageinfoList(){ // 推送websorket //创建业务消息信息 JSONObject obj = new JSONObject(); obj.put("cmd", "topic");//业务类型 obj.put("msgId", "1");//消息id obj.put("msgTxt", "1");//消息内容 //全体发送 webSocketServer.sendAllMessage(obj.toJSONString(0)); //单个用户发送 (userId为用户id) webSocketServer.sendOneMessage("1", obj.toJSONString(0)); //多个用户发送 (userIds为多个用户id,逗号‘,’分隔) String a[]={"1","2"}; webSocketServer.sendMoreMessage(a, obj.toJSONString(0)); } }
调用测试类的接口,即可在network的ws栏的请求看到消息
配置wss
因为网站之前已经配置过Https了就不用配置证书了
修改nginx
server { listen 443 ssl; server_name localhost; ssl_certificate /opt/ssl/test.com.pem; ssl_certificate_key /opt/ssl/test.com.key; ssl_session_cache shared:SSL:1m; ssl_session_timeout 5m; ssl_ciphers HIGH:!aNULL:!MD5; ssl_prefer_server_ciphers on; location / { root html; index index.html index.htm; proxy_set_header Host $host; proxy_set_header X-Real-Ip $remote_addr; proxy_set_header X-Forwarded-For $remote_addr; proxy_pass http://127.0.0.1:90; } location /websocket/ { proxy_pass http://172.20.0.113:9528; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; #由于服务器端源码(建议大家做好大小写匹配)只匹配了"Upgrade"字符串,所以如果这里填"upgrade"服务器端会将这条http请求当成普通的请求,导致websocket握手失败 proxy_set_header Connection "Upgrade"; proxy_set_header Remote_addr $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_read_timeout 600s; } }
gateway无需修改
- id: message-sorket uri: ws://192.168.0.33:9995 predicates: - Path=/websocket/**
延伸
WebSocket是一种网络通信协议,提供了全双工的通信频道,允许服务器和客户端之间的实时双向通信。这在传统HTTP轮询技术上做出了显著的提升,允许服务器在数据更新时主动推送给客户端,减少了不必要的请求和响应,提高了数据传输的效率和实时性。
WebSocket在实现推送消息的过程中,有一套详细的工作机制。
连接建立: 客户端和服务器通过WebSocket协议进行握手,建立一次性的TCP连接。握手过程中,服务器和客户端可以协商一些参数,例如消息的最大长度、是否支持二进制消息等。一旦连接建立,服务器和客户端就可以通过这个连接进行数据的发送和接收。
消息发送: 在WebSocket中,服务器和客户端都可以随时发送消息。发送消息时,需要将消息内容封装在一个WebSocket帧中,然后通过网络连接发送给对方。对方接收到消息后,解析WebSocket帧,取出消息内容进行处理。
消息接收: 除了发送消息,WebSocket还提供了接收消息的功能。当服务器或客户端接收到一个WebSocket帧时,会解析出其中的消息内容,然后进行处理。
连接关闭: WebSocket连接可以随时关闭。关闭连接后,服务器和客户端就不再通过这个连接进行数据传输。
另外值得注意的是,在使用WebSocket的过程中,有时会遇到网络断开的情况。虽然服务器端仍然会向客户端发送数据,但是客户端无法接收到这些数据。为了解决这个问题,WebSocket提供了一种心跳机制。
WebSocket的心跳机制允许客户端和服务器在连接建立后,每隔一段时间向对方发送一个心跳消息,以检查连接是否仍然有效。如果长时间没有接收到对方的心跳消息,服务器或客户端可以认为连接已经断开,然后采取相应的措施,例如关闭连接、重新连接等。
总结起来,WebSocket协议的推送消息机制主要包括建立连接、发送和接收消息、关闭连接以及心跳机制等环节。这些环节协同工作,使得WebSocket成为实现实时、高效数据传输的重要工具。