正文
先看看这次实践的目录结构:
两个页面分别模拟不同用户接入websocket。
------接下来,我们开始整合WebSocket------
先是pom.xml添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
PS:application.properties不需要添加任何配置 ,我只设置了一下服务server.port=8083
接着,创建节点配置类WebSocketStompConfig.java:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketStompConfig { //这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket ,如果你使用外置的tomcat就不需要该配置文件 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
然后是WebSocket配置类,WebSocket.java:
(这里面包含这单独发送消息,群发,监听上下线等等方法)
import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @Author:JCccc * @Description: * @Date: created in 15:56 2019/5/13 */ @Component @ServerEndpoint(value = "/connectWebSocket/{userId}") public class WebSocket { private Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 在线人数 */ public static int onlineNumber = 0; /** * 以用户的姓名为key,WebSocket为对象保存起来 */ private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>(); /** * 会话 */ private Session session; /** * 用户名称 */ private String userId; /** * 建立连接 * * @param session */ @OnOpen public void onOpen(@PathParam("userId") String userId, Session session) { onlineNumber++; logger.info("现在来连接的客户id:"+session.getId()+"用户名:"+userId); this.userId = userId; this.session = session; // logger.info("有新连接加入! 当前在线人数" + onlineNumber); try { //messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息 //先给所有人发送通知,说我上线了 Map<String,Object> map1 = Maps.newHashMap(); map1.put("messageType",1); map1.put("userId",userId); sendMessageAll(JSON.toJSONString(map1),userId); //把自己的信息加入到map当中去 clients.put(userId, this); logger.info("有连接关闭! 当前在线人数" + clients.size()); //给自己发一条消息:告诉自己现在都有谁在线 Map<String,Object> map2 = Maps.newHashMap(); map2.put("messageType",3); //移除掉自己 Set<String> set = clients.keySet(); map2.put("onlineUsers",set); sendMessageTo(JSON.toJSONString(map2),userId); } catch (IOException e){ logger.info(userId+"上线的时候通知所有人发生了错误"); } } @OnError public void onError(Session session, Throwable error) { logger.info("服务端发生了错误"+error.getMessage()); //error.printStackTrace(); } /** * 连接关闭 */ @OnClose public void onClose() { onlineNumber--; //webSockets.remove(this); clients.remove(userId); try { //messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息 Map<String,Object> map1 = Maps.newHashMap(); map1.put("messageType",2); map1.put("onlineUsers",clients.keySet()); map1.put("userId",userId); sendMessageAll(JSON.toJSONString(map1),userId); } catch (IOException e){ logger.info(userId+"下线的时候通知所有人发生了错误"); } //logger.info("有连接关闭! 当前在线人数" + onlineNumber); logger.info("有连接关闭! 当前在线人数" + clients.size()); } /** * 收到客户端的消息 * * @param message 消息 * @param session 会话 */ @OnMessage public void onMessage(String message, Session session) { try { logger.info("来自客户端消息:" + message+"客户端的id是:"+session.getId()); System.out.println("------------ :"+message); JSONObject jsonObject = JSON.parseObject(message); String textMessage = jsonObject.getString("message"); String fromuserId = jsonObject.getString("userId"); String touserId = jsonObject.getString("to"); //如果不是发给所有,那么就发给某一个人 //messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息 Map<String,Object> map1 = Maps.newHashMap(); map1.put("messageType",4); map1.put("textMessage",textMessage); map1.put("fromuserId",fromuserId); if(touserId.equals("All")){ map1.put("touserId","所有人"); sendMessageAll(JSON.toJSONString(map1),fromuserId); } else{ map1.put("touserId",touserId); System.out.println("开始推送消息给"+touserId); sendMessageTo(JSON.toJSONString(map1),touserId); } } catch (Exception e){ e.printStackTrace(); logger.info("发生了错误了"); } } public void sendMessageTo(String message, String TouserId) throws IOException { for (WebSocket item : clients.values()) { // System.out.println("在线人员名单 :"+item.userId.toString()); if (item.userId.equals(TouserId) ) { item.session.getAsyncRemote().sendText(message); break; } } } public void sendMessageAll(String message,String FromuserId) throws IOException { for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineNumber; } }
接下来用一个HTML5 页面 index.html,连接当前的WebSocket节点,接/发消息, index.html:
<!DOCTYPE HTML> <html> <head> <title>Test My WebSocket</title> </head> <body> TestWebSocket <input id="text" type="text" /> <button onclick="send()">SEND MESSAGE</button> <button onclick="closeWebSocket()">CLOSE</button> <div id="message"></div> </body> <script type="text/javascript"> var websocket = null; //判断当前浏览器是否支持WebSocket if('WebSocket' in window){ //连接WebSocket节点 websocket = new WebSocket("ws://localhost:8083/connectWebSocket/001"); } else{ alert('Not support websocket') } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回调方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //关闭连接 function closeWebSocket(){ websocket.close(); } //发送消息 function send(){ var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
为了演示多人接入,我们再弄一个index2.html:
好了,一切准备就绪,那么 把项目跑起来:
访问index.html模拟用户001连接websocket服务:
可以看到一上线,我们默认就推送了一条上线消息
接下来继续访问index2.html,模拟用户002也接入websocket:
此刻,我们模拟咱们服务器给客户推送消息,有群发和单独发送,我们一一实践:
单独发送,只需要调用websocket.java里面的 sendMessageTo方法:
那么我们来写个简单的推送信息接口,
@Autowired WebSocket webSocket; @ResponseBody @GetMapping("/sendTo") public String sendTo(@RequestParam("userId") String userId,@RequestParam("msg") String msg) throws IOException { webSocket.sendMessageTo(msg,userId); return "推送成功"; }
我们试着给001这位用户推送个消息:
可以看到001的页面收到了消息,002没有收到(肯定的):
群发(所有接入到websocket的用户都能收到):
@ResponseBody @GetMapping("/sendAll") public String sendAll(@@RequestParam("msg") String msg) throws IOException { String fromUserId="system";//其实没用上 webSocket.sendMessageAll(msg,fromUserId); return "推送成功"; }
我们试着给所有用户推送个消息:
可以看到大家都收到了这个群发消息:
然后是客户给服务端推送消息,直接操作起来:
其实就是websocket.java里面的onMessage 方法:
细看,其实里面写了点消息逻辑。 这是为了区分这个是一条上线消息还是下线消息等等。
那么发送简单直接给服务器推送消息的话,可以把后边的逻辑先注释掉。 也就是:
然后简单的客户端推送消息给服务器如:
可以看到控制台的打印:
正常收到消息,那么接下来我们把注释的代码打开,
这样只要我们符合逻辑,就能实现001给002 发送消息,或者001给所有人发送消息等等。这些其实都是通过将消息推送到服务器,服务器再判断进行转发而已。
测试一下,001给002发消息:
我们把消息弄成json格式:
{ "message" :" hello,我是001,我想和你做朋友", "userId":"001", "to":"002" }
然后发送:
可以看到控制台有打印:
然后在去看看002用户的页面,成功收到了001的私发消息:
还有其他业务类型 001给所有用户群发等等这些看代码就知道,其实也是根据某个key判断,然后进行发送,就不测试了。
该篇websocket的实践介绍,就到此吧。