今天的这一章节我们将来实现 “模拟商品秒杀活动” 的案例,在这个案例中将使用到多线程技术,相较于上一章节的 “缓存观众投票数据信息” 的案例。在技术难点山又提升了一个阶层,Python 自带的线程池技术免去了我们自己创建和管理线程的麻烦,只需要编写好线程需要执行的任务,将任务交给线程池运行即可
为什么要引入线程池技术
我们使用的数据库连接池技术是为了免去反复创建和销毁连接,造成硬件资源的不必要浪费。
线程池技术的设计考量也是如此,如果程序中经常需要用到线程, 频繁的创建和销毁线程对象,会极大的浪费很多硬件资源(线程对象不像普通的对象,在创建线程的时候需要分配一定的内存,还需要调度线程去启动执行等等... 这个过程是很麻烦的) 。
我们可以把线程和任务分离开利用线程池缓存一些空闲的线程(空闲线程的数量是可以自定义的), 如此线程可以反复利用,省去了重复创建的麻烦。
当需要线程去执行某一个任务的时候,就可以将那个这个任务写成一个函数传递给线程池,线程池就会挑选一个空闲的线程去执行这个任务。任务执行结束之后,分配出去执行任务的线程就会被回收,也就避免了现成的反复创建和销毁。 这个原理与数据库连接池技术比较相像。
通过案例加深线程池技术原理的理解
- 接下来我们通过一个简单的案例了解一下 Python 程序中的 线程池应该如何去使用。案例如下:
# coding:utf-8 from concurrent.futures import ThreadPoolExecutor # 导入 concurrent.futures 模块的 线程池类 ThreadPoolExecutor def say_hello(): # 定义线程任务函数,只执行打印 "Hello" 的操作 print("Hello") executor = ThreadPoolExecutor(50) # 利用 ThreadPoolExecutor 构造线程池对象,传入 缓存的线程池数量 50 for i in range(0, 10): # 利用 for 循环与 executor 的 submit 函数传入 线程任务函数 - say_ hello hello = executor.submit(say_hello) print(hello)
线程池这个例子非常的简单,接下来我们将要实现的 “抢购秒杀的活动”,就会利用线程池模拟很多用户来抢购商品
实现多线程模拟商品秒杀案例 - 思路
案例需求:
利用Python多线程模拟商品秒杀过程,不可以出现超买和超卖的情况。
假设A商品有50件参与秒杀活动,10分钟秒杀自动结束。
实现思路:
以 1000 人为例,模拟1000人去抢购 50 件商品。如果使用 for 循环去执行 1000 次,那就不是抢购了,而是排队购买东西。
因为只有线程是并发执行的,所以只有多线程才能够模拟并发的抢购,如此这种秒杀的盛况才能够出现,这也是我们之所在讲解案例之前介绍线程池的主要原因。
线程池缓存线程的数量为多少合适呢?1000 人抢购,由于网络延迟、操作APP、操作浏览器的因素,可能每秒钟只有几百人参与秒杀。那么创建一个缓存了 200 数量线程的线程池即可,可以模拟每秒钟最多 200 个用户同时抢购。
redis 采用的是单线程机制,所以也不可能会出现超买和超卖的问题;如果使用 MySQL 的话,就可能会存在这种情况。
既然这个案例需要使用 redis 来完成,那就需要分析一下,redis 里需要保存的记录都有哪些?如下:
kill_total # 商品总数;(字符串类型) kill_num # 秒杀成功的数量; kill_flag # 当秒杀数量为 50 ,或者时间满足 10 分钟 的时候,就删除掉 kill_flag,表示秒杀活动结束。 kill_user # 记录秒杀成功的用户id,使用 列表 进行记录。(需要注意的是用户id是不可以重复的)
- 实现多线程模拟商品秒杀案例 - 代码
# coding:utf-8 import redis import random from redis_db import redis_Pool # 导入 redis 连接池 from concurrent.futures import ThreadPoolExecutor # 导入 concurrent.futures 模块的 线程池类 ThreadPoolExecutor """ 生成随机用户的id """ s = set() # 设置空集合,将其赋值给变量 s """ 定义一个死循环,向集合中添加 1000个 不重复的用户id; 因为使用的是 random 模块,不能保证生成的用户 id 是不重复的,所以 while 循环是不能够循环 1000 次的; 所以 while 循环的结束条件是 s 的长度为 1000 的时候,小于 1000 的情况下,是需要继续向 s 中添加 用户id 的 """ while True: if len(s) == 1000: break num = random.randint(10000, 100000) s.add(num) # 将随机生成的 用户id 添加到 s 中去 """ 获得 redis 连接 """ con = redis.Redis( connection_pool=redis_Pool ) """ 利用 try...except...finally 捕获异常 1、需要判断 redis 中是否包含有秒杀用到的数据,这段程序在调试的过程中肯定会运行不止一次,如果已存在秒杀数据,势必会影响下一次运行。 2、如果第一次运行的结果,秒杀成功的记录没有被删除,第二次再次运行的时候又会将秒杀成功的记录再次添加,这就非常不合理了。 3、所以每一次运行脚本之前,就需要将与 秒杀案例 相关的 redis 的记录全部删掉 """ try: con.delete("kill_total", "kill_num", "kill_flag", "kill_user") # 删掉 redis 的秒杀案例相关的记录 con.set("kill_total", 50) # 写入 秒杀商品总数量 kill_total,数量 50 con.set("kill_num", 0) # 写入 秒杀活动成功抢购的数量 kill_num,初始数量 0 con.set("kill_flag", 1) # 写入 秒杀活动 的状态 kill_num,参数值 1 ;这里给多少随意,只是用来判断活动的标识。 con.expire("kill_flag", 600) # 写入 秒杀活动 的状态的过期时间,十分钟,600秒 """kill_user 不用创建,在后面用到的时候像 kill_user 添加记录时会自动创建""" except Exception as e: # 捕获异常并打印输出 print(e) finally: # 回收 redis 连接 del con """ 定义线程池,传入 缓存的线程池数量 200 """ executor = ThreadPoolExecutor(200) """ 创建线程池任务 - 秒杀 1、参与秒杀时,需要重点关注秒杀的数据 2、利用 redis 事务去执行 秒杀任务 数据的记录 - pipeline() """ def buy(): """获取 redis 链接(区别于第 31 行的 con 连接);创建 redis 事务 - pipeline()""" connection = redis.Redis( connection_pool=redis_Pool ) """定义 pipline """ pipline = connection.pipeline() """利用 try...except...fi nally 捕获异常;并最终回收链接。""" try: """通过 connection 判断 '秒杀' 是否处于有效状态;如果 'kill_flag' 为 0 的话,则秒杀任务就不存在,没有任何实际意义""" if connection.exists("kill_flag") == 1: """在秒杀活动生效的情况下,观察 成功抢购数 与 成功抢购的用户ID 的两个数据""" pipline.watch("kill_num", "kill_user") """获取 秒杀商品 的总数,需要转码与转换 int 数据类型""" total = int(pipline.get("kill_total").decode("utf-8")) """获取 秒杀活动成功抢购 的数量,需要转码与转换 int 数据类型""" num = int(pipline.get("kill_num").decode("utf-8")) """ 当 秒杀活动成功抢购 的数量 小于 秒杀商品 的总数时[num < tota],秒杀活动还未结束,可以继续参与秒杀; 当 有效时,将 kill_num 的数量加 1 ,并将 user_id 写入 kill_user 记录 """ if num < total: """利用 pipline 的 multi() 函数,开启 redis 的事务""" pipline.multi() """利用 pipline 的 incr() 函数,将 'kill_num' 记录 +1 """ pipline.incr("kill_num") """利用 s 集合 的 pop() 函数删除并获取 random 随机生成的 用户id ,赋值给 user_id 变量""" user_id = s.pop() """利用 pipline 的 rpush() 函数,将秒杀到商品的用户的 'user_id' 传入给 kill_user记录 """ pipline.rpush("kill_user", user_id) """利用 pipline 的 execute() 函数,提交 redis 的事务""" pipline.execute() except Exception as error: print(error) finally: """判断 pipline 是否存在,存在的话,回收 pipline 事务""" if "pipline" in dir(): pipline.reset() del connection """ 利用 for 循环,模拟 1000 名用户参与抢购秒杀活动; 将 bug 函数作为线程池任务函数,传给线程池来执行。 """ for i in range(0, 1000): executor.submit(buy) print("秒杀已经结束")