springboot整合websorket推送消息实战

简介: springboot整合websorket推送消息实战

由于http或https只支持客户端向服务端发送请求,不支持服务端主动推送消息到客户端,所以当客户端需要实时获取服务端的变化时,一般采用了客户端轮询的方式,这种方式不仅消耗性能,还容易导致宕机问题,这时候就需要websorket这种方式

原理

SpringBoot整合WebSocket推送消息的原理主要基于WebSocket协议的双向通信机制。以下是主要的步骤和过程:

建立连接: 当客户端启动应用程序并需要与服务器建立WebSocket连接时,它首先会在HTTP协议的基础上与服务器进行握手。这个过程中,客户端会向服务器传送WebSocket支持的版本号等信息,同时建立起TCP连接。

协议转换: 在这个阶段,协议从HTTP转换为WebSocket。在WebSocket协议中,服务器和客户端之间可以直接进行双向的数据传输,而不需要像HTTP协议那样,每次发送数据都要通过请求和响应的方式。

消息推送: 一旦WebSocket连接建立,服务器就可以随时向客户端发送消息。服务器将消息封装在WebSocket的帧(frame)中,然后通过已建立的TCP连接发送给客户端。客户端在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理。

消息接收: 客户端也可以随时向服务器发送消息。客户端将消息封装在WebSocket帧中,然后通过TCP连接发送给服务器。服务器在接收到消息后,解析WebSocket帧,取出消息内容并进行相应的处理,然后通过WebSocket帧将处理结果返回给客户端。

总的来说,SpringBoot整合WebSocket推送消息的原理就是基于WebSocket协议的双向通信机制,通过建立TCP连接,实现服务器与客户端之间的全双工通信,从而使得数据的推送和接收更加高效和灵活。

实战

前端html代码

<!DOCTYPE HTML>
<html>
<head>
    <title>My WebSocket</title>
</head>
                        
<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body>
 
<script type="text/javascript">
    var websocket = null;
 
    //判断当前浏览器是否支持WebSocket
    if('WebSocket' in window){
        websocket = new WebSocket("ws://127.0.0.1:8092/websocket/yuanrenjie");
    }
    else{
        alert('Not support websocket')
    }
 
    //连接发生错误的回调方法
    websocket.onerror = function(){
        setMessageInnerHTML("error");
    };
 
    //连接成功建立的回调方法
    websocket.onopen = function(event){
        setMessageInnerHTML("open");
    }
 
    //接收到消息的回调方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }
 
    //连接关闭的回调方法
    websocket.onclose = function(){
        setMessageInnerHTML("close");
    }
 
    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function(){
        websocket.close();
    }
 
    //将消息显示在网页上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
 
    //关闭连接
    function closeWebSocket(){
        websocket.close();
    }
 
    //发送消息
    function send(){
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>

方案

第一步引入pom

// 一般父级项目都是springboot,这里无需设置版本,如果实在需要可百度自行选取合适的版本
 <!-- 引入websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

第二步:websorket配置

单链接版

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;

public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    /**
     * 用户ID
     */
    private String userId;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
    //  注:底下WebSocket是当前类名
    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
    // 用来存在线连接用户信息
    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value="userId")String userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
        } catch (Exception e) {
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
        } catch (Exception e) {
        }
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】收到客户端消息:"+message);
    }

    /** 发送错误时的处理
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误,原因:"+error.getMessage());
        error.printStackTrace();
    }


    // 此为广播消息
    public void sendAllMessage(String message) {
        log.info("【websocket消息】广播消息:"+message);
        for(WebSocketServer webSocket : webSockets) {
            try {
                if(webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 此为单点消息
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null&&session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:"+message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 此为单点消息(多人)
    public void sendMoreMessage(String[] userIds, String message) {
        for(String userId:userIds) {
            Session session = sessionPool.get(userId);
            if (session != null&&session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:"+message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

多链接版

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    /**
     * 用户ID
     */
    private String userId;

//    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
//    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
//    //  注:底下WebSocket是当前类名
//    private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
//    // 用来存在线连接用户信息
//    private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();

    private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>> userwebSocketMap = new ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketServer>>();

    private static ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>();


    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value="userId") final String userId) {
        this.session = session;
        this.userId = userId;
        if (!exitUser(userId)) {
            initUserInfo(userId);
        } else {
            CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = getUserSocketSet(userId);
            WebSocketServerSet.add(this);
            userCountIncrease(userId);
        }
        System.out.println("有" + userId + "新连接加入!当前在线人数为" + getCurrUserCount(userId));
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = userwebSocketMap.get(userId);
        //从set中删除
        WebSocketServerSet.remove(this);
        //在线数减1
        userCountDecrement(userId);
        System.out.println("有一连接关闭!当前在线人数为" + getCurrUserCount(userId));
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param
     */
    @OnMessage
    public void onMessage(String message) {
        onMessageMethod(userId, message);
    }

    /** 发送错误时的处理
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误,原因:"+error.getMessage());
        error.printStackTrace();
    }


    // 此为广播消息
    public void sendAllMessage(String message) {
        log.info("【websocket消息】广播消息:"+message);
        for(WebSocketServer webSocket : userwebSocketMap.get(userId)) {
            try {
                if(webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 此为单点消息
    public void sendOneMessage(String userId, String message) {
               onMessageMethod(userId, message);
    }

    private void onMessageMethod(String userId, String message) {
        CopyOnWriteArraySet<WebSocketServer> webSocketSet = userwebSocketMap.get(userId);
        if (webSocketSet!=null){
            System.out.println("来自客户端" + userId + "的消息:" + message);
            //群发消息
            for (WebSocketServer item : webSocketSet) {
                try {
                    item.sendMessage(message);
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }
            }
        }
        else {
            log.error("消息接收人为null"+message);
        }
    }

    // 此为单点消息(多人)
    public void sendMoreMessage(String[] userIds, String message) {
        for(String userId:userIds) {
            Session session = this.session;
            if (session != null&&session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:"+message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }


    public boolean exitUser(String userId) {
        return userwebSocketMap.containsKey(userId);
    }

    public CopyOnWriteArraySet<WebSocketServer> getUserSocketSet(String userId) {
        return userwebSocketMap.get(userId);
    }

    public void userCountIncrease(String userId) {
        if (count.containsKey(userId)) {
            count.put(userId, count.get(userId) + 1);
        }
    }


    public void userCountDecrement(String userId) {
        if (count.containsKey(userId)) {
            count.put(userId, count.get(userId) - 1);
        }
    }

    public void removeUserConunt(String userId) {
        count.remove(userId);
    }

    public Integer getCurrUserCount(String userId) {
        return count.get(userId);
    }

    private void initUserInfo(String userId) {
        CopyOnWriteArraySet<WebSocketServer> WebSocketServerSet = new CopyOnWriteArraySet<WebSocketServer>();
        WebSocketServerSet.add(this);
        userwebSocketMap.put(userId, WebSocketServerSet);
        count.put(userId, 1);
    }
}

测试类

@RestController
@RequestMapping("/test")
public class TestController {

    @Resource
    private WebSocketServer webSocketServer;

    /**
     * 查询消息list
     * @param
     * @return
     */
    @GetMapping("getTest")
    public void getMessageinfoList(){
        // 推送websorket
        //创建业务消息信息
        JSONObject obj = new JSONObject();
        obj.put("cmd", "topic");//业务类型
        obj.put("msgId", "1");//消息id
        obj.put("msgTxt", "1");//消息内容
//全体发送
        webSocketServer.sendAllMessage(obj.toJSONString(0));
//单个用户发送 (userId为用户id)
        webSocketServer.sendOneMessage("1", obj.toJSONString(0));
//多个用户发送 (userIds为多个用户id,逗号‘,’分隔)
        String a[]={"1","2"};
       webSocketServer.sendMoreMessage(a, obj.toJSONString(0));

    }
}

调用测试类的接口,即可在network的ws栏的请求看到消息

配置wss

因为网站之前已经配置过Https了就不用配置证书了

修改nginx

server {
        listen       443 ssl;
        server_name  localhost;

        ssl_certificate      /opt/ssl/test.com.pem;
        ssl_certificate_key  /opt/ssl/test.com.key;

        ssl_session_cache    shared:SSL:1m;
        ssl_session_timeout  5m;

        ssl_ciphers  HIGH:!aNULL:!MD5;
        ssl_prefer_server_ciphers  on;

        location / {
            root   html;
            index  index.html index.htm;
            proxy_set_header Host $host;
            proxy_set_header X-Real-Ip $remote_addr;
            proxy_set_header X-Forwarded-For $remote_addr;
            proxy_pass http://127.0.0.1:90;
        }
    location /websocket/ {
            proxy_pass    http://172.20.0.113:9528;
       proxy_http_version 1.1;
       proxy_set_header Upgrade $http_upgrade;
       #由于服务器端源码(建议大家做好大小写匹配)只匹配了"Upgrade"字符串,所以如果这里填"upgrade"服务器端会将这条http请求当成普通的请求,导致websocket握手失败
       proxy_set_header Connection "Upgrade";
       proxy_set_header Remote_addr $remote_addr;
       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       proxy_read_timeout 600s;
        }
    }

gateway无需修改


  
         - id: message-sorket
           uri: ws://192.168.0.33:9995
           predicates:
             - Path=/websocket/**

延伸

WebSocket是一种网络通信协议,提供了全双工的通信频道,允许服务器和客户端之间的实时双向通信。这在传统HTTP轮询技术上做出了显著的提升,允许服务器在数据更新时主动推送给客户端,减少了不必要的请求和响应,提高了数据传输的效率和实时性。

WebSocket在实现推送消息的过程中,有一套详细的工作机制。

连接建立: 客户端和服务器通过WebSocket协议进行握手,建立一次性的TCP连接。握手过程中,服务器和客户端可以协商一些参数,例如消息的最大长度、是否支持二进制消息等。一旦连接建立,服务器和客户端就可以通过这个连接进行数据的发送和接收。

消息发送: 在WebSocket中,服务器和客户端都可以随时发送消息。发送消息时,需要将消息内容封装在一个WebSocket帧中,然后通过网络连接发送给对方。对方接收到消息后,解析WebSocket帧,取出消息内容进行处理。

消息接收: 除了发送消息,WebSocket还提供了接收消息的功能。当服务器或客户端接收到一个WebSocket帧时,会解析出其中的消息内容,然后进行处理。

连接关闭: WebSocket连接可以随时关闭。关闭连接后,服务器和客户端就不再通过这个连接进行数据传输。

另外值得注意的是,在使用WebSocket的过程中,有时会遇到网络断开的情况。虽然服务器端仍然会向客户端发送数据,但是客户端无法接收到这些数据。为了解决这个问题,WebSocket提供了一种心跳机制。

WebSocket的心跳机制允许客户端和服务器在连接建立后,每隔一段时间向对方发送一个心跳消息,以检查连接是否仍然有效。如果长时间没有接收到对方的心跳消息,服务器或客户端可以认为连接已经断开,然后采取相应的措施,例如关闭连接、重新连接等。

总结起来,WebSocket协议的推送消息机制主要包括建立连接、发送和接收消息、关闭连接以及心跳机制等环节。这些环节协同工作,使得WebSocket成为实现实时、高效数据传输的重要工具。

目录
相关文章
|
10月前
|
Java API Spring
Java SpringBoot 公众号集成模板推送消息
Java SpringBoot 公众号集成模板推送消息
|
10月前
|
前端开发 安全 Java
SpringBoot + WebSocket+STOMP指定推送消息
本文将简单的描述SpringBoot + WebSocket+STOMP指定推送消息场景,不包含信息安全加密等,请勿用在生产环境。
218 0
|
前端开发 Java Spring
Java:SpringBoot整合WebSocket实现服务端向客户端推送消息
Java:SpringBoot整合WebSocket实现服务端向客户端推送消息
329 0
Java:SpringBoot整合WebSocket实现服务端向客户端推送消息
|
运维 机器人 Java
Springboot 整合 企业微信机器人助手推送消息
Springboot 整合 企业微信机器人助手推送消息
914 0
Springboot 整合 企业微信机器人助手推送消息
|
网络协议 前端开发 Java
Springboot整合Websocket案例(后端向前端主动推送消息)
在手机上相信都有来自服务器的推送消息,比如一些及时的新闻信息,这篇文章主要就是实现这个功能,只演示一个基本的案例。使用的是websocket技术。
1144 0
Springboot整合Websocket案例(后端向前端主动推送消息)
|
前端开发 Java 程序员
Netty实战,Springboot + netty +websocket 实现推送消息(附源码)
Netty实战,Springboot + netty +websocket 实现推送消息(附源码)
876 0
Netty实战,Springboot + netty +websocket 实现推送消息(附源码)
|
4天前
|
Java 应用服务中间件 Maven
Spring Boot项目打war包(idea:多种方式)
Spring Boot项目打war包(idea:多种方式)
16 1
|
4天前
|
Java Linux
Springboot 解决linux服务器下获取不到项目Resources下资源
Springboot 解决linux服务器下获取不到项目Resources下资源
|
3天前
|
Java Maven
SpringBoot项目的用maven插件打包报Test错误
SpringBoot项目的用maven插件打包报Test错误
|
2天前
|
前端开发 JavaScript Java
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)五(前端页面
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)五(前端页面
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)五(前端页面