上个月帮一个创业团队排查线上事故,他们的电商活动页在大促高峰期整整卡了十分钟。监控显示数据库连接数直接打满,慢查询堆积了上千条。创始人盯着屏幕问我:“不就是展示一下商品详情吗,怎么就把数据库干崩了?”
我看了一眼代码,每次请求都直连MySQL查商品信息,热门商品被上千人同时刷,数据库扛得住才怪。其实这个问题有个标准解法——缓存。
Redis作为业界主流的内存数据库,配合Python的redis-py库,能在不改变业务代码结构的前提下,把数据库的查询压力降低90%以上。今天我们就从零开始,聊聊怎么用Python操作Redis,搭一套真正能打的缓存系统。
基础篇:先让Redis跑起来
安装redis-py只需要一行命令:
pip install redis
连接Redis的代码也极其简单:
import redis
连接本地Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
测试连接
r.set('foo', 'bar')
print(r.get('foo')) # 输出: bar
这里有个小细节:decode_responses=True会让返回的结果自动从字节串转成字符串,省去手动decode的麻烦。如果是生产环境,建议用连接池管理连接,避免频繁创建销毁消耗资源:
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)
实战篇:缓存最简单的写法
最常见的缓存场景是数据库查询。一个用户信息服务,如果不加缓存,代码长这样:
def get_user(user_id):
# 直接查数据库
return db.query(f"SELECT * FROM users WHERE id={user_id}")
加一层Redis缓存,代码变成这样:
def get_user(user_id):
# 先查缓存
cache_key = f"user:{user_id}"
user = r.get(cache_key)
if user:
return json.loads(user) # 缓存命中,直接返回
# 缓存未命中,查数据库
user = db.query(f"SELECT * FROM users WHERE id={user_id}")
# 写入缓存,设置过期时间
r.setex(cache_key, 3600, json.dumps(user))
return user
这个模式叫Cache-Aside,是业界最通用的缓存策略。流程很简单:读的时候先读缓存,没有就查数据库然后回写;写的时候先更新数据库,然后删除缓存(或者更新缓存)。
这里有两个关键点。一是缓存要有过期时间。上面的setex设置了3600秒,避免缓存项永远驻留导致数据不一致。二是key的命名规范。用user:1001这样的格式,冒号分隔不同部分,在Redis里会自动按层级展示,调试时一目了然。
进阶篇:用装饰器把缓存写成一行
上面的写法已经能解决问题,但还是不够优雅。每次都要手写缓存key、手动序列化、手动处理异常,重复代码太多。Python的装饰器可以把这些脏活累活封装起来。
一个最简版缓存装饰器可以这么写:
from functools import wraps
import json
def redis_cache(ttl=300):
def decorator(func):
@wraps(func)
def wrapper(args, *kwargs):
# 生成缓存key:函数名 + 参数
key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached = r.get(key)
if cached:
return json.loads(cached)
# 执行原函数
result = func(*args, **kwargs)
# 写入缓存
r.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
使用
@redis_cache(ttl=600)
def get_user(user_id):
return db.query(f"SELECT * FROM users WHERE id={user_id}")
这样一来,业务代码完全不需要关心缓存逻辑,一个装饰器搞定。实际项目中可以用更成熟的库,比如redis_func_cache,它支持LRU、LFU等多种淘汰策略,还封装好了Lua脚本保证原子性。
坑点篇:缓存穿透、击穿、雪崩怎么破
缓存用不好,有时候比不用还糟糕。三个经典问题值得留意。
缓存穿透指查询一个根本不存在的数据。每次请求都绕过缓存直击数据库,如果被恶意利用,数据库分分钟被打挂。解决方案是缓存空值:
def get_user(user_id):
user = r.get(f"user:{user_id}")
if user is not None: # 注意:None表示缓存未命中,空字符串表示缓存了空值
return user if user != "NULL" else None
user = db.query(...)
# 无论查没查到,都写缓存
r.setex(f"user:{user_id}", 600, user or "NULL")
return user
缓存击穿指某个热点key过期瞬间,大量并发请求同时穿透到数据库。用分布式锁可以解决:
def get_hot_data(key):
data = r.get(key)
if data:
return data
# 加锁,只允许一个线程去查数据库
with r.lock(f"lock:{key}", timeout=10):
# 双重检查:拿到锁后可能已经被其他线程更新了
data = r.get(key)
if data:
return data
data = expensive_query()
r.setex(key, 3600, data)
return data
缓存雪崩指大量key同时过期,导致数据库瞬时压力暴增。解决方案是给过期时间加随机偏移量:
import random
基础过期时间3600秒,加上0-300秒的随机偏移
expire = 3600 + random.randint(0, 300)
r.setex(key, expire, value)
高阶篇:多级缓存让速度再翻倍
单靠Redis做缓存,每次请求还是有一次网络开销。如果能把最热的数据放在应用本地内存里,速度能再快一个数量级。
这就是多级缓存架构:本地缓存(毫秒级)→ Redis集群(亚毫秒级)→ 数据库(毫秒级)。80%的请求被本地缓存拦截,剩下的20%由Redis承载,数据库几乎只处理写请求和缓存未命中的场景。
redis-py自带了本地缓存模块_LocalCache,可以搭配使用:
from redis._cache import _LocalCache, EvictionPolicy
初始化本地缓存:最多存10000条,30秒过期,LRU淘汰策略
local_cache = _LocalCache(max_size=10000, ttl=30, eviction_policy=EvictionPolicy.LRU)
def get_user_with_multilevel_cache(user_id):
# 构造命令元组作为缓存key
command = ("GET", f"user:{user_id}")
# 查本地缓存
cached = local_cache.get(command)
if cached:
return cached
# 查Redis
user = r.get(f"user:{user_id}")
if user:
# 写入本地缓存
local_cache.set(command, user, keys_in_command=[f"user:{user_id}"])
return user
# 查数据库
user = db.query(...)
r.setex(f"user:{user_id}", 3600, user)
return user
这套架构在实践中有几个优化点:热点数据可以提前预热,比如活动开始前把商品信息加载到缓存;监控指标要跟上,重点关注本地缓存命中率(目标90%以上)和Redis查询延迟(目标1ms以下);数据更新时要同时淘汰两级缓存,保证一致性。
收尾
回到开头那个创业团队的故事。后来帮他们把用户信息和商品详情都加了Redis缓存,数据库连接数从打满降到个位数,接口响应时间从秒级降到几十毫秒。技术负责人发了条朋友圈:“原来我们之前一直在用石器时代的方式写代码。”
Redis缓存的本质很简单——用内存换速度,用空间换时间。但用好它需要理解背后的数据一致性、过期策略、并发控制这些细节。希望这篇文章能帮你把这些细节串起来,写出真正高效的缓存代码。