开发者社区> libin9iOak> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

聊聊 分布式 WebSocket 集群解决方案(一)

简介: 聊聊 分布式 WebSocket 集群解决方案
+关注继续查看

聊聊 分布式 WebSocket 集群解决方案


17.png


最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。


期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到大家,并且能一起分享这方面的想法与研究。


以下是我的场景描述


资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)

应用发布限制条件:由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/aaa),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用

需求:用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息

集群中的应用服务类型:每个集群实例都负责http无状态请求服务与ws长连接服务



| 系统架构图


18.png


在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便。下文会有解释。


本文涉及的技术栈:


Eureka 服务发现与注册

Redis Session共享

Redis 消息订阅

Spring Boot

Zuul 网关

Spring Cloud Gateway 网关

Spring WebSocket 处理长连接

Ribbon 负载均衡

Netty 多协议NIO网络通信框架

Consistent Hash 一致性哈希算法

相信能走到这一步的人都了解过我上面列举的技术栈了,如果还没有,可以先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认大家都了解过了…


| 技术可行性分析


下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案。


WebSocketSession与HttpSession


在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {
   System.out.println("服务器接收到的消息: "+ message );
   //send message to client
   session.sendMessage(new TextMessage("message"));
}


那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。


有的人可能会想:我可不可以将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息然后重新构建websocket session…我只想说这种方法如果有人能试出来,请告诉我一声…


以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redis和spring-boot-starter-redis,大家可以从网上找个demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。


| 解决方案的演变


Netty与Spring WebSocket


刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信

   /**
    * TODO 根据服务器传进来的id,分配到不同的group
    */
   private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
       //retain增加引用计数,防止接下来的调用引用失效
       System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text());
       //将消息发送给group里面的所有channel,也就是发送消息给客户端
       GROUP.writeAndFlush(msg.retain());
   }

那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点。


使用netty实现websocket


玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:


与系统的其他应用集成不方便,在rpc调用的时候,无法享受springcloud里feign服务调用的便利性

业务逻辑可能要重复实现

使用netty可能需要重复造轮子

怎么连接上服务注册中心,也是一件麻烦的事情

restful服务与ws服务需要分开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信很多人都习惯了。



使用spring websocket实现ws服务

spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单

第一步:添加依赖


<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


第二步:添加配置类


@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(myHandler(), "/")
        .setAllowedOrigins("*");
}
 
@Bean
 public WebSocketHandler myHandler() {
     return new MessageHandler();
 }
}


第三步:实现消息监听类


@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {
   private List<WebSocketSession> clients = new ArrayList<>();
 
   @Override
   public void afterConnectionEstablished(WebSocketSession session) {
       clients.add(session);
       System.out.println("uri :" + session.getUri());
       System.out.println("连接建立: " + session.getId());
       System.out.println("current seesion: " + clients.size());
   }
 
   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
       clients.remove(session);
       System.out.println("断开连接: " + session.getId());
   }
 
   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) {
       String payload = message.getPayload();
       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
       System.out.println("接受到的数据" + map);
       clients.forEach(s -> {
           try {
               System.out.println("发送消息给: " + session.getId());
               s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
   }
}


从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。


因此我的应用服务架构是这样子的:一个应用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分出去要使用feign来进行服务调用。第一本人比较懒惰,第二拆分与不拆分相差在多了一层服务间的io调用,所以就没有这么做了。


| 从zuul技术转型到spring cloud gateway


要实现websocket集群,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:


zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,但是2.0版本没有被spring boot集成,而且文档不健全。因此转型是必须的,同时转型也很容易实现。


在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中以下的某些配置是必须的,在这里提前避免大家采坑


server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**


如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record。


@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri != null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {
              throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }
 
  @Override
  public int getOrder() {
      return HTTPS_TO_HTTP_FILTER_ORDER;
  }
}


这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。


| session广播


这是最简单的websocket集群通讯解决方案。场景如下:


教师A想要群发消息给他的学生们


教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}

网关接收到消息,获取集群所有ip地址,逐个调用教师的请求

集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求


19.png


session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。

spring cloud中获取服务集群中每台服务器信息的方法如下:


@Resource
private EurekaClient eurekaClient;
 
Application app = eurekaClient.getApplication("service-name");
//instanceInfo包括了一台服务器ip,port等消息
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());


服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
WebSocket就是这么简单(二)
今天在慕课网上看到了Java的新教程(Netty入门之WebSocket初体验):https://www.imooc.com/learn/941 WebSocket我是听得很多,没有真正使用过的技术。我之前也去了解过了WebSocket究竟是什么东西,不过一直没有去实践过。 我在写监听器博文的时候,在线人数功能用监听器的是来做,在评论有说使用WebSocket的方式会更加好。 那么,我们就来探究一下WebSocket究竟是什么东西,顺便了解一下Netty。!
50 0
WebSocket入门
HTML5 拥有众多引人注目的新特性,如 Canvas、本地存储、多媒体编程接口、WebSocket 等等。其中,WebSocket 的出现使得浏览器提供对 Socket 的支持成为可能,从而在浏览器和服务器之间提供了一个基于 TCP 连接的双向通道。使用 WebSocket,web开发人员可以很方便地构建实时 web 应用。
102 0
第一章: WebSocket初识
关于webSocket的话题并不少见,比如我们经常遇到的聊天室啦,实时的消息互动啦,巴拉巴拉很多东西都会使用的一项技术。接下来我们就具体来看看webSocket到底是什么东西以及具体的应用场景 WebSocket协议是基于TCP的一种新的网络协议。
1080 0
用tornado实现一个简单的websocket样例
想用SPRING MVC,NODE.JS EXPRESS,TORNADO实现同一个功能,开阔一下视野。 先来TORNADO的吧。。 客户端代码都差不多,主要是服务端代码。 TORNADO的说法: http://www.
1305 0
WebSocket实现网页聊天室
1、先看演示效果,如下: 2、websocket 技术背景 我们知道,传统的HTTP协议是无状态的,每次请求(request)都要由客户端(如浏览器)主动发起,服务端进行处理后返回response结果,而服务端很难主动向客户端发送数据;这种客户端是主动方,服务端是被动方的传统Web模式对于信息变化不频繁的Web应用来说造成的麻烦较小,而对于涉及实时信息的Web应用却带来了很大的不便
2703 0
通俗易懂地解决中文乱码问题(2) --- 分析解决Mysql插入移动端表情符报错 ‘incorrect string value: '\xF0...
原文:【原创】通俗易懂地解决中文乱码问题(2) --- 分析解决Mysql插入移动端表情符报错 ‘incorrect string value: '\xF0... 这篇blog重点在解决问题,如果你对字符编码并不是特别了解,建议先看看 《 【原创】通俗易懂地解决中文乱码问题(1) --- 跨平台乱码 》。
1128 0
Git 系列之二:Windows 下 Git 客户端的选择,及 msysGit 各种中文问题的解决-转载
Git 系列之二:Windows 下 Git 客户端的选择,及 msysGit 各种中文问题的解决 在 Windows 下用 NetBeans 做 PHP 开发,首先想到的是 NetBeans 的插件:NBGit。
874 0
+关注
322
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载