最近公司服务类的项目有个测试并发的需求,测试的目的并非是服务器能抗住多大的并发,而是测试服务在并发请求下是否是阻塞的。比如单个并发的执行时间大概是两秒的话,那么10个并发的总执行时间是所有请求执行时间的总和还是最大请求时间。
代码
import time
import requests
from multiprocessing import Process
from multiprocessing import Pool
data = {
"times": 10, # 并发量
"method": "POST",
"url": "http://xxx.com/xxx",
"header": {
"Content-Type": "application/json",
"user-agent": "python-mock/0.0.1"
},
"body": {
# 参数
}
}
def run_task(idx):
response = requests.post(data["url"], json=data["body"], headers=data["header"])
if response.status_code == 200:
result = response.content.decode('utf-8')
else:
result = "访问失败"
print("第 %s 次执行:%s \n" % (idx, result))
if __name__ == '__main__':
p = Pool(data["times"])
for index in range(data["times"]):
p.apply_async(run_task, args=(index + 1,))
p.close()
p.join()
print("执行结束.")
实现原理是利用python的多进程,测试10个并发量就利用for循环创建10个进程。 p.join()
是阻塞等待所有进程执行结束。