Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题(二)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题(二)

前言



上一篇,简单给大家整合了一下websocket,使用stomp方式。


这篇,就是考虑到单体的服务使用websocket ,按照上一篇的整合,确实没问题。


但是如果一旦是负载多台服务的时候,那么就会出现丢失问题。


什么?没有想过这个问题?


没关系,看图学东西:


一贯作风,我瞎画了一张简图,大致讲一下前后端使用websocket通讯的场面。


image.png


简析:



后端某个服务起了,整合了websocket作为 server,开放了一些节点endpoints ;


前端服务也起了,也整合了websocket作为 client,连接server的websocket ;


后端server 将每个 前端client 连接的 websocket session 都存起来, 确保 知道谁是谁。


这样在server给client推送消息的时候,能保证推送,数据不丢失。


websocket session map  是存在于 后端服务 的内存里面的 ,单一台后端 server,貌似没啥大问题。


ok,我们简单也了解了一下大致的场面,接下来继续看图 ,多台websocket server 负载场景:

image.png简析:


现在作为同一个后端微服务,整合websocket,起了两台。


如上图所示,如果 红色的client 跟 websocket打交道的时候,连接的是上面的 浅蓝色websocket server;


浅蓝色websocket server 推送消息给  红色的client ,通讯没问题。


因为websocket server 的本地服务session map里面存放着 红色client的连接websocket session;


消息丢失情况出现:


我们负载了两台websocket server ,如果触发 websocket server 给红色client继续推送通知消息, nginx/网关 根据我们往常的负载均衡配置规则,分发到了 绿色的 websocket server。


此时,绿色的 websocket server 的本地服务session map里面 并没有  红色client的连接websocket session ,所以会导致  通知消息 丢失 。


解决方案:


既然问题出现了,那么我们就解决它,本篇就是介绍怎么通过 整合消息中间件去解决这个消息丢失的问题。


我们采取的方案 是 将整合websocket 的 server服务 (多台) 都整合 redis作为消息中间件;


在websocket  server 推送消息给websocket  client时, 先把消息丢到 redis里面 。


然后所有的websocket  server (不管多少台服务) 都会订阅此主题,获取到需要推送的数据,接下来 再推送给到对应的 destination 节点 (这时候只有真正与当前client有连接关系的 server扶服务才能推送成功,其余都没有推送)。


做个简图:image.png也许很多人看到这心里面多多少少有些疑惑, 其实根本原因就是 多台 server 没办法共享 连接session,如果能把session 保持起来共享,岂不是解决了?


是的,思路是对的,可惜的是 websocekt session 是没有实现序列化接口的,无法使用类似redis去存储起来,然后反序列化获取。(但是其实可以通过redis存储相关websocket sessionkey 与节点的IP地址、端口,强行把请求再次分发到正确的websocket server上面去。但是个人感觉这种方式不是很好,所以本文还是介绍redis的订阅/推送方式来解决这个问题)


话不多说,进入代码环节。


正文



基于上一篇的基础,开始魔改把redis加入进来。


image.png


贴代码:


pom.xml :


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


application.yml :


server:
  port: 9908
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456


RedisConfig.java :

 

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.stomp.stomptest.listener.RedisListener;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * @Author JCccc
 * @Description   redis配置
 * @Date 2021/6/30 8:53
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(om);
        template.setValueSerializer(jacksonSeial);
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setHashValueSerializer(jacksonSeial);
        template.afterPropertiesSet();
        return template;
    }
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter topicAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了主题 webSocketMsgPush
        container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush"));
        return container;
    }
    /**
     * 消息监听器适配器,绑定消息处理器
     *
     * @return
     */
    @Bean
    MessageListenerAdapter topicAdapter() {
        return new MessageListenerAdapter(new RedisListener());
    }
}


简析:


image.png


WebSocketConfig.java :


import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/**
 * @Author JCccc
 * @Description   EnableWebSocketMessageBroker-注解开启STOMP协议来传输基于代理的消息,此时控制器支持使用@MessageMapping
 * @Date 2021/6/30 8:53
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private static long HEART_BEAT = 10000;
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        config.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT, HEART_BEAT}).setTaskScheduler(te);
    }
    /**
     * 开放节点
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册两个STOMP的endpoint,分别用于广播和点对点
        //广播
        registry.addEndpoint("/publicServer").withSockJS();
        //点对点
        registry.addEndpoint("/privateServer").withSockJS();
    }
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(500 * 1024 * 1024);
        registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
        registration.setSendTimeLimit(200000);
    }
}


InjectServiceUtil.java :


import com.stomp.stomptest.producer.PushMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * @Author JCccc
 * @Description   pushMessage (单例)
 * @Date 2021/6/30 8:53
 */
@Component
public class InjectServiceUtil {
    @Autowired
    private PushMessage pushMessage;
    @PostConstruct
    public void init(){
        InjectServiceUtil.getInstance().pushMessage = this.pushMessage;
    }
    /**
     *  实现单例 start
     */
    private static class SingletonHolder {
        private static final InjectServiceUtil INSTANCE = new InjectServiceUtil();
    }
    private InjectServiceUtil (){}
    public static final InjectServiceUtil getInstance() {
        return SingletonHolder.INSTANCE;
    }
    /**
     *  实现单例 end
     */
    public PushMessage pushMessage(){
        return InjectServiceUtil.getInstance().pushMessage;
    }
}


RedisListener.java :


import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
/**
 * @Author JCccc
 * @Description   redis监听消息
 * @Date 2021/6/30 8:53
 */
public class RedisListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("步骤1.监听到需要进行负载转发的消息:" + message.toString());
        InjectServiceUtil.getInstance().pushMessage().send(message.toString());
    }
}


简析:


image.png


Message.java :


/**
 * @Author JCccc
 * @Description
 * @Date 2021/8/20 9:26
 */
public class Message {
    /**
     * 消息编码
     */
    private String code;
    /**
     * 来自(保证唯一)
     */
    private String form;
    /**
     * 去自(保证唯一)
     */
    private String to;
    /**
     * 内容
     */
    private String content;
    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }
    public String getForm() {
        return form;
    }
    public void setForm(String form) {
        this.form = form;
    }
    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}


PushMessage.java  :


import com.alibaba.fastjson.JSON;
import com.stomp.stomptest.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
/**
 * @Author JCccc
 * @Description   消息发送
 * @Date 2021/6/30 8:53
 */
@Service
public class PushMessage {
    @Autowired
    private SimpMessagingTemplate template;
    public void send(String msgJson){
        System.out.println("步骤2.即将推送给websocket server:");
        Message message = JSON.parseObject(msgJson, Message.class);
        System.out.println("步骤2.1 消息发给:"+message.getTo());
        System.out.println("步骤2.2 发送内容是:"+message.getContent());
        template.convertAndSendToUser(message.getTo(), "/message", message.getContent());
        System.out.println("----------消息发送完毕----------");
        //广播
       // template.convertAndSend("/topic/public",chatMessage);
    }
}


最后是 TestController.java (测试接口。模拟触发系统推送消息):


@Controller
public class TestController {
    @Autowired
    public SimpMessagingTemplate template;
    @ResponseBody
    @RequestMapping("/pushToOneTest")
    public String sendMessage(@RequestBody Message message) {
        String messageJson = JSON.toJSONString(message);
        System.out.println("!!!系统准备做消息推送!!!");
        stringRedisTemplate.convertAndSend("webSocketMsgPush",messageJson);
        return "发送成功";
    }
}


ok,项目跑起来,调用一下测试接口,看看整个流程:


可以看到消息正常推送:


image.png


看下代码里面打印的信息(大家初学的可以之间打debug看下流程会更好):


image.png


可以看到,这种场景,不管负载了多少台, 消息都先到 redis里面去。


每一台server都通过redisListener 监听主题,获取到相关 消息 , 然后才开始使用本地的websocket session 去找到各自服务器内是否存在当前用户的连接数据,然后推送出去。


那些找不到的无法进行推送出去,虽然黯然失色罢了,但是 我们的负载问题解决了。


恭喜我们自己。


ok,该篇就到这。


最后还有就是之前的一篇,使用rabbitmq作为消息代理的:


《Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程》

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
29天前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
38 1
场景题:百万数据插入Redis有哪些实现方案?
|
26天前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
251 1
|
8天前
|
NoSQL Java API
springboot项目Redis统计在线用户
通过本文的介绍,您可以在Spring Boot项目中使用Redis实现在线用户统计。通过合理配置Redis和实现用户登录、注销及统计逻辑,您可以高效地管理在线用户。希望本文的详细解释和代码示例能帮助您在实际项目中成功应用这一技术。
18 3
|
10天前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
26 1
|
30天前
|
前端开发 Java C++
RSocket vs WebSocket:Spring Boot 3.3 中的两大实时通信利器
本文介绍了在 Spring Boot 3.3 中使用 RSocket 和 WebSocket 实现实时通信的方法。RSocket 是一种高效的网络通信协议,支持多种通信模式,适用于微服务和流式数据传输。WebSocket 则是一种标准协议,支持全双工通信,适合实时数据更新场景。文章通过一个完整的示例,展示了如何配置项目、实现前后端交互和消息传递,并提供了详细的代码示例。通过这些技术,可以大幅提升系统的响应速度和处理效率。
|
29天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
69 2
|
1月前
|
Java API Spring
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中拦截器的入门教程和实战项目场景实现的详细指南。
26 0
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
|
1月前
|
Java API Spring
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中过滤器的基础知识和实战项目应用的教程。
24 0
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
|
1月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
1月前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
30 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。