测试区Codis集群之前废了,今天重新搭起来之后,测试程序发现很多key的数据都没了。
如果整个库迁移过来没有必要。 只需要把特定的几个key迁移过来即可。
hash、list string 类型key 迁移
以下为Python版本实现
#!/usr/bin/env python # -*- coding: UTF-8 -*- ''' @Project :redis_key_mig @File :redisMigrate.py @Author :ninesun @Date :2021/11/29 14:22 @Desc: ''' import sys import redis # 迁移hash def moveHash(cursor): cursor, data = r.hscan(key, cursor) for eachKey in data: rNew.hset(key, eachKey, data[eachKey]) print(key, "---处理了---", len(data), '个') if cursor != 0: print(cursor, "批处理") moveHash(cursor) else: print(cursor, "处理完成了") # 迁移list def moveList(): length = r.llen(key) if length == 0: print(key, "---list迁移结束---剩余长度", length) else: # 每次迁移一千个 start = length - 1000; if start < 0: start = 0 data = r.lrange(key, start, -1) pl = r.pipeline(); for eachI in data: setAdd = r.sadd("ordernokey_move", eachI); if setAdd == 1: pl.rpush("aaaaaaa", eachI) else: print("迁移的key的值重复了", eachI) pl.execute() if start == 0: # 清空 r.ltrim(key, 1, 0) r.ltrim(key, 0, start - 1) moveList() ############################ if __name__ == '__main__': # key = sys.argv[1] # opeID 迁移 # key = 'ope_array' # key = 'ope_cf' # key = 'ope_oc' # # key = 'prod_array' # key = 'prod_cf' # key = 'prod_oc' key = 'basicOwn' print('输入的key是:' + key) # ip = '47.254.149.109' # password = 'Kikuu2018' ip1 = '10.50...' ip2 = '10.50...' # 连接redis r = redis.Redis(host=ip1, port=19000, db=0, decode_responses=True) # 连接redis 带接收的库 rNew = redis.Redis(host=ip2, port=19000, db=0, decode_responses=True) keyType = r.type(key) if keyType == 'string': rNew.set(key, r.get(key)) print("key=" + key + "迁移到新库") if keyType == 'hash': cursor = r.hlen(key) print(" key值长度是 + ", cursor) moveHash(0) if keyType == 'list': moveList()
批量迁移某个soted set 类型的key
由于业务需要,需要迁移集群中的几个key,数量不大,不过key需要自己拼接。
其中一个key的类型是string
另外一个key的类型是zset。
py代码如下
# 迁移soted set def moveZset(keyStr): key = keyStr.lstrip() length = r.zcard(key) if length == 0: get_logger().info("%s ---list迁移结束---剩余长度: %s", key, length) return # print(key, "---list迁移结束---剩余长度", length) else: # 每次迁移一千个 start = length - 1000; if start < 0: start = 0 data = r.zrange(key, 0, -1, withscores='true') pl = r.pipeline(); count = 1 for eachI in data: length = length - 1 score = str(eachI[1]) value = eachI[0] # ch=true 仅仅是update 了score 才会,原封不动的zadd直接返回0 zaddRet = rNew.zadd(key, {value: score}, ch='true'); scoreRet = r.zcard(key) scoreRetNew = rNew.zcard(key) # rNew.delete(key) # if scoreRet != scoreRetNew: # get_logger().error("数据不一致,key: %s,scoreRet: %s, scoreRetNew: %s",key,scoreRet,scoreRetNew) if zaddRet == 1: pl.zadd(key, {value: score}) count = count + 1; else: count = count + 1; # get_logger().info("迁移的key的值重复了,第: %s个key: %s,score: %s",count,key,scoreRet) # print("迁移的key的值重复了,key: ", key) get_logger().info("---剩余长度: %s", length);
normally the return value of ZADD
only counts the number of new elements added. zadd不会返回已存在的元素
更新score也意味着在有序集合中的位置变化。
zadd 中的score范围是多少呢?
between -9007199254740992 and 9007199254740992
踩的坑
错误的姿势
正确的姿势
zadd:db.zadd(REDIS_KEY, {member:score})
竟让是member在前score在后。和redis的操作刚好相反.