〖Python 数据库开发实战 - Python与Redis交互篇⑨〗- 利用 redis-py 实现模拟商品秒杀活动案例

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 〖Python 数据库开发实战 - Python与Redis交互篇⑨〗- 利用 redis-py 实现模拟商品秒杀活动案例

f43e3c1e54a64274b9fcb4f652be30a9.png今天的这一章节我们将来实现 “模拟商品秒杀活动” 的案例,在这个案例中将使用到多线程技术,相较于上一章节的 “缓存观众投票数据信息” 的案例。在技术难点山又提升了一个阶层,Python 自带的线程池技术免去了我们自己创建和管理线程的麻烦,只需要编写好线程需要执行的任务,将任务交给线程池运行即可



为什么要引入线程池技术


我们使用的数据库连接池技术是为了免去反复创建和销毁连接,造成硬件资源的不必要浪费。


线程池技术的设计考量也是如此,如果程序中经常需要用到线程, 频繁的创建和销毁线程对象,会极大的浪费很多硬件资源(线程对象不像普通的对象,在创建线程的时候需要分配一定的内存,还需要调度线程去启动执行等等... 这个过程是很麻烦的) 。

我们可以把线程和任务分离开利用线程池缓存一些空闲的线程(空闲线程的数量是可以自定义的), 如此线程可以反复利用,省去了重复创建的麻烦。


当需要线程去执行某一个任务的时候,就可以将那个这个任务写成一个函数传递给线程池,线程池就会挑选一个空闲的线程去执行这个任务。任务执行结束之后,分配出去执行任务的线程就会被回收,也就避免了现成的反复创建和销毁。 这个原理与数据库连接池技术比较相像。



通过案例加深线程池技术原理的理解


  • 接下来我们通过一个简单的案例了解一下 Python 程序中的 线程池应该如何去使用。案例如下:
# coding:utf-8
from concurrent.futures import ThreadPoolExecutor  # 导入 concurrent.futures 模块的 线程池类 ThreadPoolExecutor
def say_hello():  # 定义线程任务函数,只执行打印 "Hello" 的操作
    print("Hello")
executor = ThreadPoolExecutor(50)   # 利用 ThreadPoolExecutor 构造线程池对象,传入 缓存的线程池数量 50
for i in range(0, 10):        # 利用 for 循环与 executor 的 submit 函数传入 线程任务函数 - say_ hello
    hello = executor.submit(say_hello)
    print(hello)



image.png


线程池这个例子非常的简单,接下来我们将要实现的 “抢购秒杀的活动”,就会利用线程池模拟很多用户来抢购商品



实现多线程模拟商品秒杀案例 - 思路


案例需求:

利用Python多线程模拟商品秒杀过程,不可以出现超买和超卖的情况。

假设A商品有50件参与秒杀活动,10分钟秒杀自动结束。

实现思路:

以 1000 人为例,模拟1000人去抢购 50 件商品。如果使用 for 循环去执行 1000 次,那就不是抢购了,而是排队购买东西。

因为只有线程是并发执行的,所以只有多线程才能够模拟并发的抢购,如此这种秒杀的盛况才能够出现,这也是我们之所在讲解案例之前介绍线程池的主要原因。

线程池缓存线程的数量为多少合适呢?1000 人抢购,由于网络延迟、操作APP、操作浏览器的因素,可能每秒钟只有几百人参与秒杀。那么创建一个缓存了 200 数量线程的线程池即可,可以模拟每秒钟最多 200 个用户同时抢购。

redis 采用的是单线程机制,所以也不可能会出现超买和超卖的问题;如果使用 MySQL 的话,就可能会存在这种情况。

既然这个案例需要使用 redis 来完成,那就需要分析一下,redis 里需要保存的记录都有哪些?如下:

kill_total      # 商品总数;(字符串类型)
kill_num      # 秒杀成功的数量;
kill_flag     # 当秒杀数量为 50 ,或者时间满足 10 分钟 的时候,就删除掉 kill_flag,表示秒杀活动结束。 
kill_user     # 记录秒杀成功的用户id,使用 列表 进行记录。(需要注意的是用户id是不可以重复的)


  • 实现多线程模拟商品秒杀案例 - 代码
# coding:utf-8
import redis
import random
from redis_db import redis_Pool         # 导入 redis 连接池
from concurrent.futures import ThreadPoolExecutor       # 导入 concurrent.futures 模块的 线程池类 ThreadPoolExecutor
"""
生成随机用户的id
"""
s = set()               # 设置空集合,将其赋值给变量 s
"""
定义一个死循环,向集合中添加 1000个 不重复的用户id;
因为使用的是 random 模块,不能保证生成的用户 id 是不重复的,所以 while 循环是不能够循环 1000 次的;
所以 while 循环的结束条件是 s 的长度为 1000 的时候,小于 1000 的情况下,是需要继续向 s 中添加 用户id 的
"""
while True:
    if len(s) == 1000:
        break
    num = random.randint(10000, 100000)
    s.add(num)      # 将随机生成的 用户id 添加到 s 中去
"""
获得 redis 连接
"""
con = redis.Redis(
    connection_pool=redis_Pool
)
"""
利用 try...except...finally 捕获异常
1、需要判断 redis 中是否包含有秒杀用到的数据,这段程序在调试的过程中肯定会运行不止一次,如果已存在秒杀数据,势必会影响下一次运行。
2、如果第一次运行的结果,秒杀成功的记录没有被删除,第二次再次运行的时候又会将秒杀成功的记录再次添加,这就非常不合理了。
3、所以每一次运行脚本之前,就需要将与 秒杀案例 相关的 redis 的记录全部删掉
"""
try:
    con.delete("kill_total", "kill_num", "kill_flag", "kill_user")      # 删掉 redis 的秒杀案例相关的记录
    con.set("kill_total", 50)       # 写入 秒杀商品总数量 kill_total,数量 50
    con.set("kill_num", 0)          # 写入 秒杀活动成功抢购的数量 kill_num,初始数量 0
    con.set("kill_flag", 1)         # 写入 秒杀活动 的状态 kill_num,参数值 1 ;这里给多少随意,只是用来判断活动的标识。
    con.expire("kill_flag", 600)    # 写入 秒杀活动 的状态的过期时间,十分钟,600秒
    """kill_user 不用创建,在后面用到的时候像 kill_user 添加记录时会自动创建"""
except Exception as e:          # 捕获异常并打印输出
    print(e)
finally:                        # 回收 redis 连接
    del con
"""
定义线程池,传入 缓存的线程池数量 200
"""
executor = ThreadPoolExecutor(200)
"""
创建线程池任务 - 秒杀
1、参与秒杀时,需要重点关注秒杀的数据
2、利用 redis 事务去执行 秒杀任务 数据的记录 - pipeline()
"""
def buy():
    """获取 redis 链接(区别于第 31 行的 con 连接);创建 redis 事务 - pipeline()"""
    connection = redis.Redis(
        connection_pool=redis_Pool
    )
    """定义 pipline """
    pipline = connection.pipeline()
    """利用 try...except...fi nally 捕获异常;并最终回收链接。"""
    try:
        """通过 connection 判断 '秒杀' 是否处于有效状态;如果 'kill_flag' 为 0 的话,则秒杀任务就不存在,没有任何实际意义"""
        if connection.exists("kill_flag") == 1:
            """在秒杀活动生效的情况下,观察 成功抢购数 与 成功抢购的用户ID 的两个数据"""
            pipline.watch("kill_num", "kill_user")
            """获取 秒杀商品 的总数,需要转码与转换 int 数据类型"""
            total = int(pipline.get("kill_total").decode("utf-8"))
            """获取 秒杀活动成功抢购 的数量,需要转码与转换 int 数据类型"""
            num = int(pipline.get("kill_num").decode("utf-8"))
            """
            当 秒杀活动成功抢购 的数量 小于 秒杀商品 的总数时[num < tota],秒杀活动还未结束,可以继续参与秒杀;
            当 有效时,将 kill_num 的数量加 1 ,并将 user_id 写入 kill_user 记录 
            """
            if num < total:
                """利用 pipline 的 multi() 函数,开启 redis 的事务"""
                pipline.multi()
                """利用 pipline 的 incr() 函数,将 'kill_num' 记录 +1 """
                pipline.incr("kill_num")
                """利用 s 集合 的 pop() 函数删除并获取 random 随机生成的 用户id ,赋值给 user_id 变量"""
                user_id = s.pop()
                """利用 pipline 的 rpush() 函数,将秒杀到商品的用户的 'user_id' 传入给 kill_user记录 """
                pipline.rpush("kill_user", user_id)
                """利用 pipline 的 execute() 函数,提交 redis 的事务"""
                pipline.execute()
    except Exception as error:
        print(error)
    finally:
        """判断 pipline 是否存在,存在的话,回收 pipline 事务"""
        if "pipline" in dir():
            pipline.reset()
        del connection
"""
利用 for 循环,模拟 1000 名用户参与抢购秒杀活动;
将 bug 函数作为线程池任务函数,传给线程池来执行。
"""
for i in range(0, 1000):
    executor.submit(buy)
print("秒杀已经结束")


c739708d594e4bb48f21ff16a005a565.png



6ef798f702bc42e6bb28c8ac4a4fa3ff.png



















相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5天前
|
API 数据库 数据安全/隐私保护
Flask框架在Python面试中的应用与实战
【4月更文挑战第18天】Django REST framework (DRF) 是用于构建Web API的强力工具,尤其适合Django应用。本文深入讨论DRF面试常见问题,包括视图、序列化、路由、权限控制、分页过滤排序及错误处理。同时,强调了易错点如序列化器验证、权限认证配置、API版本管理、性能优化和响应格式统一,并提供实战代码示例。了解这些知识点有助于在Python面试中展现优秀的Web服务开发能力。
22 1
|
1天前
|
缓存 NoSQL 关系型数据库
在Python Web开发过程中:数据库与缓存,MySQL和NoSQL数据库的主要差异是什么?
MySQL与NoSQL的主要区别在于数据结构、查询语言和可扩展性。MySQL是关系型数据库,依赖预定义的数据表结构,使用SQL进行复杂查询,适合垂直扩展。而NoSQL提供灵活的存储方式(如JSON、哈希表),无统一查询语言,支持横向扩展,适用于处理大规模、非结构化数据和高并发场景。选择哪种取决于应用需求、数据模型及扩展策略。
10 0
|
2天前
|
SQL 关系型数据库 MySQL
第十三章 Python数据库编程
第十三章 Python数据库编程
|
2天前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
|
3天前
|
数据采集 存储 人工智能
【Python+微信】【企业微信开发入坑指北】4. 企业微信接入GPT,只需一个URL,自动获取文章总结
【Python+微信】【企业微信开发入坑指北】4. 企业微信接入GPT,只需一个URL,自动获取文章总结
13 0
|
3天前
|
人工智能 机器人 API
【Python+微信】【企业微信开发入坑指北】3. 如何利用企业微信API给微信群推送消息
【Python+微信】【企业微信开发入坑指北】3. 如何利用企业微信API给微信群推送消息
6 0
|
3天前
|
缓存 人工智能 API
【Python+微信】【企业微信开发入坑指北】2. 如何利用企业微信API主动给用户发应用消息
【Python+微信】【企业微信开发入坑指北】2. 如何利用企业微信API主动给用户发应用消息
7 0
|
3天前
|
人工智能 Python
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
8 0
|
5天前
|
SQL 中间件 API
Flask框架在Python面试中的应用与实战
【4月更文挑战第18天】**Flask是Python的轻量级Web框架,以其简洁API和强大扩展性受欢迎。本文深入探讨了面试中关于Flask的常见问题,包括路由、Jinja2模板、数据库操作、中间件和错误处理。同时,提到了易错点,如路由冲突、模板安全、SQL注入,以及请求上下文管理。通过实例代码展示了如何创建和管理数据库、使用表单以及处理请求。掌握这些知识将有助于在面试中展现Flask技能。**
12 1
Flask框架在Python面试中的应用与实战
|
NoSQL Redis Python
Redis与Python进行交互
安装包 安装Redis的有3种方式https://github.com/andymccurdy/redis-py 第一种:进⼊虚拟环境,联⽹安装包redis pip install redis 第二种:进⼊虚拟环境,联⽹安装包redis easy_install redis 第三种:到...
1543 0

热门文章

最新文章