关于redis的实战案例
(1)案例1:KV缓存
第1个是最基础也是最常见的就是KV功能,我们可以用Redis来缓存用户信息、会话信息、商品信息等等。下面这段代码就是通过缓存读取逻辑。
import redis pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=6, decode_responses=True) r = redis.Redis(connection_pool=pool) def get_user(user_id): user = r.get(user_id) if not user: user = UserInfo.objects.get(pk=user_id) r.setex(user_id, 3600, user) return user
(2)案例2:分布式锁
什么是分布式锁
❝
分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
❞
提到Redis的分布式锁,很多小伙伴马上就会想到setnx
+ expire
命令。即先用setnx
来抢锁,如果抢到之后,再用expire
给锁设置一个过期时间,防止锁忘记了释放。
❝
SETNX 是SET IF NOT EXISTS的简写.日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。
❞
假设某电商网站的某商品做秒杀活动,key可以设置为key_resource_id,value设置任意值,伪代码如下:
方案1
import redis pool = redis.ConnectionPool(host='127.0.0.1') r = redis.Redis(connection_pool=pool) ret = r.setnx("key_resource_id", "ok") if ret: r.expire("key_resource_id", 5) # 设置过期时间 print("抢购成功!") r.delete("key_resource_id") # 释放资源 else: print("抢购失败!")
但是这个方案中,setnx
和expire
两个命令分开了,「不是原子操作」。如果执行完setnx
加锁,正要执行expire
设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」。
方案2
SETNX + value值是(系统时间+过期时间)
为了解决方案一,「发生异常锁得不到释放的场景」,可以把过期时间放到setnx
的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:
import time def foo(): expiresTime = time.time() + 10 ret = r.setnx("key_resource_id", expiresTime) if ret: print("当前锁不存在,加锁成功") return True oldExpiresTime = r.get("key_resource_id") if float(oldExpiresTime) < time.time(): # 如果获取到的过期时间,小于系统当前时间,表示已经过期 # 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间 newExpiresTime = r.getset("key_resource_id", expiresTime) if oldExpiresTime == newExpiresTime: # 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁 return True # 加锁成功 return False # 其余情况加锁皆失败 foo()
方案3
实际上,我们还可以使用Py的redis模块中的set函数来保证原子性(包含setnx和expire两条指令)代码如下:
r.set("key_resource_id", "1", nx=True, ex=10)
(3)案例4:延迟队列
延时队列可以通过Redis的zset(有序列表)来实现。 我们将消息序列化为一个字符串作为zset的值。 这个消息的到期时间处理时间作为score, 然后用多个线程轮询zset获取到期的任务进行处理, 多线程时为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。 因为有多个线程,所有需要考虑并发争抢任务,确保任务不能被多次执行。
import time import uuid import redis pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True) r = redis.Redis(connection_pool=pool) def delay_task(task_name, delay_time): # 保证value唯一 task_id = task_name + str(uuid.uuid4()) retry_ts = time.time() + delay_time r.zadd("delay-queue", {task_id: retry_ts}) def loop(): print("循环监听中...") while True: # 最多取1条 task_list = r.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1) if not task_list: # 延时队列空的,休息1s print("cost 1秒钟") time.sleep(1) continue task_id = task_list[0] success = r.zrem("delay-queue", task_id) if success: # 处理消息逻辑函数 handle_msg(task_id) def handle_msg(msg): """消息处理逻辑""" print(f"消息{msg}已经被处理完成!") import threading t = threading.Thread(target=loop) t.start() delay_task("任务1延迟5", 15) delay_task("任务2延迟2", 20) delay_task("任务3延迟3", 30) delay_task("任务4延迟10", 60)
redis的zrem方法是对多线程争抢任务的关键,它的返回值决定了当前实例有没有抢到任务,因为loop方法可能会被多个线程、多个进程调用, 同一个任务可能会被多个进程线程抢到,通过zrem来决定唯一的属主。同时,一定要对handle_msg进行异常捕获, 避免因为个别任务处理问题导致的循环异常退出。
(4)案例5:发布订阅
subscribe channel # 订阅 publish channel mes # 发布消息
import threading import redis r = redis.Redis(host='127.0.0.1') def recv_msg(): pub = r.pubsub() pub.subscribe("fm104.5") pub.parse_response() while 1: msg = pub.parse_response() print(msg) def send_msg(): msg = input(">>>") r.publish("fm104.5", msg) t = threading.Thread(target=send_msg) t.start() recv_msg()
(5)案例3:定时任务
利用 Redis 也能实现订单30分钟自动取消。
用户下单之后,在规定时间内如果不完成付款,订单自动取消,并且释放库存使用技术:Redis键空间通知(过期回调)用户下单之后将订单id作为key,任意值作为值存入redis中,给这条数据设置过期时间,也就是订单超时的时间启用键空间通知
开启过期key监听
from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) # 监听过期key def event_handler(msg): print("sss",msg) thread.stop() pubsub = redis.pubsub() pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler}) thread = pubsub.run_in_thread(sleep_time=0.05)