JedisCluster 通过 Pipeline 实现两套数据轮换更新

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文实现了通过定时任务来调用接口,使两套数据轮换更新。因为要区分两套数据,所以 key 要设置前缀。例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。

 其他系列文章导航

Java基础合集

数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀

3.4 更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

    1. 从数据库里查数据。
    2. 更新当前前缀。
    3. 往redis集群更新数据。

    1.2 流程代码解释

    @Override
        public R<String> updateCampToJedis() {
            R<String> r = new R<>();
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
            String currentMonth = dateFormat.format(new Date());
            //1. 从数据库里查数据
            List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
            if (UserWideInfoList.size() == 0) {
                r.setCode(R.ERROR_CODE);
                r.setMsg("没有数据");
                return r;
            }
            //2. 更新当前前缀
            updateCurrentPrefixIndex();
            r.setCode(R.SUCCESS_CODE);
            //3. 往redis集群存入数据
            insertToJedis(ZhmsUserWideInfoList);
            return r;
        }

    image.gif


    二、从数据库里查数据

    2.1 SQL语句

    这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
            SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
        </select>

    image.gif


    三、更新当前前缀

    要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

    3.1 设置前缀常量

    用 A 和 B 来区分两组 key 。

    代码如下:

    private static final String PREFIX_A = "A";
        private static final String PREFIX_B = "B";

    image.gif

    3.2 初始化 currentPrefixIndex

    向 redis集群中存入初始的 currentPrefixIndex 。

    代码如下:

    @GetMapping("/init")
        public String init() {
            return jedisCluster.set("currentPrefixIndex", "0");
        }

    image.gif

    3.3 获取当日前缀

    先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

    代码如下:

    //获取当日前缀
        private String getKeyPrefix() {
            int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
            if (currentPrefixIndex % 2 == 0) {
                return PREFIX_A;
            } else {
                return PREFIX_B;
            }
        }

    image.gif

    3.4 更新 currentPrefixIndex

    每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

    代码如下:

    // 重新设置currentPrefixIndex
        private void updateCurrentPrefixIndex() {
            String currentValue = jedisCluster.get("currentPrefixIndex");
            int newValue = Integer.parseInt(currentValue) + 1;
            jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
        }

    image.gif


    四、往redis集群更新数据

    这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

    4.1 大致流程

      1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
      2. 把新数据解析后更新到 redis 集群。

      注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

      代码如下:

      private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
              String keyPrefix = getKeyPrefix();
              List<String> keys = new ArrayList<>();
              Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
              for (JedisPool node : clusterNodes.values()) {
                  try (Jedis jedis = node.getResource()) {
                      Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                      keys.addAll(nodeKeys);
                  }
              }
              Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
              //先删旧的
              for (JedisPool jedisPool : delKey.keySet()) {
                  try (Jedis jedis = jedisPool.getResource()){
                      Pipeline pipelined = jedis.pipelined();
                      List<String> keysList = delKey.get(jedisPool);
                      for (String key : keysList) {
                          pipelined.del(key);
                      }
                      pipelined.sync();
                  }
              }
              List<String> keyList =new ArrayList<>();
              HashMap<String, String> map = new HashMap<>();
              //填充keyList和value
              for (UserWideInfo UserWideInfo : UserWideInfoList) {
                  String key = keyPrefix + "_" + UserWideInfo.getBillNo();
                  keyList.add(key);
                  //构建value
                  ...
                  ...
                  map.put(key, value);
              }
              Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
              for (JedisPool jedisPool : result.keySet()) {
                  try (Jedis jedis = jedisPool.getResource()){
                      Pipeline pipelined = jedis.pipelined();
                      // 获取当前JedisPool对应的键列表
                      List<String> keysList = result.get(jedisPool); 
                      // 将命令添加到Pipeline中
                      for (String key : keysList) {
                          String value = map.get(key);
                          pipelined.set(key, value);
                      }
                      // 执行Pipeline中的所有命令
                      pipelined.sync();
                  }
              }
          }

      image.gif


      五、JedisCluster 实现 Pipeline 操作

      image.gif编辑

      5.1 实现过程

      因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

      代码如下:

      @Slf4j
      public class JedisPipelineUtil {
          /**
           * jedis集群下使用pipeline之前先将key分配管道
           * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
           *
           * @param list         存redis的key
           * @param jedisCluster
           * @return
           */
          public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
              Map<String, List<String>> hostPhoneMap = new HashMap<>();
              Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
              Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
              JedisPool jedisPool = next.getValue();
              Jedis jedis = jedisPool.getResource();
              Map<Integer, String> slots = discoverClusterSlots(jedis);
              for (String s : list) {
                  String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
                  if (hostPhoneMap.containsKey(hostAndPort)) {
                      hostPhoneMap.get(hostAndPort).add(s);
                  } else {
                      List<String> newList = new ArrayList<>();
                      newList.add(s);
                      hostPhoneMap.put(hostAndPort, newList);
                  }
              }
              jedis.close();
              return hostPhoneMap;
          }
          /**
           * jedis集群下使用pipeline之前先将key分配管道
           * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
           *
           * @param list         存redis的key
           * @param jedisCluster
           * @return
           */
          public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
              Map<JedisPool, List<String>> map = new HashMap<>();
              Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
              Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
              while (iterator.hasNext()) {
                  Map.Entry<String, List<String>> next = iterator.next();
                  JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
                  map.put(jedisPool, next.getValue());
              }
              return map;
          }
          private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
              Map<Integer, String> slotsMap = new HashMap<>();
              List<Object> slots = jedis.clusterSlots();
              Iterator var3 = slots.iterator();
              while (var3.hasNext()) {
                  Object slotInfoObj = var3.next();
                  List<Object> slotInfo = (List) slotInfoObj;
                  if (slotInfo.size() > 2) {
                      List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                      List<Object> hostInfos = (List) slotInfo.get(2);
                      if (!hostInfos.isEmpty()) {
                          String targetNode = generateHostAndPort(hostInfos);
                          Iterator<Integer> var4 = slotNums.iterator();
                          while (var4.hasNext()) {
                              Integer slot = var4.next();
                              slotsMap.put(slot, targetNode);
                          }
                      }
                  }
              }
              return slotsMap;
          }
          private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
              List<Integer> slotNums = new ArrayList<>();
              for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
                  slotNums.add(slot);
              }
              return slotNums;
          }
          private static String generateHostAndPort(List<Object> hostInfos) {
              String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
              int port = ((Long) hostInfos.get(1)).intValue();
              return host + ":" + port;
          }
      }

      image.gif

      使用 assignKey 方法就可以分配管道。

      相关实践学习
      基于Redis实现在线游戏积分排行榜
      本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
      云数据库 Redis 版使用教程
      云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
      目录
      相关文章
      |
      XML JSON Go
      etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
      在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
      94 0
      |
      NoSQL API 调度
      .NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
      .NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
      192 0
      |
      6月前
      |
      NoSQL 关系型数据库 MySQL
      实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
      实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
      |
      NoSQL Redis Anolis
      性能优化特性之:Redis批处理pipeline模式
      本文介绍了一种更贴近实际使用的redis验测方法:多pipline模式,并从原理、使用方法进行详细阐述。
      |
      缓存 NoSQL Redis
      Redis 如何批量设置过期时间?PIPLINE的使用
      不要说在foreach中通过set()函数批量设置过期时间 我们引入redis的PIPLINE,来解决批量设置过期时间的问题。
      791 0
      Redis 如何批量设置过期时间?PIPLINE的使用
      |
      消息中间件 存储 算法
      Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
      Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
      |
      数据处理 Go
      让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式
      让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式
      |
      消息中间件 JavaScript 小程序
      慎用BeanUtils,性能真的拉跨!
      慎用BeanUtils,性能真的拉跨!
      |
      存储 NoSQL Java
      【JavaP6大纲】分布式会话篇:集群部署时的分布式 Session 如何实现?
      【JavaP6大纲】分布式会话篇:集群部署时的分布式 Session 如何实现?
      104 0
      |
      机器学习/深度学习 存储 NoSQL
      Redis keys命令,生产环境慎用,最好屏蔽掉
      Redis keys命令,生产环境慎用,最好屏蔽掉
      605 0
      Redis keys命令,生产环境慎用,最好屏蔽掉