springboot集成websocket实战:站内消息实时推送

简介: 现有一个类似boss直聘的招聘小程序,求职端和招聘端可以根据身份进行切换.要求实现两个问题: 1.求职端或是招聘端上线时,如果有未读消息需要显示未读消息数; 2.求职端和招聘端同时在线时,求职端投递简历之后,要求招聘端能够实时显示有新投递简历的消息信息;招聘端发送面试邀请时,求职端消息列表中实时显示出面试要求的消息信息.

背景


   现有一个类似boss直聘的招聘小程序,求职端和招聘端可以根据身份进行切换.要求实现两个问题:

   1.求职端或是招聘端上线时,如果有未读消息需要显示未读消息数;

   2.求职端和招聘端同时在线时,求职端投递简历之后,要求招聘端能够实时显示有新投递简历的消息信息;招聘端发送面试邀请时,求职端消息列表中实时显示出面试要求的消息信息.


处理方案梳理


   对于第一个问题,可以在进入到小程序页面之后,服务端提供一个获取用户未读消息数据查询的接口.

   对于第二个问题想到的处理方案是可以在小程序里面做一个轮询处理,间隔一定的时间去不断去调用获取用户消息列表接口.存在的问题就是请求中有大半是无用,浪费带宽和服务器资源。所以考虑的处理方案是使用websocket处理,websocket的优点是服务端可以主动向客户端发送消息.能处理客户端轮询查询带来的问题.

   自己实现了springboot+websocket进行消息推送的简单demo,使用在线的websocket地址用作客户端进行测试.有同样需求的同学可以参考并加入到自己的业务逻辑中.下面说下实现过程;


实现过程


   主要展示服务端实现方案,前端只需要创建websocket连接,每种客户端创建连接的方式不同,这里不提供实现,测试的时候使用在线websocket测试网站作为客户端.


服务端配置

   需要引入的依赖:


 

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>


websocket配置类:

@Configuration
public class WebSocketConfig {
    // 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}


websocket服务端主要逻辑:

@Component
@Slf4j
@Service
@ServerEndpoint("/webSocket/{login}/{type}")  // login表示用户唯一标识,type表示用户类型:1.求职身份;2.面试身份
public class WebSocketServer {
    //当前在线连接数
    private static int onlineCount = 0;
    // 每个在线用户会创建一个WebSocketServer对象
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    // 存放所有在线的客户端 key为用户的唯一标识:login,value为每个会话连接
    private static Map<String, Session> clients = new ConcurrentHashMap<>();
//    private Session session;
    // 用户login
    private String login = "";
    // 处理使用@Autowire注入为空的问题,使用静态变量处理
    private static NewsMapper newsMapper= SpringUtils.getBean(NewsMapper.class);
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("login") String login, @PathParam("type") Integer type) {
        clients.put(login, session);
//        this.session = session;
        webSocketSet.add(this);     //加入set中
        this.login = login;
        addOnlineCount();           //在线数加1
        try {
            // 查询用户未读消息数
            Integer unReadMsg=0;
            List<Long> noReadingNewsIds = newsMapper.findNoReadingNewsId(type, login);
            if(CollectionUtil.isNotEmpty(noReadingNewsIds)){
                unReadMsg=noReadingNewsIds.size();
            }
           // 用户上线提醒
            sendMessage("用户"+login+"已上线("+("1".equals(login) ? "求职者)":"招聘者)")+",未读消息数:"+unReadMsg,session);
            log.info("有新窗口开始监听用户详情id:" + login +",当前在线人数为:" + getOnlineCount());
        } catch (IOException e) {
            log.error("websocket IO Exception");
        }
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        clients.remove(login);
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        log.info("释放的login为:"+login);
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }
    /**
     * 收到客户端消息后调用的方法
     * @ Param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来自窗口" + login + "的信息:" + message);
        //群发消息
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message,session);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * @ Param session
     * @ Param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message,Session session) throws IOException {
        if(session != null){
            session.getBasicRemote().sendText(message);
        }
    }
    /**
     * 校验是否在线,在线需要返回用户session信息
     */
    public Session checkIsOnline(String login) throws IOException {
        for (String onLineLogin : clients.keySet()) {
            if(login.equals(onLineLogin)){
                return clients.get(login);
            }
        }
        return null;
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
    public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() {
        return webSocketSet;
    }
}


模拟业务场景:求职端发送简历给求职者

@RestController
@RequestMapping("/webSocket")
public class WebSocketServerController {
    @Autowired
    private WebSocketServer webSocketServer;
    @GetMapping("/serverToClient")
    public ResultVo sendServerToClient(String login) throws IOException {
        System.out.println("求职者给招聘者发送简历操作..............");
        // 判断用户是否在线
        Session session = webSocketServer.checkIsOnline(login);
        if(ObjectUtil.isNotNull(session)){
            webSocketServer.sendMessage("求职者给招聘者发送简历,招聘者已接收",session);
        }
        return ResultVoUtil.success();
    }
}


模拟客户端

   这里客户端使用在线websocket测试网站,地址:http://www.websocket-test.com/,测试数据:

login:1,userType:1 模拟求职端;
login:2,userType:2 模拟招聘端;

请求地址为websocket服务器所在项目ip以及端口和请求参数,本地项目参数如下:

ws://172.16.0.131:8080/webSocket/1/1


模拟用户1登录上线:

7918d07b420de3c312bbe080349ba3e7_f2fa14044aa04467a5c17033abcc16b5.png


模拟用户2登录上线:

4358f66884a230b2833fdd3d98323434_041748898bad4be9ad68eb6b541e925c.png

用户上线之后可以获取到未读消息数!

模拟用户1求职者发送简历给用户2应聘者

这里提供模拟接口:

f86a4ac07c91780d6642a92aa3e7ef96_642c9bc43e7b4f2093354aec9099600e.png

以上是模拟客户端,不刷新情况下可以接收到服务端消息.

c6c2e2ff59e66ae9b97133cba33f314b_dff2fbf1c20a44759789f973959c62a5.png

以上是站内消息推送的实现方案,希望对有同样需求的同学有所帮助!


相关文章
|
11天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
121 1
|
6月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
324 0
|
6月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
251 0
|
3月前
|
前端开发
SpringBoot2.3.1集成Knife4j接口文档
SpringBoot2.3.1集成Knife4j接口文档
460 44
|
2月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
391 0
第07课:Spring Boot集成Thymeleaf模板引擎
|
2月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
346 2
|
2月前
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
156 2
|
2月前
|
存储 人工智能 Java
Springboot集成AI Springboot3 集成阿里云百炼大模型CosyVoice2 实现Ai克隆语音(未持久化存储)
本项目基于Spring Boot 3.5.3与Java 17,集成阿里云百炼大模型CosyVoice2实现音色克隆与语音合成。内容涵盖项目搭建、音色创建、音频合成、音色管理等功能,适用于希望快速掌握Spring Boot集成语音AI技术的开发者。需提前注册阿里云并获取API Key。
|
3月前
|
缓存 安全 Java
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
298 6