并发测试框架本来已经在上一篇文章《常见的并发问题》已经写过,但是,有的小朋友必须要我把框架部分 单独拎出来。
好吧~ ~
我直接把代码拎出来:
# -*- coding: utf-8 -*- """ @ auth : carl_DJ @ time : 2020-6-9 """ import threading from datetime import * #自定义全局变量需要的线程数,20 thread_num = 20 #自定义全局变量每个线程需要循环的数量,10 one_work_num = 10 #自定义函数test() def test(): #编写测试代码 now = datetime.now() print("打印时间戳:",now) #设置死循环 #x =0 #while (x ==1): # print("打印时间戳:",now) def working(): # 引用全局变量 global one_work_num #嵌套执行指定循环次数的 test()函数 for i in range(0,one_work_num): test() #自定义thr()函数,来执行多线程 def thr(): #自定义一个空的数组,用来存放线程组 threads = [] #设置循环次数为thread_num for i in range(thread_num): #将working()函数放入到线程中 t =threading.Thread(target=working,name="T" + str(i)) #设置守护线程 t.setDaemon(True) threads.append(t) #启动线程并执行 for t in threads: t.start() #设置阻塞线程 for t in threads: t.join(2) if __name__ == "__main__": thr()
很简洁的一段代码,没有什么难度。
俗话说好记性不如烂键盘。
所以,
多撸码,多练习,敲烂键盘有必要;
少操心,少开车,注重身体很重要。