前言
上一篇,简单给大家整合了一下websocket,使用stomp方式。
这篇,就是考虑到单体的服务使用websocket ,按照上一篇的整合,确实没问题。
但是如果一旦是负载多台服务的时候,那么就会出现丢失问题。
什么?没有想过这个问题?
没关系,看图学东西:
一贯作风,我瞎画了一张简图,大致讲一下前后端使用websocket通讯的场面。
简析:
后端某个服务起了,整合了websocket作为 server,开放了一些节点endpoints ;
前端服务也起了,也整合了websocket作为 client,连接server的websocket ;
后端server 将每个 前端client 连接的 websocket session 都存起来, 确保 知道谁是谁。
这样在server给client推送消息的时候,能保证推送,数据不丢失。
websocket session map 是存在于 后端服务 的内存里面的 ,单一台后端 server,貌似没啥大问题。
ok,我们简单也了解了一下大致的场面,接下来继续看图 ,多台websocket server 负载场景:
简析:
现在作为同一个后端微服务,整合websocket,起了两台。
如上图所示,如果 红色的client 跟 websocket打交道的时候,连接的是上面的 浅蓝色websocket server;
浅蓝色websocket server 推送消息给 红色的client ,通讯没问题。
因为websocket server 的本地服务session map里面存放着 红色client的连接websocket session;
消息丢失情况出现:
我们负载了两台websocket server ,如果触发 websocket server 给红色client继续推送通知消息, nginx/网关 根据我们往常的负载均衡配置规则,分发到了 绿色的 websocket server。
此时,绿色的 websocket server 的本地服务session map里面 并没有 红色client的连接websocket session ,所以会导致 通知消息 丢失 。
解决方案:
既然问题出现了,那么我们就解决它,本篇就是介绍怎么通过 整合消息中间件去解决这个消息丢失的问题。
我们采取的方案 是 将整合websocket 的 server服务 (多台) 都整合 redis作为消息中间件;
在websocket server 推送消息给websocket client时, 先把消息丢到 redis里面 。
然后所有的websocket server (不管多少台服务) 都会订阅此主题,获取到需要推送的数据,接下来 再推送给到对应的 destination 节点 (这时候只有真正与当前client有连接关系的 server扶服务才能推送成功,其余都没有推送)。
做个简图:也许很多人看到这心里面多多少少有些疑惑, 其实根本原因就是 多台 server 没办法共享 连接session,如果能把session 保持起来共享,岂不是解决了?
是的,思路是对的,可惜的是 websocekt session 是没有实现序列化接口的,无法使用类似redis去存储起来,然后反序列化获取。(但是其实可以通过redis存储相关websocket sessionkey 与节点的IP地址、端口,强行把请求再次分发到正确的websocket server上面去。但是个人感觉这种方式不是很好,所以本文还是介绍redis的订阅/推送方式来解决这个问题)
话不多说,进入代码环节。
正文
基于上一篇的基础,开始魔改把redis加入进来。
贴代码:
pom.xml :
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
application.yml :
server: port: 9908 spring: redis: host: 127.0.0.1 port: 6379 password: 123456
RedisConfig.java :
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.stomp.stomptest.listener.RedisListener; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @Author JCccc * @Description redis配置 * @Date 2021/6/30 8:53 */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { @Bean @Primary public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(om); template.setValueSerializer(jacksonSeial); template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setHashValueSerializer(jacksonSeial); template.afterPropertiesSet(); return template; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter topicAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了主题 webSocketMsgPush container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush")); return container; } /** * 消息监听器适配器,绑定消息处理器 * * @return */ @Bean MessageListenerAdapter topicAdapter() { return new MessageListenerAdapter(new RedisListener()); } }
简析:
WebSocketConfig.java :
import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; /** * @Author JCccc * @Description EnableWebSocketMessageBroker-注解开启STOMP协议来传输基于代理的消息,此时控制器支持使用@MessageMapping * @Date 2021/6/30 8:53 */ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { private static long HEART_BEAT = 10000; @Override public void configureMessageBroker(MessageBrokerRegistry config) { ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler(); te.setPoolSize(1); te.setThreadNamePrefix("wss-heartbeat-thread-"); te.initialize(); config.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT, HEART_BEAT}).setTaskScheduler(te); } /** * 开放节点 * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //注册两个STOMP的endpoint,分别用于广播和点对点 //广播 registry.addEndpoint("/publicServer").withSockJS(); //点对点 registry.addEndpoint("/privateServer").withSockJS(); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.setMessageSizeLimit(500 * 1024 * 1024); registration.setSendBufferSizeLimit(1024 * 1024 * 1024); registration.setSendTimeLimit(200000); } }
InjectServiceUtil.java :
import com.stomp.stomptest.producer.PushMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @Author JCccc * @Description pushMessage (单例) * @Date 2021/6/30 8:53 */ @Component public class InjectServiceUtil { @Autowired private PushMessage pushMessage; @PostConstruct public void init(){ InjectServiceUtil.getInstance().pushMessage = this.pushMessage; } /** * 实现单例 start */ private static class SingletonHolder { private static final InjectServiceUtil INSTANCE = new InjectServiceUtil(); } private InjectServiceUtil (){} public static final InjectServiceUtil getInstance() { return SingletonHolder.INSTANCE; } /** * 实现单例 end */ public PushMessage pushMessage(){ return InjectServiceUtil.getInstance().pushMessage; } }
RedisListener.java :
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; /** * @Author JCccc * @Description redis监听消息 * @Date 2021/6/30 8:53 */ public class RedisListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("步骤1.监听到需要进行负载转发的消息:" + message.toString()); InjectServiceUtil.getInstance().pushMessage().send(message.toString()); } }
简析:
Message.java :
/** * @Author JCccc * @Description * @Date 2021/8/20 9:26 */ public class Message { /** * 消息编码 */ private String code; /** * 来自(保证唯一) */ private String form; /** * 去自(保证唯一) */ private String to; /** * 内容 */ private String content; public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getForm() { return form; } public void setForm(String form) { this.form = form; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
PushMessage.java :
import com.alibaba.fastjson.JSON; import com.stomp.stomptest.pojo.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; /** * @Author JCccc * @Description 消息发送 * @Date 2021/6/30 8:53 */ @Service public class PushMessage { @Autowired private SimpMessagingTemplate template; public void send(String msgJson){ System.out.println("步骤2.即将推送给websocket server:"); Message message = JSON.parseObject(msgJson, Message.class); System.out.println("步骤2.1 消息发给:"+message.getTo()); System.out.println("步骤2.2 发送内容是:"+message.getContent()); template.convertAndSendToUser(message.getTo(), "/message", message.getContent()); System.out.println("----------消息发送完毕----------"); //广播 // template.convertAndSend("/topic/public",chatMessage); } }
最后是 TestController.java (测试接口。模拟触发系统推送消息):
@Controller public class TestController { @Autowired public SimpMessagingTemplate template; @ResponseBody @RequestMapping("/pushToOneTest") public String sendMessage(@RequestBody Message message) { String messageJson = JSON.toJSONString(message); System.out.println("!!!系统准备做消息推送!!!"); stringRedisTemplate.convertAndSend("webSocketMsgPush",messageJson); return "发送成功"; } }
ok,项目跑起来,调用一下测试接口,看看整个流程:
可以看到消息正常推送:
看下代码里面打印的信息(大家初学的可以之间打debug看下流程会更好):
可以看到,这种场景,不管负载了多少台, 消息都先到 redis里面去。
每一台server都通过redisListener 监听主题,获取到相关 消息 , 然后才开始使用本地的websocket session 去找到各自服务器内是否存在当前用户的连接数据,然后推送出去。
那些找不到的无法进行推送出去,虽然黯然失色罢了,但是 我们的负载问题解决了。
恭喜我们自己。
ok,该篇就到这。
最后还有就是之前的一篇,使用rabbitmq作为消息代理的:
《Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程》