问题域
假设我们有这么一个业务场景,在网站下单支付以后,需要通知库存服务进行发货处理。
上面业务实现不难,我们只要让库存服务提供给相关的给口,下单支付之后只要调用库存服务即可。
后面如果又有新的业务,比如说积分服务,他需要获取下单支付的结果,然后增加用户的积分。
这个实现也不难,让积分服务同样提供一个接口,下单支付之后只要调用库存服务即可
如果就两个业务需要获取下单支付的结果,那也还好,程序改造也快。可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。
这时我们会发现上面这样的系统架构存在很多问题:
第一,下单支付业务与其他业务重度耦合,每当有个新业务需要支付结果,就需要改动下单支付的业务。
第二,如果调用业务过多,会导致下单支付接口响应时间变长。另外,如果有任一下游接口响应变慢,就会同步导致下单支付接口响 应也变长。
第三,如果任一下游接口失败,可能导致数据不一致的情况。比如说下图,先调用 A,成功之后再调用 B,最后再调用 C。
如果在调用 B 接口的发生异常,此时可能就导致下单支付接口返回失败,但是此时 A 接口其实已经调用成功,这就代表它内部已经处理下单支付成功的结果。
这样就会导致 A,B,C 三个下游接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情况。
其实我们仔细想一下,对于下单支付业务来讲,它其实不需要关心下游调用结果,只要有某种机制通知能通知到他们就可以了。
讲到这里,这就需要引入今天需要介绍发布订阅机制。
Redis 发布与订阅
Redis 提供了基于「发布/订阅」模式的消息机制,在这种模式下,消息发布者与订阅者不需要进行直接通信。
如上图所示,消息发布者只需要想指定的频道发布消息,订阅该频道的每个客户端都可以接受到到这个消息。
使用 Redis 发布订阅这种机制,对于上面业务,下单支付业务只需要向「支付结果」这个频道发送消息,其他下游业务订阅「支付结果」这个频道,就能收相应消息,然后做出业务处理即可。
这样就可以解耦系统上下游之间调用关系。
命令使用方式
Redis 中提供了一组命令,可以用于发布消息,订阅频道,取消订阅以及按照模式订阅。
首先我们来看下如何发布一条消息,其实很简单只要使用 「publish」 指令
127.0.0.1:6379> publish pay_result "i love redis" (integer) 0
上图中,我们使用 「publish」 指令向 「pay_result」 这个频道发送了一条消息。我们可以看到 redis 向我们返回 0 ,这其实代表当前订阅者个数,由于此时没有订阅,所以返回结果为 0
接下来我们使用 「subscribe」 订阅一个或多个频道
127.0.0.1:6379> subscribe pay_result Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "pay_result" 3) (integer) 1
如上图所示,我们订阅 「pay_result」 这个频道,当有其他客户端往这个频道发送消息,
127.0.0.1:6379> publish pay_result "i love redis" (integer) 1
当前订阅者就会收到消息。
1) "message" 2) "pay_result" 3) "i love redis"
了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 * 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。
使用 Redis 订阅模式,我们需要使用一个新的指令 「psubscribe」。
我们执行下面这个指令:
psubscribe pay.*
如果需要取消订阅模式,我们需要使用相应punsubscribe 指令,比如取消上面订阅的模式:
punsubscribe pay.*
Redis存储数据更改key和value的序列化方式
RedisConfig.java
@Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 //redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setValueSerializer(new StringRedisSerializer()); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; }
SpringBoot集成redis实现消息发布订阅模式
- 依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
- 配置redis以及连接池
server: port: 8081 spring: redis: host: localhost port: 6379 password: success database: 1 pool: max-active: -1 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-idle: 8 # 连接池中的最大空闲连接 min-idle: 0 # 连接池中的最小空闲连接
- 消息处理器POJO
package tech.it.msg; import org.springframework.stereotype.Component; @Component public class MsgReceiver { /** * 接收消息方法 */ public void receiverMessage(String message) { System.out.println("MessageReceiver收到一条新消息:" + message); } }
- 设置消息发布者、消息处理者POJO、redis消息监听容器以及redis监听器注入IOC容器
package tech.it.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import tech.it.msg.MsgReceiver; @Configuration public class RedisConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat的通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat")); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapter(MsgReceiver receiver) { //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //不填defaultListenerMethod默认调用handleMessage return new MessageListenerAdapter(receiver, "receiverMessage"); } /** * 读取内容的template */ @Bean public StringRedisTemplate template(RedisConnectionFactory connectionFactory) { System.out.println("template..."); return new StringRedisTemplate(connectionFactory); } }
- 消息发布者
package tech.it.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private RedisTemplate<String,String> redisTemplate; @GetMapping("/pro01") public String pro01(String data){ redisTemplate.convertAndSend("chat",data); return "success"; } }
配置多个监听通道
①一个监听器订阅多个通道
- 修改监听器 - 增加一个chat2的通道配置
@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat的通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat")); //配置多个监听通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat2")); return container; }
- 新增一个消息的发布者
/** * 第二个发布者 * @param data * @return */ @GetMapping("/pro02") public String pro02(String data){ redisTemplate.convertAndSend("chat2",data); return "success"; }
②配置多个监听器监听不同的通道
- 注入一个新的bean,名字要和之前不一样,调用 MessageReceiver 的 receiverMessage2 方法。
@Bean public MessageListenerAdapter listenerAdapter2(MsgReceiver receiver) { //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //不填defaultListenerMethod默认调用handleMessage return new MessageListenerAdapter(receiver, "receiverMessage2"); }
这里也可以自己新注入一个新的 Receiver 叫 MessageReceiver2
- 配置监听容器,这里参数命名要和上边bean注入的方法名一致
@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat的通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat")); //配置多个监听通道 container.addMessageListener(listenerAdapter2, new PatternTopic("chat2")); return container; }
- 添加一个新的消息发布者
/** * 第二个发布者 * @param data * @return */ @GetMapping("/pro02") public String pro02(String data){ redisTemplate.convertAndSend("chat2",data); return "success"; }
Springboot集成websocket,实现服务端推送消息到客户端
现在很多web网站上都有站内消息通知,用于给用户及时推送站内信消息。大多数是在网页头部导航栏上带一个小铃铛图标,有新的消息时,铃铛会出现相应提示
我们都知道,web应用都是C/S模式,客户端通过浏览器发出一个请求,服务器端接收请求后进行处理并返回结果给客户端,客户端浏览器将信息呈现给用户。所以很容易想到的一种解决方式就是:
Ajax轮询:客户端使用js写一个定时器setInterval(),以固定的时间间隔向服务器发起请求,查询是否有最新消息。基于 Flash:AdobeFlash 通过自己的 Socket 实现数据交换,再利用 Flash 暴露出对应的接口给 js调用,从而实现实时传输,此方式比Ajax轮询要高效。但在移动互联网终端上对Flash 的支持并不好。现在已经基本不再使用。
而对于Ajax轮询方案,优点是实现起来简单,适用于对消息实时性要求不高,用户量小的场景下,缺点就是客户端给服务器带来很多无谓请求,浪费带宽,效率低下,做不到服务端的主动推送
websoket出现
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 WebSocket 使客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建长连接,并进行双向数据传输。
总结
每项技术带来优点的同时,同时也会附带缺点,目前来看websocket的一些小问题:
- websocket链接断开后,不会主动重连,需要手动刷新网页或者自己实现断线重连机制
- 低版本浏览器对websocket支持不太好,如IE8
- 服务端持有了一个所有websocket对象的集合Map,用户在线量大的时候,占用内存大,当然这个可以优化代码
- websocket受网络波动影响较大,因为是长连接,网络差劲时,长连接会受影响
所以,具体看实际场景需求,选择合适方案。
websocket搭建步骤
- 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
- websoket配置类
@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
- websoket端点配置
package tech.it.model; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @ServerEndpoint(value = "/testWebSocket/{id}") public class WebSocketProcess { /* * 持有每个webSocket对象,以key-value存储到线程安全ConcurrentHashMap, */ private static ConcurrentHashMap<Long, WebSocketProcess> concurrentHashMap = new ConcurrentHashMap<>(12); /** * 会话对象 **/ private Session session; /* * 客户端创建连接时触发 * */ @OnOpen public void onOpen(Session session, @PathParam("id") long id) { //每新建立一个连接,就把当前客户id为key,this为value存储到map中 this.session = session; concurrentHashMap.put(id, this); log.info("Open a websocket. id={}", id); } /** * 客户端连接关闭时触发 **/ @OnClose public void onClose(Session session, @PathParam("id") long id) { //客户端连接关闭时,移除map中存储的键值对 concurrentHashMap.remove(id); log.info("close a websocket, concurrentHashMap remove sessionId= {}", id); } /** * 接收到客户端消息时触发 */ @OnMessage public void onMessage(String message, @PathParam("id") String id) { log.info("receive a message from client id={},msg={}", id, message); } /** * 连接发生异常时候触发 */ @OnError public void onError(Session session, Throwable error) { log.error("Error while websocket. ", error); } /** * 发送消息到指定客户端 * @param id * @param message * */ public void sendMessage(long id, String message) throws Exception { //根据id,从map中获取存储的webSocket对象 WebSocketProcess webSocketProcess = concurrentHashMap.get(id); if (!ObjectUtils.isEmpty(webSocketProcess)) { //当客户端是Open状态时,才能发送消息 if (webSocketProcess.session.isOpen()) { webSocketProcess.session.getBasicRemote().sendText(message); } else { log.error("websocket session={} is closed ", id); } } else { log.error("websocket session={} is not exit ", id); } } /** * 发送消息到所有客户端 * * */ public void sendAllMessage(String msg) throws Exception { log.info("online client count={}", concurrentHashMap.size()); Set<Map.Entry<Long, WebSocketProcess>> entries = concurrentHashMap.entrySet(); for (Map.Entry<Long, WebSocketProcess> entry : entries) { Long cid = entry.getKey(); WebSocketProcess webSocketProcess = entry.getValue(); boolean sessionOpen = webSocketProcess.session.isOpen(); if (sessionOpen) { webSocketProcess.session.getBasicRemote().sendText(msg); } else { log.info("cid={} is closed,ignore send text", cid); } } }
- 控制器
package tech.it.controller;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import tech.it.model.WebSocketProcess;
@RestController@RequestMapping("/ws")public class WebSocketController { /** *注入WebSocketProcess **/ @Autowired private WebSocketProcess webSocketProcess;
@PostMapping(value = "sendMsgToClientById") public void sendMsgToClientById(@RequestParam long id, @RequestParam String text){ try { webSocketProcess.sendMessage(id,text); } catch (Exception e) { e.printStackTrace(); } } /** * 发消息到所有客户端 */ @PostMapping(value = "sendMsgToAllClient")
public void sendMsgToAllClient( @RequestParam String text){ try { webSocketProcess.sendAllMessage(text); } catch (Exception e) { e.printStackTrace(); } }}
5. html页面 ~~~html <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>websocket测试</title> <script src="http://code.jquery.com/jquery-2.1.1.min.js"></script> </head> <body> <div id="content"></div> </body> <script type="text/javascript"> $(function(){ var ws; //检测浏览器是否支持webSocket if("WebSocket" in window){ $("#content").html("您的浏览器支持webSocket!"); //模拟产生clientID let clientID = Math.ceil(Math.random()*100); //创建 WebSocket 对象,注意请求路径!!!! ws = new WebSocket("ws://127.0.0.1:8081/testWebSocket/"+clientID); //与服务端建立连接时触发 ws.onopen = function(){ $("#content").append("<p>与服务端建立连接建立成功!您的客户端ID="+clientID+"</p>"); //模拟发送数据到服务器 ws.send("你好服务端!我是客户端 "+clientID); } //接收到服务端消息时触发 ws.onmessage = function (evt) { let received_msg = evt.data; $("#content").append("<p>接收到服务端消息:"+received_msg+"</p>"); }; //服务端关闭连接时触发 ws.onclose = function() { console.error("连接已经关闭.....") }; }else{ $("#content").html("您的浏览器不支持webSocket!"); } }) </script> </html>
6. 打开三个窗口,分别用postman来进行私发和群发的测试.
websoket在线测试
http://www.easyswoole.com/wstool.html
稍微修改一下
/** * 接收到客户端消息时触发 */ @OnMessage public void onMessage(String message, @PathParam("id") String id) { try { //此处增加调用所有客户端的方法 sendAllMessage(message); } catch (Exception e) { e.printStackTrace(); } log.info("receive a message from client id={},msg={}", id, message); }
基于Redis发布订阅的分布式WebSocket通信
不同的server是拥有不同的session,不同的server的session是不共享的.session发送方法不能发送其他server的.
webSocketProcess.session.getBasicRemote().sendText(msg);
解决方案流程图:
8081 -> 群发了一个消息,希望在其他服务器上正在连接的客户端也能看到这个消息.
当8081某个用户群发的消息放入到redis的订阅队列中[通道]
四种web通讯方式
<button type="button" onclick="test()">get请求</button> <script> function test(){ //原生的ajax代码. //1. 创建XMLHttpRequest对象 let xmlHttpRequest = new XMLHttpRequest(); //设置连接信息 xmlHttpRequest.open("GET", "/hello/100",true); //发送 xmlHttpRequest.send(); //回调函数 - readyState发生改变的时候就会调用. //0 1 2 3 4 xmlHttpRequest.onreadystatechange=function(){ if(xmlHttpRequest.readyState == 4 && xmlHttpRequest.status == "200"){ //处理后端返回过来的json字符串 let result = xmlHttpRequest.responseText; //将json字符串转成json对象 let data = JSON.parse(result); console.log(data); console.log(data.code); } } } </script>
- 短轮询
http端轮询是服务器收到请求不管是否有数据都直接响应 http 请求;其实就是普通的轮询。指在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。
传统的web通信模式。后台处理数据,需要一定时间,前端想要知道后端的处理结果,就要不定时的向后端发出请求以获得最新情况。
- 请求中有大半是无用,难于维护,浪费带宽和服务器资源;
- 响应的结果没有顺序(因为是异步请求,当发送的请求没有返回结果的时候,后面的请求又被发送。而此时如果后面的请求比前面的请 求要先返回结果,那么当前面的请求返回结果数据时已经是过时无效的数据了)。
var xhr = new XMLHttpRequest(); setInterval(function(){ xhr.open('GET','/user'); xhr.onreadystatechange = function(){ }; xhr.send(); },1000)
- 长轮询首先由客户端向服务器发起请求,当服务器收到客户端发来的请求后,服务器端不会直接进行响应,而是先将 这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制才返回。 客户端 JavaScript 响应处理函数会在处理完服务器返回的信息后,再次发出请求,重新建立连接。长轮询和短轮询比起来,它的 优点是明显减少了很多不必要的 http 请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。
- 短连接在HTTP 0.9和HTTP 1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。每次Http请求都会建立Tcp连接
- 长连接HTTP1.0已经开始支持了.但是默认还是短连接.HTTP1.1默认支持长连接只需要建立一次Tcp连接,以后Http请求重复使用同一个Tcp连接
- websocket在HTTP协议中通信只能由客户端发起,服务器不能主动向客户端推送消息当建立起WebSocket的长连接,双方都可以互相发送信息,服务端可以主动发起信息。WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建长连接,并进行双向数据传输。
function ajax(){ var xhr = new XMLHttpRequest(); xhr.open('GET','/user'); xhr.onreadystatechange = function(){ ajax(); }; xhr.send(); } ajax();