小屌丝:鱼哥,我想写一个接口订单并发性能,能不能给我讲一下
小鱼:接口订单并发?我前篇文章不是写过常见并发框架
,然后你在追加一个创建订单和生成订单不就可以了?
小屌丝:鱼哥,你说的可轻松,那你能不能来一个?
小鱼:好吧,那我就以我某个项目为例,我们实际的看一下,都需要哪些步骤。
小屌丝: 鱼哥,就你这一点,最招人稀罕。哈哈!
小鱼:挖草了~~
那么我们就来分析一下,订单并发性能,我们想要什么:
>>1.订单并发数
>>2.成功订单数
>>3.订单成功率
>>4.成功订单总响应时间
>>5.成功订单平均响应时间
>>6.TPS
有了上面我们想要的,那么我们就来分析如何获取这些信息:
并发订单数:即自定义的并发数,根据我们的想法,把并发200次,设置为20个线程,每个循环10次
成功订单数:就是获取响应值为成功的请求,先定义一个success_count ,初始值为0,每成功一次,增加1
订单成功率:成功订单数/总的订单数
成功订单总响应时间:每个成功订单的响应时间之和,所以我们定义一个sum_time,初始值为0.00,然后把每次成功的响应时间加起来
成功订单平均响应时间:成功订单总响应时间/成功订单数
TPS:成功并发数/成功订单平均响应时间
订单响应时间:在请求之前,获取一次时间,在断言成功之后,再次获取一次时间,这样二者之差,就是订单的响应时间。
了解了思路之后,我们就看实际源码:
# -*- coding: utf-8 -*- """ @ auth : carl_DJ @ time : 2020-6-11 """ import hashlib import threading from time import * from datetime import datetime,timedelta import requests import json '''初始化全局变量''' #自定义全局变量需要的线程数,20 thread_num = 20 #自定义全局变量每个线程需要循环的数量,10 one_worker_num = 10 #设定最开始的总时间 sum_time = 0.00 #设定最开始的成功连接数 success_count = 0 ''' 后台登录常规操作''' username = '13388889999' password = hashlib.md5(b'123456').hexdigest() #设置密码,且是md5加密方式 url = "http://www.xxx.com/energy/user/login/" form_data = {"username":username,"password":password} login_response = requests.post(url,data=form_data) c = login_response.cookies '''订单发送请求''' def order(): #引用全局变量 global c global sum_time global success_count #获取执行发送订单请求前时间 t1 = time() #设定url、form_data进行创建订单 url1 = "http://www.xxx.com/energy/create_order/" from_data1 = {"restaurant_id":1136, "menu_item_total":'12.00', "menu_item_data": [{'id':2667868,'p':22,'q':3}] } make_responst = requests.post(url1,data=from_data1,cookies = c) #获取请求结果 res = make_responst.text #结果转换成字典赋值给变量id id = json.loads(res)['order_id'] #断言判断是否提交成功 assert id != " " su_time =datetime.now()+ timedelta(hours=1) #设定url、form_data进行生成订单 url2 = "http://www.xxx.com/energy/place_order/" from_data2 = {"restaurant_id": id, "customer_name": 'carl_dj', "mobile_number":username, "delivery_address":"address message", "pay_type":'cash', "preorder":su_time } place_responst = requests.post(url2, data=from_data2, cookies=c) res = place_responst.text #追加断言,判断结果是否有"success",有的话,说明订餐成功 assert res == " success" print("订餐成功") #订单成功后,再次获取一下时间 t2 = time() #获取订单的响应时间 res_time = t2-t1 #把响应时间写入txt文件 result = open("E:\Private Folder\res.txt","a") #路径直接写死,也可用os.path 来写路径 result.write("成功订单响应时间:" + str(res_time)+ '\n') result.close() #也可以使用with打开文件,好处是不用关心文件是否关闭 # with open ("E:\Private Folder\res.txt","a") as result1: # print(result1.read()) #把每次成功订单数累加到全局变量sum_time中 sum_time = sum_time + res_time #把每次获取的成功订单数做累加,添加到全局变量success_count中 success_count = success_count +1 '''嵌套指定循环次数的order()函数''' def working() global one_worker_num for i in range(0,one_worker_num): order() '''自定义main()函数,来执行多线程''' def main(): global thread_num #自定义一个空的数组,用来存放线程组 threads = [] #设置循环次数 for i in range(thread_num): #将working()函数存放到线程中 t = threading.Thread(target=working,name="T"+ str(i)) #设定守护线程 t.setDaemon(True) threads.append(t) #启动循环执行 for t in threads: t.start() ##设置阻塞线程 for t in threads: t.join() if __name__ == "__main__": main() total_order = thread_num*one_worker_num avg_time = sum_time/success_count '''执行完之后,需要把数据写入到txt文件中''' #订单并发总数 result.write("并发订单数:"+ str(total_order)+ "\n") #成功并发数 result.write("成功并发数:"+ str(success_count) + "\n") #订单成功率 result.write("订单成功率:"+ str(success_count/total_order*100)+ "%" + "\n") #成功订单响应时间 result.write("成功订单总响应时间:"+ str(sum_time)+"\n") #成功订单平均响应时间 result.write("成功平均响应时间:"+str(sum_time/success_count)+"\n") #TPS事务数/秒 result.write("TPS:"+str(success_count/avg_time) + "\n") #tps = 并发成功数/平均响应时间 result.close()
注:
1.这里运用到了str(),
>>是因为响应时间是数字,而写入文件的时候是字符串类型,所以需要把最后的数字通过str()函数进行转化。
2.这里的文件路径是直接写死的,并没有使用os.path获取。
3.打开文件的方式 :open () 或者with open() 都可以,这里两种方法都写了。
>使用open()方法,最后别忘了close(),不然消耗资源…