springboot集成websocket

简介: springboot集成websocket

springboot集成websocket

1. 前言

这里我们使用springboot搭建一个轻量级的websocket服务,同时提供4个入参。使用websocket服务可以轻松和微信小程序、支付宝小程序、网页就行双向通讯,非常实用方便。

wss地址

这里是我们自己搭建的中转服务websocket地址。

wss://xxxx.cn/netgate/auth/pid/sn/openid

参数说明

image.png

2. 引入依赖

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

3. 配置文件

新增WebSocketConfig.java的配置文件。用来初始化websocket

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
       registry.addHandler(webSocketHandler(),"/netgate/{auth}/{pid}/{sn}/{openid}")                          //注册Handler
           .addInterceptors(new WebSocketHandshakeInterceptor())           //注册Interceptor
           .setAllowedOrigins("*");                                 //注册Interceptor
//     //2.注册SockJS,提供SockJS支持(主要是兼容ie8)
//     String sockjs_url = "/sockjs/socketServer.do";                          //设置sockjs的地址
//     registry.addHandler(netgateHandler, sockjs_url)                         //注册Handler
//         .addInterceptors(new WebSocketHandshakeInterceptor())               //注册Interceptor
//         .withSockJS();                                                      //支持sockjs协议
    }
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(2*1024*1024);//8192*1024 1024*1024*1024
        container.setMaxBinaryMessageBufferSize(2*1024*1024);
        container.setAsyncSendTimeout(55000l);
        container.setMaxSessionIdleTimeout(55000l);//心跳
        return container;
    }
    @Bean
    public TextWebSocketHandler webSocketHandler() {
        return new NetgateHandler();
    }
}

4. Websocket握手过滤器

过滤器的作用主要是用来做连接接入的鉴权,和参数解析。

新增WebSocketHandshakeInterceptor.java文件

/**
 * Describes: WebSocket握手拦截器
 * Auth: Eric
 */
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    private final static Logger LOGGER = LoggerFactory.getLogger(WebSocketHandshakeInterceptor.class);
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
          String path = request.getURI().getPath();
          if(requestIsValid(path)){
            String[] params = getParams(path);
            attributes.put("WEBSOCKET_AUTH", params[0]);
            attributes.put("WEBSOCKET_PID", params[1]);
            attributes.put("WEBSOCKET_SN", params[2]);
            attributes.put("WEBSOCKET_OPENID", params[3]);
            attributes.put("WEBSOCKET_FIRSTONE","yes");
          }
        }
        System.out.println("================Before Handshake================");
        return true;
    }
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
      System.out.println("================After Handshake================");
      if(e!=null) e.printStackTrace();
      System.out.println("================After Handshake================");
    }
    private boolean requestIsValid(String url){
        //在这里可以写上具体的鉴权逻辑
      boolean isvalid = false;
      if(StringUtils.isNotEmpty(url)
          && url.startsWith("/netgate/")
          && url.split("/").length==6){
        isvalid = true;
      }
      return isvalid;
    }
    private String[] getParams(String url){
      url = url.replace("/netgate/","");
      return url.split("/");
    }
}

5. Websocket处理器

在这里可以做消息的接收和发送。

这里MqttGateway是springboot整合MQTT客户端的服务类。具体可以参考下一篇的springboot集成mqtt

新建NetgateHandler.java文件

/**
 * Websocket处理器
 */
@Component
public class NetgateHandler extends TextWebSocketHandler {
  @Autowired
  private MqttGateway mqttGateway;
    /*
   * 网关连接集合
   * 第一级:设备序列号 sn
   * 第二级:用户唯一标识 openid
   */
  private static ConcurrentHashMap<String,ConcurrentHashMap<String,WebSocketSession>> netgates = new ConcurrentHashMap<String,ConcurrentHashMap<String,WebSocketSession>>();
    /**
     * 处理前端发送的文本信息
     * js调用websocket.send时候,会调用该方法
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
      if(!session.isOpen()) {
      System.out.println("连接已关闭,不再处理该连接的消息!");
      return;
    }
      String mes = ObjectUtils.toString(message.getPayload(),"");
      String pid = session.getAttributes().get("WEBSOCKET_PID").toString();
      String sn = session.getAttributes().get("WEBSOCKET_SN").toString();
    if(message==null || "".equals(mes)){
      System.out.println(getSysDate()+"============接收到空消息,不予处理。");
    }else if(mes.length()==1){
      //心跳消息过滤掉
      return;
    }else {
      //转发成mqtt消息
      String topic = "pay/"+pid+"/server/"+sn;
      System.out.println(topic);
      System.out.println(getSysDate()+"============消息处理完成:"+mes);
      mqttGateway.sendToMqtt(topic,mes);
    }
    }
    /**
     * 当新连接建立的时候,被调用
     * 连接成功时候,会触发页面上onOpen方法
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
      System.out.println(getSysDate()+"============正在初始化连接:"+session.getId());
        try {
            //初始化连接,把session存储起来
      this.initUsers(session);
    } catch (Exception e) {
      System.out.println(getSysDate()+"============初始化连接异常-开始:"+e.getMessage());
      e.printStackTrace();
      System.out.println(getSysDate()+"============初始化连接异常-结束:"+e.getMessage());
    }
        System.out.println(getSysDate()+"============初始化连接完成:"+session.getId());
    }
    /**
     * 当连接关闭时被调用
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
      System.out.println(getSysDate()+"============正在关闭连接:"+session.getId()+",isOpen:"+session.isOpen()+";code:"+status.getCode());
      try {
        System.out.println("断开连接状态值"+status.getCode());
      this.removeSession(session);
    } catch (Exception e) {
      System.out.println(getSysDate()+"============关闭连接异常-开始:"+e.getMessage());
      e.printStackTrace();
      System.out.println(getSysDate()+"============关闭连接异常-结束:"+e.getMessage());
    }
      System.out.println(getSysDate()+"============正在关闭完成:"+session.getId()+",isOpen:"+session.isOpen()+";code:"+status.getCode());
    }
    /**
     * 传输错误时调用
     *
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
      System.out.println(getSysDate()+"============发生传输错误:"+session.getId()+";session.isOpen():"+session.isOpen()+";exception:"+exception.getMessage());
      exception.printStackTrace();
      if (session.isOpen()) {
          //try { session.close(); } catch (Exception e) {e.printStackTrace();}
        }else {
          try {
          this.removeSession(session);
        } catch (Exception e) {
          System.out.println(getSysDate()+"============传输错误处理异常-开始:"+e.getMessage());
          e.printStackTrace();
          System.out.println(getSysDate()+"============传输错误处理异常-结束:"+e.getMessage());
        }
        }
      System.out.println(getSysDate()+"============错误处理结束:"+session.getId()+";session.isOpen():"+session.isOpen()+";exception:"+exception.getMessage());
    }
  public synchronized void sendMsgToNetgateSn(String sn, String msg)  {
    if(netgates.size()>0 && netgates.containsKey(sn) && !netgates.get(sn).isEmpty()){
      //获取EID对应的后台管理连接 多个
      for (WebSocketSession ws: netgates.get(sn).values()){
        System.out.println("对网关指令开始发送啦:sn="+sn+"消息内容"+msg);
        try {ws.sendMessage(new TextMessage(msg));} catch (IOException e) {System.out.println(getSysDate()+"发生了异常:"+e.getMessage());e.printStackTrace();continue;}
      }
    }
  }
  //连接接入的处理方法
  private synchronized void initUsers(WebSocketSession session){
    String pid = (String) session.getAttributes().get("WEBSOCKET_PID");
    String sn = (String) session.getAttributes().get("WEBSOCKET_SN");
    String openid = (String) session.getAttributes().get("WEBSOCKET_OPENID");
    if(StringUtils.isNotEmpty(pid) && StringUtils.isNotEmpty(sn) && StringUtils.isNotEmpty(openid)){
      ConcurrentHashMap<String,WebSocketSession> netgate = netgates.get(sn);
      if(netgate == null){
        netgate = new ConcurrentHashMap<String,WebSocketSession>();
      }
      WebSocketSession session_exist = netgate.get(sn);
      if(session_exist!=null) {
        System.out.println("检测到相同SN重复连接,SN:"+sn+",连接ID:"+session_exist.getId()+",准备清理失效的连接。。。");
        try {session_exist.close();} catch (IOException e) {e.printStackTrace();}
      }
      netgate.putIfAbsent(openid, session);
      netgates.put(sn,netgate);
    }
  }
  //连接被关闭时处理集合
  private synchronized void removeSession(WebSocketSession session){
    String sn = (String) session.getAttributes().get("WEBSOCKET_SN");
    String openid = (String) session.getAttributes().get("WEBSOCKET_OPENID");
    if(netgates.get(sn).containsKey(openid)) {
      WebSocketSession exist_session = netgates.get(sn).get(openid);
      //确保是同一个session 不是同一个session则不应该进行下一步的处理
      if(exist_session.getId()!=null && exist_session.getId().equals(session.getId())) {
        netgates.get(sn).remove(openid);
        System.out.println("有一网关连接关闭!SN:"+sn+",当前在线数量为"+netgates.get(sn).keySet().size());
      }else {
        System.out.println("检测到关闭session异常,程序中断处理,关闭sessionId:"+session.getId()+",当前实际sessionId:"+exist_session.getId());
      }
    }else {
      System.out.println("检测到关闭session异常,程序中断处理,系统中未找到对应的session,Sn="+sn+"openid="+openid);
    }
  }
  private String getSysDate() {
     SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
        return df.format(new Date());
  }
}
相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
81
分享
相关文章
|
4月前
|
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
157 0
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
155 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
152 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
43 2
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
23 2
|
1月前
|
SpringBoot2.3.1集成Knife4j接口文档
SpringBoot2.3.1集成Knife4j接口文档
162 44
Springboot集成AI Springboot3 集成阿里云百炼大模型CosyVoice2 实现Ai克隆语音(未持久化存储)
本项目基于Spring Boot 3.5.3与Java 17,集成阿里云百炼大模型CosyVoice2实现音色克隆与语音合成。内容涵盖项目搭建、音色创建、音频合成、音色管理等功能,适用于希望快速掌握Spring Boot集成语音AI技术的开发者。需提前注册阿里云并获取API Key。
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
104 6
SpringBoot集成Ehcache缓存使用指南
以上是SpringBoot集成Ehcache缓存的基本操作指南,帮助你在实际项目中轻松实现缓存功能。当然,Ehcache还有诸多高级特性,通过学习和实践,你可以更好地发挥它的威力。
133 20
|
2月前
|
SpringBoot快速搭建WebSocket服务端和客户端
由于工作需要,研究了SpringBoot搭建WebSocket双向通信的过程,其他的教程看了许多,感觉讲得太复杂,很容易弄乱,这里我只展示快速搭建过程。
213 1
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问