Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

简介: Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

SpringBoot WebSocket

思路:

后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据

本文仅放一部分重要的代码,完整代码可参看github仓库

websocket 前端测试 :http://www.easyswoole.com/wstool.html

image.png

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

项目目录

$ tree 
.
├── README.md
├── demo.iml
├── pom.xml
└── src
    ├── main
        ├── java
        │   └── com
        │       └── example
        │           └── demo
        │               ├── Application.java
        │               ├── config
        │               │   ├── WebMvcConfig.java
        │               │   └── WebSocketConfig.java
        │               ├── consts
        │               │   └── MessageTypeConst.java
        │               ├── controller
        │               │   ├── IndexController.java
        │               │   └── MessageController.java
        │               ├── dto
        │               │   └── MessageDto.java
        │               ├── service
        │               │   ├── MessageService.java
        │               │   └── impl
        │               │       └── MessageServiceImpl.java
        │               └── webscoket
        │                   └── WebSocketServer.java
        └── resources
            ├── application.properties
            ├── static
            │   ├── js
            │   │   ├── index.js
            │   │   └── utils.js
            │   └── libs
            │       └── axios
            │           └── 1.3.2
            │               └── axios.min.js
            └── templates
                └── websocket.html

完整依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.7</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>demo</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

配置

package com.example.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * 启用WebSocket
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

WebSocketServer.java

package com.example.demo.webscoket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket服务类
 */
@Component
@Slf4j
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
    /**
     * 心跳消息
     */
    private final static String PING = "ping";
    private final static String PONG = "pong";
    /**
     * 存放每个客户端对应的 WebSocketServer 对象
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收 userId
     */
    private String userId = "";
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        // 加入
        webSocketMap.put(userId, this);
        log.info("新用户上线:" + userId + ", 当前在线人数为:" + getOnlineCount());
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (!webSocketMap.containsKey(userId)) {
            return;
        }
        webSocketMap.remove(userId);
        log.info("用户下线:" + userId + ", 当前在线人数为:" + getOnlineCount());
    }
    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + userId + ",报文:" + message);
        if (PING.equals(message)) {
            try {
                this.sendMessage(PONG);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
    /**
     * 群发消息
     */
    public static void sendMessageToAll(String message) throws IOException {
        for (Map.Entry<String, WebSocketServer> entry : webSocketMap.entrySet()) {
            WebSocketServer webSocketServer = entry.getValue();
            webSocketServer.sendMessage(message);
        }
    }
    /**
     * 单发消息
     */
    public static void sendMessageToUser(String toUserId, String message) throws IOException {
        if (webSocketMap.containsKey(toUserId)) {
            webSocketMap.get(toUserId).sendMessage(message);
        } else {
            log.error("请求的 userId:" + toUserId + "不在该服务器上");
        }
    }
    /**
     * 获取在线人数
     */
    public static int getOnlineCount() {
        return webSocketMap.size();
    }
    /**
     * 用户是否在线
     */
    public static Boolean isOnline(String userId) {
        return webSocketMap.containsKey(userId);
    }
    /**
     * 在线用户
     */
    public static Set<String> getOnlineUsers() {
        Set<String> set = new HashSet<>();
        Enumeration<String> enumeration = webSocketMap.keys();
        while (enumeration.hasMoreElements()) {
            set.add(enumeration.nextElement());
        }
        return set;
    }
}

前端页面 websocket.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Java后端WebSocket实现</title>
</head>
<body>
    <h2>Welcome WebSocket</h2>
    <div>
        <input id="textInput" type="text" placeholder="message"/>
        <button id="sendMessageButton">发送消息</button>
        <button id="closeConnectButton">关闭连接</button>
        <button id="sendPingButton">发送ping</button>
    </div>
    <hr/>
    <div id="message"></div>
    <script type="text/javascript" src="/static/libs/axios/1.3.2/axios.min.js"></script>
    <script type="text/javascript" src="/static/js/utils.js"></script>
    <script type="text/javascript" src="/static/js/index.js"></script>
</body>
</html>

前端逻辑 index.js

/**
 * 程序入口
 */
// 心跳消息
var PING = "ping";
var PONG = "pong";
// 获取一个用户id
var uuid = utils.getUUID();
var url = "ws://127.0.0.1:8080/ws/" + uuid;
//判断当前浏览器是否支持WebSocket
var websocket = null;
function initWebsocket() {
  if ("WebSocket" in window) {
    websocket = new WebSocket(url);
  } else {
    alert("当前浏览器 Not support websocket");
  }
  //连接成功建立的回调方法
  websocket.onopen = function () {
    setMessageInnerHTML("WebSocket连接成功");
  };
  //接收到消息的回调方法
  websocket.onmessage = function (event) {
    var data = event.data;
    // 忽略心跳消息
    if (data === PONG) {
      return;
    }
    setMessageInnerHTML(JSON.parse(event.data).text);
  };
  //连接关闭的回调方法
  websocket.onclose = function () {
    setMessageInnerHTML("WebSocket连接关闭");
  };
  //连接发生错误的回调方法
  websocket.onerror = function (e) {
    console.log(e);
    setMessageInnerHTML("WebSocket连接发生错误");
  };
}
//关闭WebSocket连接
function closeWebSocket() {
  websocket.close();
}
// 监听窗口关闭事件,当窗口关闭时
// 主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
  closeWebSocket();
};
//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
  document.getElementById("message").innerHTML += innerHTML + "<br/>";
}
//发送消息
function sendMessage() {
  var text = textInput.value;
  if (!text) {
    return;
  }
  // 统一发送json格式,便于扩展
  var data = {
    type: "text",
    text: text,
  };
  // websocket.send(JSON.stringify(data));
  axios.post("/api/sendMessageToAllUser", data);
  // setMessageInnerHTML(data.text);
  textInput.value = "";
}
// 事件监听
function bindEventListener() {
  var textInput = document.querySelector("#textInput");
  var sendMessageButton = document.querySelector("#sendMessageButton");
  var closeConnectButton = document.querySelector("#closeConnectButton");
  var sendPingButton = document.querySelector("#sendPingButton");
  textInput.addEventListener("keypress", function (e) {
    if (e.key === "Enter") {
      sendMessage();
    }
  });
  sendMessageButton.addEventListener("click", function (e) {
    sendMessage();
  });
  sendPingButton.addEventListener("click", function (e) {
    websocket.send(PING);
  });
  closeConnectButton.addEventListener("click", function (e) {
    closeWebSocket();
  });
}
/**
 * 入口
 */
(function () {
  initWebsocket();
  bindEventListener();
})();

参考


相关文章
|
8月前
|
Java 数据库连接 API
Java 8 + 特性及 Spring Boot 与 Hibernate 等最新技术的实操内容详解
本内容涵盖Java 8+核心语法、Spring Boot与Hibernate实操,按考试考点分类整理,含技术详解与代码示例,助力掌握最新Java技术与应用。
248 2
|
9月前
|
Java 数据库连接 API
Java 对象模型现代化实践 基于 Spring Boot 与 MyBatis Plus 的实现方案深度解析
本文介绍了基于Spring Boot与MyBatis-Plus的Java对象模型现代化实践方案。采用Spring Boot 3.1.2作为基础框架,结合MyBatis-Plus 3.5.3.1进行数据访问层实现,使用Lombok简化PO对象,MapStruct处理对象转换。文章详细讲解了数据库设计、PO对象实现、DAO层构建、业务逻辑封装以及DTO/VO转换等核心环节,提供了一个完整的现代化Java对象模型实现案例。通过分层设计和对象转换,实现了业务逻辑与数据访问的解耦,提高了代码的可维护性和扩展性。
364 1
|
9月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
458 0
|
9月前
|
Java API 微服务
Java 21 与 Spring Boot 3.2 微服务开发从入门到精通实操指南
《Java 21与Spring Boot 3.2微服务开发实践》摘要: 本文基于Java 21和Spring Boot 3.2最新特性,通过完整代码示例展示了微服务开发全流程。主要内容包括:1) 使用Spring Initializr初始化项目,集成Web、JPA、H2等组件;2) 配置虚拟线程支持高并发;3) 采用记录类优化DTO设计;4) 实现JPA Repository与Stream API数据访问;5) 服务层整合虚拟线程异步处理和结构化并发;6) 构建RESTful API并使用Springdoc生成文档。文中特别演示了虚拟线程配置(@Async)和StructuredTaskSco
1015 0
|
9月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1976 0
|
5月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
284 1
|
5月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
304 1
|
6月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案

热门文章

最新文章