Redis发布订阅模式

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis发布订阅模式

问题域

假设我们有这么一个业务场景,在网站下单支付以后,需要通知库存服务进行发货处理。

上面业务实现不难,我们只要让库存服务提供给相关的给口,下单支付之后只要调用库存服务即可。

后面如果又有新的业务,比如说积分服务,他需要获取下单支付的结果,然后增加用户的积分。

这个实现也不难,让积分服务同样提供一个接口,下单支付之后只要调用库存服务即可

如果就两个业务需要获取下单支付的结果,那也还好,程序改造也快。可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。

这时我们会发现上面这样的系统架构存在很多问题:

第一,下单支付业务与其他业务重度耦合,每当有个新业务需要支付结果,就需要改动下单支付的业务。

第二,如果调用业务过多,会导致下单支付接口响应时间变长。另外,如果有任一下游接口响应变慢,就会同步导致下单支付接口响 应也变长。

第三,如果任一下游接口失败,可能导致数据不一致的情况。比如说下图,先调用 A,成功之后再调用 B,最后再调用 C。

如果在调用 B 接口的发生异常,此时可能就导致下单支付接口返回失败,但是此时 A 接口其实已经调用成功,这就代表它内部已经处理下单支付成功的结果。

这样就会导致 A,B,C 三个下游接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情况。

其实我们仔细想一下,对于下单支付业务来讲,它其实不需要关心下游调用结果,只要有某种机制通知能通知到他们就可以了。

讲到这里,这就需要引入今天需要介绍发布订阅机制。

Redis 发布与订阅

Redis 提供了基于「发布/订阅」模式的消息机制,在这种模式下,消息发布者与订阅者不需要进行直接通信。

如上图所示,消息发布者只需要想指定的频道发布消息,订阅该频道的每个客户端都可以接受到到这个消息。

使用 Redis 发布订阅这种机制,对于上面业务,下单支付业务只需要向「支付结果」这个频道发送消息,其他下游业务订阅「支付结果」这个频道,就能收相应消息,然后做出业务处理即可。

这样就可以解耦系统上下游之间调用关系。

命令使用方式

Redis 中提供了一组命令,可以用于发布消息,订阅频道,取消订阅以及按照模式订阅。

首先我们来看下如何发布一条消息,其实很简单只要使用 「publish」 指令

127.0.0.1:6379> publish pay_result "i love redis"
(integer) 0

上图中,我们使用 「publish」 指令向 「pay_result」 这个频道发送了一条消息。我们可以看到 redis 向我们返回 0 ,这其实代表当前订阅者个数,由于此时没有订阅,所以返回结果为 0

接下来我们使用 「subscribe」 订阅一个或多个频道

127.0.0.1:6379> subscribe pay_result
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "pay_result"
3) (integer) 1

如上图所示,我们订阅 「pay_result」 这个频道,当有其他客户端往这个频道发送消息,

127.0.0.1:6379> publish pay_result "i love redis"
(integer) 1

当前订阅者就会收到消息。

1) "message"
2) "pay_result"
3) "i love redis"

了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 * 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。

使用 Redis 订阅模式,我们需要使用一个新的指令 「psubscribe」

我们执行下面这个指令:

psubscribe pay.*

如果需要取消订阅模式,我们需要使用相应punsubscribe 指令,比如取消上面订阅的模式:

punsubscribe pay.*

Redis存储数据更改key和value的序列化方式

RedisConfig.java

@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
  redisTemplate.setConnectionFactory(redisConnectionFactory);
  // 使用Jackson2JsonRedisSerialize 替换默认序列化
  Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
  ObjectMapper objectMapper = new ObjectMapper();
  objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
  // 设置value的序列化规则和 key的序列化规则
  //redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
  redisTemplate.setValueSerializer(new StringRedisSerializer());
  redisTemplate.setKeySerializer(new StringRedisSerializer());
  redisTemplate.afterPropertiesSet();
  return redisTemplate;
}

SpringBoot集成redis实现消息发布订阅模式

  1. 依赖
<dependencies>
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
   </dependency>
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
  1. 配置redis以及连接池
server:
  port: 8081
spring:
  redis:
    host: localhost
    port: 6379
    password: success
    database: 1
    pool:
      max-active: -1 # 连接池最大连接数(使用负值表示没有限制)
      max-wait: -1  # 连接池最大阻塞等待时间(使用负值表示没有限制)
      max-idle: 8  # 连接池中的最大空闲连接
      min-idle: 0  # 连接池中的最小空闲连接
  1. 消息处理器POJO
package tech.it.msg;
import org.springframework.stereotype.Component;
@Component
public class MsgReceiver {
 /**
        * 接收消息方法
     */
    public void receiverMessage(String message) {
        System.out.println("MessageReceiver收到一条新消息:" + message);
    }
}
  1. 设置消息发布者、消息处理者POJO、redis消息监听容器以及redis监听器注入IOC容器
package tech.it.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import tech.it.msg.MsgReceiver;
@Configuration
public class RedisConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                         MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
     container.setConnectionFactory(connectionFactory);
     //订阅了一个叫chat的通道
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
        return container;
    }
    /**
           * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
           * @param receiver
           * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(MsgReceiver receiver) {
        //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //不填defaultListenerMethod默认调用handleMessage
        return new MessageListenerAdapter(receiver, "receiverMessage");
    }
 /**
        * 读取内容的template
     */
    @Bean
    public StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        System.out.println("template...");
        return new StringRedisTemplate(connectionFactory);
    }
}
  1. 消息发布者
package tech.it.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
    @Autowired
    private RedisTemplate<String,String> redisTemplate;
    @GetMapping("/pro01")
    public String pro01(String data){
        redisTemplate.convertAndSend("chat",data);
        return "success";
    }
}

配置多个监听通道

①一个监听器订阅多个通道

  1. 修改监听器 - 增加一个chat2的通道配置
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                        MessageListenerAdapter listenerAdapter) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  //订阅了一个叫chat的通道
  container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
  //配置多个监听通道
  container.addMessageListener(listenerAdapter, new PatternTopic("chat2"));
  return container;
}
  1. 新增一个消息的发布者
/**
        * 第二个发布者
        * @param data
        * @return
     */
@GetMapping("/pro02")
public String pro02(String data){
  redisTemplate.convertAndSend("chat2",data);
  return "success";
}

②配置多个监听器监听不同的通道

  1. 注入一个新的bean,名字要和之前不一样,调用 MessageReceiver 的 receiverMessage2 方法。
@Bean
public MessageListenerAdapter listenerAdapter2(MsgReceiver receiver) {
  //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
  //不填defaultListenerMethod默认调用handleMessage
  return new MessageListenerAdapter(receiver, "receiverMessage2");
}

这里也可以自己新注入一个新的 ReceiverMessageReceiver2

  1. 配置监听容器,这里参数命名要和上边bean注入的方法名一致
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                        MessageListenerAdapter listenerAdapter,
                                        MessageListenerAdapter listenerAdapter2) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  //订阅了一个叫chat的通道
  container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
  //配置多个监听通道
  container.addMessageListener(listenerAdapter2, new PatternTopic("chat2"));
  return container;
}
  1. 添加一个新的消息发布者
/**
        * 第二个发布者
        * @param data
        * @return
     */
@GetMapping("/pro02")
public String pro02(String data){
  redisTemplate.convertAndSend("chat2",data);
  return "success";
}

Springboot集成websocket,实现服务端推送消息到客户端

现在很多web网站上都有站内消息通知,用于给用户及时推送站内信消息。大多数是在网页头部导航栏上带一个小铃铛图标,有新的消息时,铃铛会出现相应提示

我们都知道,web应用都是C/S模式,客户端通过浏览器发出一个请求,服务器端接收请求后进行处理并返回结果给客户端,客户端浏览器将信息呈现给用户。所以很容易想到的一种解决方式就是:

Ajax轮询:客户端使用js写一个定时器setInterval(),以固定的时间间隔向服务器发起请求,查询是否有最新消息。基于 Flash:AdobeFlash 通过自己的 Socket 实现数据交换,再利用 Flash 暴露出对应的接口给 js调用,从而实现实时传输,此方式比Ajax轮询要高效。但在移动互联网终端上对Flash 的支持并不好。现在已经基本不再使用。

而对于Ajax轮询方案,优点是实现起来简单,适用于对消息实时性要求不高,用户量小的场景下,缺点就是客户端给服务器带来很多无谓请求,浪费带宽,效率低下,做不到服务端的主动推送

websoket出现

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 WebSocket 使客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建长连接,并进行双向数据传输

总结

每项技术带来优点的同时,同时也会附带缺点,目前来看websocket的一些小问题:

  • websocket链接断开后,不会主动重连,需要手动刷新网页或者自己实现断线重连机制
  • 低版本浏览器对websocket支持不太好,如IE8
  • 服务端持有了一个所有websocket对象的集合Map,用户在线量大的时候,占用内存大,当然这个可以优化代码
  • websocket受网络波动影响较大,因为是长连接,网络差劲时,长连接会受影响

所以,具体看实际场景需求,选择合适方案。

websocket搭建步骤

  1. 依赖
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>
  1. websoket配置类
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
  1. websoket端点配置
package tech.it.model;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint(value = "/testWebSocket/{id}")
public class WebSocketProcess {
    /*
        * 持有每个webSocket对象,以key-value存储到线程安全ConcurrentHashMap,
     */
    private static ConcurrentHashMap<Long, WebSocketProcess> concurrentHashMap = new ConcurrentHashMap<>(12);
    /**
        * 会话对象
     **/
    private Session session;
       /*
          * 客户端创建连接时触发
       * */
   @OnOpen
   public void onOpen(Session session, @PathParam("id") long id) {
       //每新建立一个连接,就把当前客户id为key,this为value存储到map中
       this.session = session;
       concurrentHashMap.put(id, this);
       log.info("Open a websocket. id={}", id);
   }
   /**
          * 客户端连接关闭时触发
    **/
   @OnClose
   public void onClose(Session session, @PathParam("id") long id) {
       //客户端连接关闭时,移除map中存储的键值对
       concurrentHashMap.remove(id);
       log.info("close a websocket, concurrentHashMap remove sessionId= {}", id);
   }
   /**
          * 接收到客户端消息时触发
    */
   @OnMessage
   public void onMessage(String message, @PathParam("id") String id) {
       log.info("receive a message from client id={},msg={}", id, message);
   }
   /**
          * 连接发生异常时候触发
    */
   @OnError
   public void onError(Session session, Throwable error) {
       log.error("Error while websocket. ", error);
   }
   /**
          * 发送消息到指定客户端
       *  @param id
       *  @param message
       * */
   public void sendMessage(long id, String message) throws Exception {
       //根据id,从map中获取存储的webSocket对象
       WebSocketProcess webSocketProcess = concurrentHashMap.get(id);
       if (!ObjectUtils.isEmpty(webSocketProcess)) {
           //当客户端是Open状态时,才能发送消息
           if (webSocketProcess.session.isOpen()) {
               webSocketProcess.session.getBasicRemote().sendText(message);
           } else {
                  log.error("websocket session={} is closed ", id);
           }
       } else {
           log.error("websocket session={} is not exit ", id);
       }
   }
   /**
          * 发送消息到所有客户端
    *
       * */
   public void sendAllMessage(String msg) throws Exception {
       log.info("online client count={}", concurrentHashMap.size());
       Set<Map.Entry<Long, WebSocketProcess>> entries = concurrentHashMap.entrySet();
       for (Map.Entry<Long, WebSocketProcess> entry : entries) {
           Long cid = entry.getKey();
           WebSocketProcess webSocketProcess = entry.getValue();
           boolean sessionOpen = webSocketProcess.session.isOpen();
           if (sessionOpen) {
               webSocketProcess.session.getBasicRemote().sendText(msg);
           } else {
               log.info("cid={} is closed,ignore send text", cid);
           }
       }
   }
  1. 控制器

package tech.it.controller;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import tech.it.model.WebSocketProcess;

@RestController@RequestMapping("/ws")public class WebSocketController { /** *注入WebSocketProcess **/ @Autowired private WebSocketProcess webSocketProcess;

@PostMapping(value = "sendMsgToClientById")
public void sendMsgToClientById(@RequestParam long id, @RequestParam String text){
       try {
        webSocketProcess.sendMessage(id,text);
 } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
       * 发消息到所有客户端
    */
@PostMapping(value = "sendMsgToAllClient")

public void sendMsgToAllClient( @RequestParam String text){ try { webSocketProcess.sendAllMessage(text); } catch (Exception e) { e.printStackTrace(); } }}

5. html页面
~~~html
<!DOCTYPE html>
<html>
<head>
 <meta charset="UTF-8">
   <title>websocket测试</title>
    <script src="http://code.jquery.com/jquery-2.1.1.min.js"></script>
</head>
<body>
<div id="content"></div>
</body>
<script type="text/javascript">
    $(function(){
        var ws;
        //检测浏览器是否支持webSocket
        if("WebSocket" in window){
            $("#content").html("您的浏览器支持webSocket!");
            //模拟产生clientID
            let clientID = Math.ceil(Math.random()*100);
            //创建 WebSocket 对象,注意请求路径!!!!
            ws = new WebSocket("ws://127.0.0.1:8081/testWebSocket/"+clientID);
            //与服务端建立连接时触发
         ws.onopen = function(){
                   $("#content").append("<p>与服务端建立连接建立成功!您的客户端ID="+clientID+"</p>");
                //模拟发送数据到服务器
                ws.send("你好服务端!我是客户端 "+clientID);
               }
         //接收到服务端消息时触发
            ws.onmessage = function (evt) {
                   let received_msg = evt.data;
                $("#content").append("<p>接收到服务端消息:"+received_msg+"</p>");
         };
            //服务端关闭连接时触发
            ws.onclose = function() {
                   console.error("连接已经关闭.....")
            };
     }else{
            $("#content").html("您的浏览器不支持webSocket!");
        }
    })
</script>
</html>
6. 打开三个窗口,分别用postman来进行私发和群发的测试.

websoket在线测试

http://www.easyswoole.com/wstool.html

稍微修改一下

/**
     * 接收到客户端消息时触发
  */
@OnMessage
public void onMessage(String message, @PathParam("id") String id) {
try {
 //此处增加调用所有客户端的方法
 sendAllMessage(message);
} catch (Exception e) {
 e.printStackTrace();
}
log.info("receive a message from client id={},msg={}", id, message);
}

基于Redis发布订阅的分布式WebSocket通信

不同的server是拥有不同的session,不同的server的session是不共享的.session发送方法不能发送其他server的.

webSocketProcess.session.getBasicRemote().sendText(msg);

解决方案流程图:

8081 -> 群发了一个消息,希望在其他服务器上正在连接的客户端也能看到这个消息.

当8081某个用户群发的消息放入到redis的订阅队列中[通道]

四种web通讯方式

<button type="button" onclick="test()">get请求</button>
<script>
  function test(){
    //原生的ajax代码.
    //1. 创建XMLHttpRequest对象
    let xmlHttpRequest = new XMLHttpRequest();
    //设置连接信息
    xmlHttpRequest.open("GET", "/hello/100",true);
    //发送
    xmlHttpRequest.send();
    //回调函数 - readyState发生改变的时候就会调用.
    //0 1 2 3 4
    xmlHttpRequest.onreadystatechange=function(){
      if(xmlHttpRequest.readyState == 4 && xmlHttpRequest.status == "200"){
        //处理后端返回过来的json字符串
        let result = xmlHttpRequest.responseText;
        //将json字符串转成json对象
        let data = JSON.parse(result);
        console.log(data);
        console.log(data.code);
      }
    }
  }
</script>
  1. 短轮询

http端轮询是服务器收到请求不管是否有数据都直接响应 http 请求;其实就是普通的轮询。指在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。

传统的web通信模式。后台处理数据,需要一定时间,前端想要知道后端的处理结果,就要不定时的向后端发出请求以获得最新情况。

  • 请求中有大半是无用,难于维护,浪费带宽和服务器资源;
  • 响应的结果没有顺序(因为是异步请求,当发送的请求没有返回结果的时候,后面的请求又被发送。而此时如果后面的请求比前面的请 求要先返回结果,那么当前面的请求返回结果数据时已经是过时无效的数据了)。
var xhr = new XMLHttpRequest();
  setInterval(function(){
    xhr.open('GET','/user');
    xhr.onreadystatechange = function(){
    };
    xhr.send();
  },1000)
  1. 长轮询首先由客户端向服务器发起请求,当服务器收到客户端发来的请求后,服务器端不会直接进行响应,而是先将 这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制才返回。 客户端 JavaScript 响应处理函数会在处理完服务器返回的信息后,再次发出请求,重新建立连接。长轮询和短轮询比起来,它的 优点是明显减少了很多不必要的 http 请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。
  2. 短连接在HTTP 0.9和HTTP 1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。每次Http请求都会建立Tcp连接
  3. 长连接HTTP1.0已经开始支持了.但是默认还是短连接.HTTP1.1默认支持长连接只需要建立一次Tcp连接,以后Http请求重复使用同一个Tcp连接
  4. websocketHTTP协议中通信只能由客户端发起,服务器不能主动向客户端推送消息当建立起WebSocket的长连接,双方都可以互相发送信息,服务端可以主动发起信息。WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建长连接,并进行双向数据传输
function ajax(){
var xhr = new XMLHttpRequest();
xhr.open('GET','/user');
xhr.onreadystatechange = function(){
 ajax();
};
xhr.send();
}
ajax();

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
|
5月前
|
存储 运维 NoSQL
Redis Cluster集群模式部署
Redis Cluster集群模式部署
121 4
|
5月前
|
缓存 运维 NoSQL
Redis主从模式部署
Redis主从模式部署
65 4
|
5月前
|
运维 监控 NoSQL
Redis Sentinel哨兵模式部署
Redis Sentinel哨兵模式部署
111 2
|
12天前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis消息的生产者消费者模式
消息队列在Redis中可通过List数据结构实现,支持发布者订阅者和生产者消费者两种模式。生产者通过`lpush`向List添加消息,消费者通过`rpop`或`brpop`消费消息,后者支持阻塞等待。示例代码展示了如何使用Redis的生产者消费者模式。
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
63 4
|
5月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
72 0
|
1月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
25 2
|
1月前
|
存储 缓存 NoSQL
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
59 1
|
4月前
|
存储 NoSQL 算法
Redis 集群模式搭建
Redis 集群模式搭建
94 5
下一篇
无影云桌面