WebSocket集群分布式改造——实现多人在线聊天室

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文内容摘要:为何要改造为分布式集群如何改造为分布式集群用户在聊天室集群如何发消息用户在聊天室集群如何接收消息补充知识点:STOMP 简介功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布功能二:集群集群用户上下线通知——Redis订阅发布功能三:集群用户信息维护——Redis集合WebSocket集群还有哪些可能性


前言



本文内容摘要:

  • 为何要改造为分布式集群
  • 如何改造为分布式集群
  • 用户在聊天室集群如何发消息
  • 用户在聊天室集群如何接收消息
  • 补充知识点:STOMP 简介
  • 功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布
  • 功能二:集群集群用户上下线通知——Redis订阅发布
  • 功能三:集群用户信息维护——Redis集合
  • WebSocket集群还有哪些可能性


正文



WebSocket集群/分布式改造:实现多人在线聊天室


为何要改造为分布式集群

分布式就是为了解决单点故障问题,想象一下,如果一个服务器承载了1000个大佬同时聊天,服务器突然挂了,1000个大佬瞬间全部掉线,大概明天你就被大佬们吊起来打了。

当聊天室改为集群后,就算服务器A挂了,服务器B上聊天的大佬们还可以愉快的聊天,并且在前端还能通过代码,让连接A的大佬们快速重连至存活的服务器B,继续和大家愉快的聊天,岂不美哉!

总结一下:实现了分布式WebSocket后,我们可以将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。


如何改造为分布式集群

当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe的中间件。我们现在使用Redis作为我们的解决方案。


1. 用户在聊天室集群如何发消息

假设我们的聊天室集群有服务器A和B,用户Alice连接在A上,Bob连接在B上、

Alice向聊天室的服务器A发送消息,A服务器必须要将收到的消息转发到Redis,才能保证聊天室集群的所有服务器(也就是A和B)能够拿到消息。否则,只有Alice在的服务器A能够读到消息,用户Bob在的服务器B并不能收到消息,A和B也就无法聊天了。


2. 用户在聊天室集群如何接收消息

说完了发送消息,那么如何保证Alice发的消息,其他所有人都能收到呢,前面我们知道了Alice发送的消息已经被传到了Redis的频道,那么所有服务器都必须订阅这个Redis频道,然后把这个频道的消息转发到自己的用户那里,这样自己服务器所管辖的用户就能收到消息。


补充知识点:STOMP 简介

上期我们搭建了个websocket聊天室demo,并且使用了STOMP协议,但是我并没有介绍到底什么是STOMP协议,同学们会有疑惑,这里对于STOMP有很好地总结:

当直接使用WebSocket时(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。

就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。

与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20
{"message":"Marco!"}
复制代码


功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布


1. 添加Redis依赖pom

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码


2. application.properties新增redis配置

当然首先要确保你安装了Redis,windows下安装redis比较麻烦,你可以搜索redis-for-windows下载安装。

# redis 连接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
# 空闲连接最大数
spring.redis.jedis.pool.max-idle=10
# 获取连接最大等待时间(s)
spring.redis.jedis.pool.max-wait=60000
复制代码


3. 在application.properties添加频道名定义

# Redis定义
redis.channel.msgToAll = websocket.msgToAll
复制代码


4. 新建redis/RedisListenerBean

package cn.monitor4all.springbootwebsocketdemo.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.net.Inet4Address;
import java.net.InetAddress;
/**
 * Redis订阅频道属性类
 * @author yangzhendong01
 */
@Component
public class RedisListenerBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);
    @Value("${server.port}")
    private String serverPort;
    @Value("${redis.channel.msgToAll}")
    private String msgToAll;
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 监听msgToAll
        container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
        LOGGER.info("Subscribed Redis channel: " + msgToAll);
        return container;
    }
}
复制代码


5. 聊天室集群:发消息改造

我们单机聊天室的发送消息Controller是这样的:

@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;
复制代码

我们前端发给我们消息后,直接给/topic/public转发这个消息,让其他用户收到。

在集群中,我们需要把消息转发给Redis,并且不转发给前端,而是让服务端监听Redis消息,在进行消息发送。

将Controller改为:

@Value("${redis.channel.msgToAll}")
private String msgToAll;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@MessageMapping("/chat.sendMessage")
    public void sendMessage(@Payload ChatMessage chatMessage) {
        try {
            redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
复制代码

你会发现我们在代码中使用了JsonUtil将实体类ChatMessage转为了Json发送给了Redis,这个Json工具类需要使用到FaskJson依赖:

  1. pom添加FastJson依赖
<!-- json -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>
复制代码
  1. 添加Json解析工具类JsonUtil,提供对象转Json,Json转对象的能力
package cn.monitor4all.springbootwebsocketdemo.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * JSON 转换
 */
public final class JsonUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);
    /**
     * 把Java对象转换成json字符串
     *
     * @param object 待转化为JSON字符串的Java对象
     * @return json 串 or null
     */
    public static String parseObjToJson(Object object) {
        String string = null;
        try {
            string = JSONObject.toJSONString(object);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return string;
    }
    /**
     * 将Json字符串信息转换成对应的Java对象
     *
     * @param json json字符串对象
     * @param c    对应的类型
     */
    public static <T> T parseJsonToObj(String json, Class<T> c) {
        try {
            JSONObject jsonObject = JSON.parseObject(json);
            return JSON.toJavaObject(jsonObject, c);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return null;
    }
}
复制代码


6. 聊天室集群:接收消息改造

单机的聊天室,我们接收消息是通过Controller直接把消息转发到所有人的频道上,这样就能在所有人的聊天框显示。

在集群中,我们需要服务器把消息从Redis中拿出来,并且推送到自己管的用户那边,我们在Service层实现消息的推送。

  • 在处理消息之后发送消息:正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。 如果 @MessageMapping注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地; 如果 @SubscribeMapping注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
  • 在应用的任意地方发送消息:spring-websocket 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以实现自由的向任意目的地发送消息,并且订阅此目的地的所有用户都能收到消息。

我们在service实现发送,需要使用上述第二种方法。

新建类service/ChatService:

package cn.monitor4all.springbootwebsocketdemo.service;
import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
@Service
public class ChatService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);
    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;
    public void sendMsg(@Payload ChatMessage chatMessage) {
        LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }
}
复制代码

我们在哪里调用这个service呢,我们需要在监听到消息后调用,所以我们就要有下面的Redis监听消息处理专用类

新建类redis/RedisListenerHandle:

package cn.monitor4all.springbootwebsocketdemo.redis;
import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * Redis订阅频道处理类
 * @author yangzhendong01
 */
@Component
public class RedisListenerHandle extends MessageListenerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);
    @Value("${redis.channel.msgToAll}")
    private String msgToAll;
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private ChatService chatService;
    /**
     * 收到监听消息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String rawMsg;
        String topic;
        try {
            rawMsg = redisTemplate.getStringSerializer().deserialize(body);
            topic = redisTemplate.getStringSerializer().deserialize(channel);
            LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return;
        }
        if (msgToAll.equals(topic)) {
            LOGGER.info("Send message to all users:" + rawMsg);
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            // 发送消息给所有在线Cid
            chatService.sendMsg(chatMessage);
        } else {
            LOGGER.warn("No further operation with this topic!");
        }
    }
}
复制代码


7. 看看效果

这样,我们的改造就基本完成了!我们看一下效果

我们将服务器运行在8080上,然后打开localhost:8080,起名Alice进入聊天室

随后,我们在application.properties中将端口server.port=8081

再次运行程序(别忘了开启IDEA的“允许启动多个并行服务”设置,不然会覆盖掉你的8080服务,如下图),在8081启动一个聊天室,起名Bob进入聊天室。

如下两图,我们已经可以在不同端口的两个聊天室,互相聊天了!(注意看url)

在互相发送消息是,我们还可以使用命令行监听下Redis的频道websocket.msgToAll,可以看到双方传送的消息。如下图:

我们还可以打开Chrome的F12控制台,查看前端的控制台发送消息的log,如下图:

大功告成了吗?

功能实现了,但是并不完美!你会发现,Bob的加入并没有提醒Bob进入了聊天室(在单机版是有的),这是因为我们在“加入聊天室”的代码还没有修改,在加入时,只有Bob的服务器B里的其他用户知道Bob加入了聊天室。我们还能再进一步!


功能二/功能三:集群用户上下线通知,集群用户信息存储

我们需要弥补上面的不足,将用户上线下线的广播发送到所有服务器上。

此外,我还希望以后能够查询集群中所有的在线用户,我们在redis中添加一个set,来保存用户名,这样就可以随时得到在线用户的数量和名称。


1. 在application.properties添加频道名定义

# Redis定义
redis.channel.userStatus = websocket.userStatus
redis.set.onlineUsers = websocket.onlineUsers
复制代码

我们增加两个定义

  • 第一个是新增redis频道websocket.userStatus用来广播用户上下线消息
  • 第二个是redis的set,用来保存在线用户信息


2. 在RedisListenerBean添加新频道监听

container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));
复制代码


3. 在ChatService中添加

public void alertUserStatus(@Payload ChatMessage chatMessage) {
        LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }
复制代码

在service中我们向本服务器的用户广播消息,用户上线或者下线的消息都通过这里传达。


4. 修改ChatController中的addUser方法

@MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
        LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
        try {
            headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
            redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
            redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
复制代码

我们修改了addUser方法,在这里往redis中广播用户上线的消息,并把用户名username写入redis的set中(websocket.onlineUsers)


5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

@EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
        String username = (String) headerAccessor.getSessionAttributes().get("username");
        if(username != null) {
            LOGGER.info("User Disconnected : " + username);
            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);
            try {
                redisTemplate.opsForSet().remove(onlineUsers, username);
                redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    }
复制代码

在用户关闭网页时,websocket会调用该方法,我们在这里需要把用户从redis的在线用户set里删除,并且向集群发送广播,说明该用户退出聊天室。


6. 修改Redis监听类RedisListenerHandle

else if (userStatus.equals(topic)) {
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            if (chatMessage != null) {
                chatService.alertUserStatus(chatMessage);
            }
复制代码

在监听类中我们接受了来自userStatus频道的消息,并调用service


7. 看看效果

此外,我们还可以在Reids中查询到用户信息:


WebSocket集群还有哪些可能性

有了这两篇文章的基础, 我们当然还能实现以下的功能:

  • 某用户A单独私信给某用户B,或者私信给某用户群(用户B和C)
  • 系统提供外部调用接口,给指定用户/用户群发送消息,实现消息推送
  • 系统提供外部接口,实时获取用户数据(人数/用户信息)


参考文献



深入浅出Websocket(二)分布式Websocket集群

juejin.cn/post/684490…

Spring消息之STOMP:

www.cnblogs.com/jmcui/p/899…


总结



我们在本文中把单机版的聊天室改为了分布式聊天室,大大提高了聊天室可用性。

本文工程源代码:

单机版:

github.com/qqxx6661/sp…

集群版:

github.com/qqxx6661/sp…

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
20天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
2月前
|
存储 分布式计算 负载均衡
分布式计算模型和集群计算模型的区别
【10月更文挑战第18天】分布式计算模型和集群计算模型各有特点和优势,在实际应用中需要根据具体的需求和条件选择合适的计算架构模式,以达到最佳的计算效果和性能。
89 2
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
67 1
|
3月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
50 1
|
3月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
66 1
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
69 1
|
2月前
|
存储 监控 大数据
构建高可用性ClickHouse集群:从单节点到分布式
【10月更文挑战第26天】随着业务的不断增长,单一的数据存储解决方案可能无法满足日益增加的数据处理需求。在大数据时代,数据库的性能、可扩展性和稳定性成为企业关注的重点。ClickHouse 是一个用于联机分析处理(OLAP)的列式数据库管理系统(DBMS),以其卓越的查询性能和高吞吐量而闻名。本文将从我的个人角度出发,分享如何将单节点 ClickHouse 扩展为高可用性的分布式集群,以提升系统的稳定性和可靠性。
191 0
|
3月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
59 0
|
4月前
|
存储 Kubernetes 数据安全/隐私保护
k8s对接ceph集群的分布式文件系统CephFS
文章介绍了如何在Kubernetes集群中使用CephFS作为持久化存储,包括通过secretFile和secretRef两种方式进行认证和配置。
145 5
|
4月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
87 0