Redis基本命令
第1关:字符串、列表与集合
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis conn = redis.Redis() def task_empty(): # 请在下面完成判断任务列表是否为空 #********* Begin *********# return int(conn.llen("task_list")) == 0 #********* End *********# def get_task(): # 请在下面完成获取一个任务 #********* Begin *********# task = conn.rpop("task_list") conn.set("current_task", task) #********* End *********# def get_unallocated_staff(): # 请在下面完成获取一个未分配的员工 #********* Begin *********# staff = conn.srandmember("unallocated_staff") conn.smove("unallocated_staff", "allocated_staff", staff) return staff #********* End *********# def allocate_task(staff): # 请在下面完成分配任务 #********* Begin *********# conn.append("current_task", ':' + str(staff)) conn.lpush("task_queue", conn.get("current_task")) conn.set("current_task", "None") #********* End *********#
第2关:哈希与有序集合
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis conn = redis.Redis() # 初始化任务信息到 Redis 中 def set_task_info(task_id): # 请在下面完成要求的功能 #********* Begin *********# conn.hset("task_status", task_id, "init") #********* End *********# # 将任务添加至任务队列 def add_task_to_queue(task_id, priority): # 请在下面完成要求的功能 #********* Begin *********# conn.zadd("task_queue", task_id, int(priority)) set_task_info(task_id) #********* End *********# # 从任务队列中取出优先级最高的任务 def get_task(): # 请在下面完成要求的功能 #********* Begin *********# task_list_by_priority = conn.zrevrange("task_queue", 0, -1) current_task = task_list_by_priority[0] conn.zrem('task_queue', current_task) conn.hset("task_status", current_task, "processing") return current_task #********* End *********#
第3关:Redis基本事务与其他命令
#!/usr/bin/env python #-*- coding:utf-8 -*- import time import redis conn = redis.Redis() # 用户端发起派车请求 def request_cab(user_id, priority): # 请在下面完成要求的功能 #********* Begin *********# if conn.hexists('request:info:' + str(user_id), 'time'): return pipe = conn.pipeline() pipe.lpush('cab:queue', user_id) pipe.hmset('request:info:' + str(user_id), {'time': time.time(), 'priority': priority}) pipe.expire('request:info:' + str(user_id), 10 * 60) pipe.execute() #********* End *********# # 平台选择优先级最高的派车请求并派车 def allocate(): # 请在下面完成要求的功能 #********* Begin *********# cab_queue = conn.sort('cab:queue', by='request:info:*->priority', desc=True) current_respond = cab_queue[0] conn.lrem('cab:queue', current_respond, 1) return current_respond #********* End *********# # 用户端取消派车请求 def cancel_cab(user_id): conn.expire('request:info:' + str(user_id), 0) conn.lrem('cab:queue', user_id)
Redis命令实践
第1关:使用Redis管理登录令牌
#!/usr/bin/env python #-*- coding:utf-8 -*- import time import redis conn = redis.Redis() # 核对令牌,并返回该令牌对应的用户 ID def check_token(token): # 请在下面完成要求的功能 #********* Begin *********# return conn.hget('login', token) #********* End *********# # 更新令牌,同时存储令牌的创建时间 def update_token(token, user_id): # 请在下面完成要求的功能 #********* Begin *********# timestamp = time.time() pipe = conn.pipeline() pipe.hset('login', token, user_id) pipe.zadd('recent:token', token, timestamp) pipe.execute() #********* End *********# # 清理过期令牌 def clean_tokens(): # 请在下面完成要求的功能 #********* Begin *********# one_week_ago_timestamp = time.time() - 86400 expired_tokens = conn.zrangebyscore('recent:token', 0, one_week_ago_timestamp) conn.zremrangebyscore('recent:token', 0, one_week_ago_timestamp) conn.hdel('login', *expired_tokens) #********* End *********#
第2关:使用Redis实现购物车
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis conn = redis.Redis() # 添加商品 def add_item(name, price): # 请在下面完成要求的功能 #********* Begin *********# item_id = conn.incr('item_id') item_info_key = 'item:' + str(item_id) + ':info' conn.hmset(item_info_key, {"name": name, "price": price}) conn.expire(item_info_key, 30 * 24 * 60 * 60) return item_id #********* End *********# # 加入购物车 def add_to_cart(user_id, item, count): if count > 0: conn.hset('cart:' + user_id, item, count) else: conn.hrem('cart:' + user_id, item) # 获取购物车详情 def get_cart_info(user_id): # 请在下面完成要求的功能 #********* Begin *********# return conn.hgetall('cart:' + user_id) #********* End *********#
第3关:使用Redis做页面缓存
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis conn = redis.Redis() # 使用 Redis 做页面缓存 def cache_request(request_url): # 请在下面完成要求的功能 #********* Begin *********# page_key = 'cache:' + str(hash(request_url)) content = conn.get(page_key) if not content: content = "content for " + request_url conn.setex(page_key, content, 600) return content #********* End *********#
第4关:使用Redis做数据缓存
#!/usr/bin/env python #-*- coding:utf-8 -*- import time import json import redis conn = redis.Redis() # 将数据加入缓存队列 def add_cache_list(data_id, delay): # 请在下面完成要求的功能 #********* Begin *********# conn.zadd('cache:delay', data_id, delay) conn.zadd('cache:list', data_id, time.time()) #********* End *********# # 缓存数据 def cache_data(): # 请在下面完成要求的功能 #********* Begin *********# next = conn.zrange('cache:list', 0, 0, withscores=True) now = time.time() if not next or next[0][1] > now: time.sleep(0.1) data_id = next[0][0] delay = conn.zscore('cache:delay', data_id) if delay <= 0: conn.zrem('cache:delay', data_id) conn.zrem('cache:list', data_id) conn.delete('cache:data:' + data_id) else: data = {'id': data_id, 'data': 'fake data'} conn.zadd('cache:list', data_id, now + delay) conn.set('cache:data:' + data_id, json.dumps(data)) #********* End *********#