其他系列文章导航
文章目录
前言
本文实现了通过定时任务来调用接口,使两套数据轮换更新。
因为要区分两套数据,所以 key 要设置前缀。
例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。
一、整体流程
1.1 大致流程
- 从数据库里查数据。
- 更新当前前缀。
- 往redis集群更新数据。
1.2 流程代码解释
@Override public R<String> updateCampToJedis() { R<String> r = new R<>(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM"); String currentMonth = dateFormat.format(new Date()); //1. 从数据库里查数据 List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth); if (UserWideInfoList.size() == 0) { r.setCode(R.ERROR_CODE); r.setMsg("没有数据"); return r; } //2. 更新当前前缀 updateCurrentPrefixIndex(); r.setCode(R.SUCCESS_CODE); //3. 往redis集群存入数据 insertToJedis(ZhmsUserWideInfoList); return r; }
二、从数据库里查数据
2.1 SQL语句
这里因为每个月查询的是不同月份的表,所有用到动态 sql 。
<select id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo"> SELECT * FROM USER_WIDE_INFO_M_${SysMonth} </select>
三、更新当前前缀
要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。
3.1 设置前缀常量
用 A 和 B 来区分两组 key 。
代码如下:
private static final String PREFIX_A = "A"; private static final String PREFIX_B = "B";
3.2 初始化 currentPrefixIndex
向 redis集群中存入初始的 currentPrefixIndex 。
代码如下:
@GetMapping("/init") public String init() { return jedisCluster.set("currentPrefixIndex", "0"); }
3.3 获取当日前缀
先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。
代码如下:
//获取当日前缀 private String getKeyPrefix() { int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex")); if (currentPrefixIndex % 2 == 0) { return PREFIX_A; } else { return PREFIX_B; } }
3.4 更新 currentPrefixIndex
每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。
代码如下:
// 重新设置currentPrefixIndex private void updateCurrentPrefixIndex() { String currentValue = jedisCluster.get("currentPrefixIndex"); int newValue = Integer.parseInt(currentValue) + 1; jedisCluster.set("currentPrefixIndex", String.valueOf(newValue)); }
四、往redis集群更新数据
这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。
4.1 大致流程
- 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
- 把新数据解析后更新到 redis 集群。
注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。
代码如下:
private void insertToJedis(List<UserWideInfo> UserWideInfoList) { String keyPrefix = getKeyPrefix(); List<String> keys = new ArrayList<>(); Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes(); for (JedisPool node : clusterNodes.values()) { try (Jedis jedis = node.getResource()) { Set<String> nodeKeys = jedis.keys(keyPrefix + "*"); keys.addAll(nodeKeys); } } Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster); //先删旧的 for (JedisPool jedisPool : delKey.keySet()) { try (Jedis jedis = jedisPool.getResource()){ Pipeline pipelined = jedis.pipelined(); List<String> keysList = delKey.get(jedisPool); for (String key : keysList) { pipelined.del(key); } pipelined.sync(); } } List<String> keyList =new ArrayList<>(); HashMap<String, String> map = new HashMap<>(); //填充keyList和value for (UserWideInfo UserWideInfo : UserWideInfoList) { String key = keyPrefix + "_" + UserWideInfo.getBillNo(); keyList.add(key); //构建value ... ... map.put(key, value); } Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster); for (JedisPool jedisPool : result.keySet()) { try (Jedis jedis = jedisPool.getResource()){ Pipeline pipelined = jedis.pipelined(); // 获取当前JedisPool对应的键列表 List<String> keysList = result.get(jedisPool); // 将命令添加到Pipeline中 for (String key : keysList) { String value = map.get(key); pipelined.set(key, value); } // 执行Pipeline中的所有命令 pipelined.sync(); } } }
五、JedisCluster 实现 Pipeline 操作
编辑
5.1 实现过程
因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。
代码如下:
@Slf4j public class JedisPipelineUtil { /** * jedis集群下使用pipeline之前先将key分配管道 * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key * * @param list 存redis的key * @param jedisCluster * @return */ public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) { Map<String, List<String>> hostPhoneMap = new HashMap<>(); Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes(); Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next(); JedisPool jedisPool = next.getValue(); Jedis jedis = jedisPool.getResource(); Map<Integer, String> slots = discoverClusterSlots(jedis); for (String s : list) { String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s)); if (hostPhoneMap.containsKey(hostAndPort)) { hostPhoneMap.get(hostAndPort).add(s); } else { List<String> newList = new ArrayList<>(); newList.add(s); hostPhoneMap.put(hostAndPort, newList); } } jedis.close(); return hostPhoneMap; } /** * jedis集群下使用pipeline之前先将key分配管道 * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key * * @param list 存redis的key * @param jedisCluster * @return */ public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) { Map<JedisPool, List<String>> map = new HashMap<>(); Map<String, List<String>> var1 = assignSlot(list, jedisCluster); Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, List<String>> next = iterator.next(); JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey()); map.put(jedisPool, next.getValue()); } return map; } private static Map<Integer, String> discoverClusterSlots(Jedis jedis) { Map<Integer, String> slotsMap = new HashMap<>(); List<Object> slots = jedis.clusterSlots(); Iterator var3 = slots.iterator(); while (var3.hasNext()) { Object slotInfoObj = var3.next(); List<Object> slotInfo = (List) slotInfoObj; if (slotInfo.size() > 2) { List<Integer> slotNums = getAssignedSlotArray(slotInfo); List<Object> hostInfos = (List) slotInfo.get(2); if (!hostInfos.isEmpty()) { String targetNode = generateHostAndPort(hostInfos); Iterator<Integer> var4 = slotNums.iterator(); while (var4.hasNext()) { Integer slot = var4.next(); slotsMap.put(slot, targetNode); } } } } return slotsMap; } private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) { slotNums.add(slot); } return slotNums; } private static String generateHostAndPort(List<Object> hostInfos) { String host = SafeEncoder.encode((byte[]) hostInfos.get(0)); int port = ((Long) hostInfos.get(1)).intValue(); return host + ":" + port; } }
使用 assignKey 方法就可以分配管道。