JedisCluster 通过 Pipeline 实现两套数据轮换更新

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 本文实现了通过定时任务来调用接口,使两套数据轮换更新。因为要区分两套数据,所以 key 要设置前缀。例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。

 其他系列文章导航

Java基础合集

数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀

3.4 更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

    1. 从数据库里查数据。
    2. 更新当前前缀。
    3. 往redis集群更新数据。

    1.2 流程代码解释

    @Override
        public R<String> updateCampToJedis() {
            R<String> r = new R<>();
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
            String currentMonth = dateFormat.format(new Date());
            //1. 从数据库里查数据
            List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
            if (UserWideInfoList.size() == 0) {
                r.setCode(R.ERROR_CODE);
                r.setMsg("没有数据");
                return r;
            }
            //2. 更新当前前缀
            updateCurrentPrefixIndex();
            r.setCode(R.SUCCESS_CODE);
            //3. 往redis集群存入数据
            insertToJedis(ZhmsUserWideInfoList);
            return r;
        }

    image.gif


    二、从数据库里查数据

    2.1 SQL语句

    这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
            SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
        </select>

    image.gif


    三、更新当前前缀

    要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

    3.1 设置前缀常量

    用 A 和 B 来区分两组 key 。

    代码如下:

    private static final String PREFIX_A = "A";
        private static final String PREFIX_B = "B";

    image.gif

    3.2 初始化 currentPrefixIndex

    向 redis集群中存入初始的 currentPrefixIndex 。

    代码如下:

    @GetMapping("/init")
        public String init() {
            return jedisCluster.set("currentPrefixIndex", "0");
        }

    image.gif

    3.3 获取当日前缀

    先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

    代码如下:

    //获取当日前缀
        private String getKeyPrefix() {
            int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
            if (currentPrefixIndex % 2 == 0) {
                return PREFIX_A;
            } else {
                return PREFIX_B;
            }
        }

    image.gif

    3.4 更新 currentPrefixIndex

    每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

    代码如下:

    // 重新设置currentPrefixIndex
        private void updateCurrentPrefixIndex() {
            String currentValue = jedisCluster.get("currentPrefixIndex");
            int newValue = Integer.parseInt(currentValue) + 1;
            jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
        }

    image.gif


    四、往redis集群更新数据

    这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

    4.1 大致流程

      1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
      2. 把新数据解析后更新到 redis 集群。

      注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

      代码如下:

      private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
              String keyPrefix = getKeyPrefix();
              List<String> keys = new ArrayList<>();
              Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
              for (JedisPool node : clusterNodes.values()) {
                  try (Jedis jedis = node.getResource()) {
                      Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                      keys.addAll(nodeKeys);
                  }
              }
              Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
              //先删旧的
              for (JedisPool jedisPool : delKey.keySet()) {
                  try (Jedis jedis = jedisPool.getResource()){
                      Pipeline pipelined = jedis.pipelined();
                      List<String> keysList = delKey.get(jedisPool);
                      for (String key : keysList) {
                          pipelined.del(key);
                      }
                      pipelined.sync();
                  }
              }
              List<String> keyList =new ArrayList<>();
              HashMap<String, String> map = new HashMap<>();
              //填充keyList和value
              for (UserWideInfo UserWideInfo : UserWideInfoList) {
                  String key = keyPrefix + "_" + UserWideInfo.getBillNo();
                  keyList.add(key);
                  //构建value
                  ...
                  ...
                  map.put(key, value);
              }
              Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
              for (JedisPool jedisPool : result.keySet()) {
                  try (Jedis jedis = jedisPool.getResource()){
                      Pipeline pipelined = jedis.pipelined();
                      // 获取当前JedisPool对应的键列表
                      List<String> keysList = result.get(jedisPool); 
                      // 将命令添加到Pipeline中
                      for (String key : keysList) {
                          String value = map.get(key);
                          pipelined.set(key, value);
                      }
                      // 执行Pipeline中的所有命令
                      pipelined.sync();
                  }
              }
          }

      image.gif


      五、JedisCluster 实现 Pipeline 操作

      image.gif编辑

      5.1 实现过程

      因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

      代码如下:

      @Slf4j
      public class JedisPipelineUtil {
          /**
           * jedis集群下使用pipeline之前先将key分配管道
           * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
           *
           * @param list         存redis的key
           * @param jedisCluster
           * @return
           */
          public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
              Map<String, List<String>> hostPhoneMap = new HashMap<>();
              Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
              Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
              JedisPool jedisPool = next.getValue();
              Jedis jedis = jedisPool.getResource();
              Map<Integer, String> slots = discoverClusterSlots(jedis);
              for (String s : list) {
                  String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
                  if (hostPhoneMap.containsKey(hostAndPort)) {
                      hostPhoneMap.get(hostAndPort).add(s);
                  } else {
                      List<String> newList = new ArrayList<>();
                      newList.add(s);
                      hostPhoneMap.put(hostAndPort, newList);
                  }
              }
              jedis.close();
              return hostPhoneMap;
          }
          /**
           * jedis集群下使用pipeline之前先将key分配管道
           * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
           *
           * @param list         存redis的key
           * @param jedisCluster
           * @return
           */
          public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
              Map<JedisPool, List<String>> map = new HashMap<>();
              Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
              Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
              while (iterator.hasNext()) {
                  Map.Entry<String, List<String>> next = iterator.next();
                  JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
                  map.put(jedisPool, next.getValue());
              }
              return map;
          }
          private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
              Map<Integer, String> slotsMap = new HashMap<>();
              List<Object> slots = jedis.clusterSlots();
              Iterator var3 = slots.iterator();
              while (var3.hasNext()) {
                  Object slotInfoObj = var3.next();
                  List<Object> slotInfo = (List) slotInfoObj;
                  if (slotInfo.size() > 2) {
                      List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                      List<Object> hostInfos = (List) slotInfo.get(2);
                      if (!hostInfos.isEmpty()) {
                          String targetNode = generateHostAndPort(hostInfos);
                          Iterator<Integer> var4 = slotNums.iterator();
                          while (var4.hasNext()) {
                              Integer slot = var4.next();
                              slotsMap.put(slot, targetNode);
                          }
                      }
                  }
              }
              return slotsMap;
          }
          private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
              List<Integer> slotNums = new ArrayList<>();
              for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
                  slotNums.add(slot);
              }
              return slotNums;
          }
          private static String generateHostAndPort(List<Object> hostInfos) {
              String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
              int port = ((Long) hostInfos.get(1)).intValue();
              return host + ":" + port;
          }
      }

      image.gif

      使用 assignKey 方法就可以分配管道。

      相关实践学习
      基于Redis实现在线游戏积分排行榜
      本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
      云数据库 Redis 版使用教程
      云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
      目录
      相关文章
      |
      4月前
      |
      缓存 Go API
      Go 实现一个支持多种过期、淘汰机制的本地缓存的核心原理
      本文旨在探讨实现一个支持多种 过期、淘汰 机制的 go 本地缓存的核心原理,我将重点讲解如何支持多样化的过期和淘汰策略。
      76 0
      |
      7月前
      |
      XML JSON Go
      etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
      在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
      47 0
      |
      8月前
      |
      NoSQL API 调度
      .NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
      .NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
      108 0
      |
      2月前
      |
      存储 API 流计算
      要测试和区分Flink的每个key状态和每个并行度的状态
      【2月更文挑战第23天】要测试和区分Flink的每个key状态和每个并行度的状态
      14 1
      |
      7月前
      |
      消息中间件 存储 算法
      Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
      Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
      |
      10月前
      |
      算法
      Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
      Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
      93 0
      |
      缓存 NoSQL Redis
      Redis 如何批量设置过期时间?PIPLINE的使用
      不要说在foreach中通过set()函数批量设置过期时间 我们引入redis的PIPLINE,来解决批量设置过期时间的问题。
      696 0
      Redis 如何批量设置过期时间?PIPLINE的使用
      |
      12月前
      |
      存储 NoSQL Java
      【JavaP6大纲】分布式会话篇:集群部署时的分布式 Session 如何实现?
      【JavaP6大纲】分布式会话篇:集群部署时的分布式 Session 如何实现?
      |
      jenkins Java 程序员
      远程触发Jenkins的Pipeline任务的并发问题处理
      上一篇实战了通过Http请求远程触发指定的Jenkins任务,并且将参数传递给Jenkins任务的Pipeline脚本使用,文末提到有个并发问题留待本文来处理
      237 0
      远程触发Jenkins的Pipeline任务的并发问题处理
      |
      存储 负载均衡 Java
      多节点服务器定时任务重复处理的问题
      多节点服务器定时任务重复处理的问题
      302 0