Redis缓存最佳实践指南
一、缓存更新策略
1. Cache Aside Pattern(旁路缓存)
这是最常用的缓存策略,适合读多写少的场景。
class UserService:
def get_user(self, user_id):
# 1. 先查询缓存
user = redis_client.get(f"user:{user_id}")
if user:
return json.loads(user)
# 2. 缓存未命中,查询数据库
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
# 3. 写入缓存
if user:
redis_client.setex(
f"user:{user_id}",
3600, # 过期时间1小时
json.dumps(user)
)
return user
def update_user(self, user):
# 1. 更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(user.name, user.id)
)
# 2. 删除缓存
redis_client.delete(f"user:{user.id}")
2. Write Through(读写穿透)
适合写多的场景,确保缓存与数据库的一致性。
def write_through_update(user):
# 1. 更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(user.name, user.id)
)
# 2. 更新缓存
redis_client.setex(
f"user:{user.id}",
3600,
json.dumps(user)
)
二、缓存击穿防护
1. 分布式锁实现
class RedisLock:
def __init__(self, key, expire=60):
self.key = f"lock:{key}"
self.expire = expire
def acquire(self):
# SET NX = set if not exists
return redis_client.set(
self.key,
'locked',
ex=self.expire,
nx=True
)
def release(self):
redis_client.delete(self.key)
def get_hot_data(key):
# 1. 查询缓存
data = redis_client.get(key)
if
return json.loads(data)
# 2. 缓存未命中,使用分布式锁防止击穿
lock = RedisLock(key)
if lock.acquire():
try:
# 双重检查
data = redis_client.get(key)
if
return json.loads(data)
# 查询数据库
data = db.query_hot_data(key)
# 写入缓存
redis_client.setex(key, 3600, json.dumps(data))
return data
finally:
lock.release()
else:
# 等待100ms后重试
time.sleep(0.1)
return get_hot_data(key)
三、大key处理方案
1. 大key识别
def scan_big_keys(pattern="*", min_size=1024*1024):
"""扫描大于指定大小的key"""
cursor = 0
big_keys = []
while True:
cursor, keys = redis_client.scan(
cursor,
match=pattern,
count=1000
)
for key in keys:
key_type = redis_client.type(key)
key_size = get_key_size(key, key_type)
if key_size > min_size:
big_keys.append({
'key': key,
'type': key_type,
'size': key_size
})
if cursor == 0:
break
return big_keys
def get_key_size(key, key_type):
if key_type == 'string':
return len(redis_client.get(key))
elif key_type == 'hash':
return len(redis_client.hgetall(key))
elif key_type == 'list':
return redis_client.llen(key)
elif key_type == 'set':
return redis_client.scard(key)
elif key_type == 'zset':
return redis_client.zcard(key)
2. 大key拆分
class UserFavorites:
def __init__(self, user_id):
self.base_key = f"user:{user_id}:favorites"
def add(self, item_id):
# 使用取模方式拆分大key
shard = item_id % 10
key = f"{self.base_key}:{shard}"
return redis_client.sadd(key, item_id)
def remove(self, item_id):
shard = item_id % 10
key = f"{self.base_key}:{shard}"
return redis_client.srem(key, item_id)
def is_favorite(self, item_id):
shard = item_id % 10
key = f"{self.base_key}:{shard}"
return redis_client.sismember(key, item_id)
def get_all(self):
# 合并所有分片的结果
result = set()
for i in range(10):
key = f"{self.base_key}:{i}"
result.update(redis_client.smembers(key))
return result
四、性能优化建议
批量操作:使用pipeline减少网络往返
def batch_get_users(user_ids): pipe = redis_client.pipeline() # 批量获取 for user_id in user_ids: pipe.get(f"user:{user_id}") # 一次性执行 results = pipe.execute() # 处理结果 users = [] for result in results: if result: users.append(json.loads(result)) return users
序列化优化:对于频繁访问的数据,考虑使用protobuf等二进制序列化
内存优化:合理设置过期时间,避免内存泄漏
def cache_with_variable_ttl(key, value): """根据数据热度设置不同的过期时间""" access_count = redis_client.incr(f"access:{key}") if access_count > 1000: ttl = 7200 # 热点数据2小时过期 else: ttl = 3600 # 普通数据1小时过期 redis_client.setex(key, ttl, value)
通过以上实践指南,您应该能更好地在项目中使用Redis缓存。记住:缓存设计没有银弹,需要根据具体场景选择合适的策略。