一个注解实现 WebSocket 集群方案,这样玩才爽! 下

简介: 一个注解实现 WebSocket 集群方案,这样玩才爽! 下

然后对于这些连接进行一个统一的管理

通过连接工厂ConnectionFactory我们可以将任意的连接适配成Connection对象,并实现各种连接间的消息转发

每个连接都会配置一个MessageEncoderMessageDecoder用于消息的编码和解码,而且不同类别的连接对应的编码器和解码器肯定是不一样的,比如转发的消息和发给真实客户端的消息很大程度上都是有区别的,所以额外定义了一个MessageCodecAdapter用来适配不同类型的编解码器,也能让大家在自定义时方便管理

消息发送

现在当我们发送某条消息之后,消息就会被转发到其他的服务实例,所有的客户端就都能收到了

不对啊,在有些情况下我们不想让所有客户端都收到啊,能不能我们想让谁收到就让谁收到啊

真麻烦,来,我把所有的连接都给你,你自己选吧

连接选择

我们需要在消息发送时确定发送给哪些连接

image.png

于是我就定义了一个连接选择器ConnectionSelector

每次要发送消息的时候,我都会匹配一个连接选择器,然后通过选择器来获得需要发送消息的连接,而我们可以通过自定义连接选择器来实现我们消息的精准发送

这里其实就是我为什么会取名WebSocketLoadBalanceConcept的原因,为什么要叫LoadBalance呢

Ribbon通过IRule来选择一个Server

我通过ConnectionSelector来选择一个Connection集合

是不是有异曲同工之妙

继续来说自定义选择器

准备工作:

  • 我们的Connection有一个metadata字段用于存放自定义属性
  • 我们的Message有一个headers字段用于存放消息头

给指定用户发送消息

很多场景下我们需要给指定的用户发送消息

首先当客户端连接上来时,可以通过参数或者主动发送一个消息将userId发给服务端,然后服务端将得到的userId存在Connectionmetadata

接着我们给需要发送的Message添加一个header,将对应的userId作为消息头

这样我们就可以自定义一个连接选择器通过判断Message是否包含userId消息头来作为匹配的条件,当Message的headers中存在userId时,对Connection中的metadata进行userId的匹配来筛选需要发送消息的连接

由于userId是唯一的,当我们自身服务连上来的客户端中已经匹配到就不需要再转发了,如果没有匹配到就通过其他服务实例的客户端进行消息转发

库中已经实现了对应的UserSelectorUserMessage,可以使用配置开启并通过在连接路径上添加userId参数来标记用户

当然我们也可以借用缓存来精确的判断需不需要转发或者是需要转发给哪几个服务,把userId和服务的instanceId等一些具有唯一性的数据缓存在Redis中,当给用户发送消息时,从Redis中获得用户对应的服务实例的instanceId或是具有唯一性的数据,如果经过匹配就是当前服务就可以直接下发,如果是其他服务就转发给那个对应的服务就行了

给指定路径发送消息

还有一种场景也比较常见就是类似主题订阅,如订阅设备状态更新的数据,就要给每一个对应路径的连接发送消息了

我们可以使用不同的路径来表示不同主题,然后自定义一个连接选择器来匹配连接的路径和消息头中指定的路径

当然库中也已经实现了对应的PathSelectorPathMessage,可以通过配置开启

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

结束

最后请允许我发表一点对于抽象的拙见

抽象其实就和 “道生一,一生二,二生三,三生万物” 一样,根据你的顶级接口(也就是核心功能)不断的向外展开,你的顶级接口就是道(狭义的来讲)

以这个库为例,ConnectionLoadBalanceConcept就是这个库的道,他的核心功能就是发送消息,至于怎么发,发给谁,不确定,像是一个混沌的状态

那么什么是一,二,三呢,我们发送消息需要载体于是就有了ConnectionMessage,我们需要对Connection进行管理于是就有了ConnectionRepository,我们需要转发消息于是就有了ConnectionSubscriber等等

而万物就像是具体的实现,是能落实的,基于Spring Cloud服务发现的连接管理器DiscoveryConnectionServerManager,基于路径的连接选择器PathSelector,基于Reactive的WebSocket连接ReactiveWebSocketConnection

就像是你创造的世界,不断的衍生出各种各样的规则,这些规则相辅相成,让你的世界平稳的运行

当然你的世界也有可能存在bug,手动狗头




相关文章
|
移动开发 网络协议 NoSQL
.NET Core WebSocket实现简易、高性能、集群即时通讯组件
.NET Core WebSocket实现简易、高性能、集群即时通讯组件
354 0
|
消息中间件 负载均衡 算法
聊聊 分布式 WebSocket 集群解决方案(二)
聊聊 分布式 WebSocket 集群解决方案
1948 0
聊聊 分布式 WebSocket 集群解决方案(二)
|
4月前
|
前端开发 NoSQL JavaScript
Websocket 替代方案:如何使用 Firestore 监听实时事件
Websocket 替代方案:如何使用 Firestore 监听实时事件
|
负载均衡 NoSQL Java
聊聊 分布式 WebSocket 集群解决方案(一)
聊聊 分布式 WebSocket 集群解决方案
聊聊 分布式 WebSocket 集群解决方案(一)
|
存储 JSON 负载均衡
Envoy架构概览(2):HTTP过滤器,HTTP路由,gRPC,WebSocket支持,集群管理器
Envoy架构概览(2):HTTP过滤器,HTTP路由,gRPC,WebSocket支持,集群管理器
|
负载均衡 网络协议 JavaScript
一个注解实现 WebSocket 集群方案,这样玩才爽! 上
一个注解实现 WebSocket 集群方案,这样玩才爽! 上
|
消息中间件 负载均衡 算法
聊聊 分布式 WebSocket 集群解决方案(二)
聊聊 分布式 WebSocket 集群解决方案
聊聊 分布式 WebSocket 集群解决方案(二)
|
负载均衡 NoSQL 算法
聊聊 分布式 WebSocket 集群解决方案(一)
聊聊 分布式 WebSocket 集群解决方案
2743 0
聊聊 分布式 WebSocket 集群解决方案(一)
下一篇
DataWorks