在深度学习学习中,一般模型的训练和模型部署,都是单模型单卡实现的,如果在业务中同一时间传入到模型的数据很多,一时间模型处理数据预测,通常来说就是一个接一个处理,第一个数据处理完预测,下一个数据进来(队列的形式),
这样的部署,在业务上预测延迟很大的。
在深度学习多进程GPU的部署方法是使用FastAPI,先把程序封装成服务API接口,在用uvicorn实现多进程调用,每一个进程在一个GPU上运行。
再了解如何用深度学习部署多进程之前,先学习一下,python中多进程多线程是如何实现以及原理的
python多进程多线程
如何理解进程和线程的关系呢,以显卡为例子,这是名人堂3080Ti显卡,16线程。显卡在电脑中的作用就是画面显示图像渲染,在一些游戏中我们显示器看到的所有东西都是通过显卡来实现的,说白了显卡就好比画家。
游戏中每一帧画面,就好比是一个进程(画一张画),那画中的这些内容就是由画家来完成也就是线程来完成,16线程就代表有16个画家一起同时来完成这一帧的画。
所以说一个程序至少有一个进程,一个进程至少有一个线程
多线程
python中的多线程主要使用到 threading 这个库
常用的操作方法
获取已激活的线程数
import threading threading.active_count() #output:1
查看所有线程信息
threading.enumerate() #output:[<_MainThread(MainThread, started 14073601193474)>]
返回当前正在运行的线程
threading.current_thread() • 1
创建线程
import threading import time def function(arg): time.sleep(1) print('thread '+str(arg)+" running....") if __name__ == '__main__': #创建十个线程 for i in range(10): t = threading.Thread(target=function, args=(i,)) #线程启动start() t.start()
Tread函数参数如下:
threading.Thread(self, group=None, target=None, name=None,args=(),kwargs=None, *, daemon=None)
- group:用于扩展
- target:线程调用的程序
- name:设置线程的名字
- args和kwargs代表调用程序target的参数列表和关键字参数
join
线程和线程之间也可能会出现矛盾的,还是以画画为列子,二个画家同时画一幅画,有一个部分是需要画家A先上完一层颜色,在由画家B在A的基础上完成点缀的,但画家B没等画家A上色完,就已经在点缀了,导致画出来的东西不对。
这种情况,就需要我们去控制线程与线程之间的运行的前后顺序了,就用到Thread类的常用方法:join([timeout])
- join:调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。
不加join的情况
import time import threading def painting(): time.sleep(3) print('Artist A ends painting:',time.strftime('%H:%M:%S')) time.sleep(1) print('Artist B starts painting', time.strftime('%H:%M:%S')) time.sleep(3) print('Artist A starts painting',time.strftime('%H:%M:%S')) t = threading.Thread(target=painting) t.start() # 确保线程t已经启动 print('Artist B ends painting',time.strftime('%H:%M:%S'))
加join的情况
import time import threading def painting(): time.sleep(3) print('Artist A ends painting:',time.strftime('%H:%M:%S')) time.sleep(1) print('Artist B starts painting', time.strftime('%H:%M:%S')) time.sleep(3) print('Artist A starts painting',time.strftime('%H:%M:%S')) t = threading.Thread(target=painting) t.start() t.join() # 确保线程t已经启动 print('Artist B ends painting',time.strftime('%H:%M:%S'))
lock
也有可能线程之间,因为同一资料还吵架,比如二个画家同时都要用这个同一种颜料,二个人打起来了。
lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。
不使用lock
import threading def Artist_A(): global pigment for i in range(10): pigment+=1 print('Number of paints used by artist A',pigment) def Artist_B(): global pigment for i in range(10): pigment+=1 print('Number of paints used by artist B',pigment) if __name__== '__main__': pigment=0 t1=threading.Thread(target=Artist_A) t2=threading.Thread(target=Artist_B) t1.start() t2.start() t1.join() t2.join()
使用lock
import threading def Artist_A(): global pigment,lock lock.acquire() for i in range(10): pigment+=1 print('Number of paints used by artist A',pigment) lock.release() def Artist_B(): global pigment,lock lock.acquire() for i in range(10): pigment+=1 print('Number of paints used by artist B',pigment) lock.release() if __name__== '__main__': lock=threading.Lock() pigment=0 t1=threading.Thread(target=Artist_A) t2=threading.Thread(target=Artist_B) t1.start() t2.start() t1.join() t2.join()
多进程
进程与线程的使用方法类似,都有start(),join(),lock()的调用方法
导入模块
import multiprocessing m1 = multiprocessing.Process(target=function,args)
在进程中,如果我们的执行的function有return返回值时,如果存储,如何使用有二种方法:用队列和进程池
队列
队列存储获取,通过put()存储,get()获取
import multiprocessing as mp import threading as td def function(q,num): res =0 for i in range(num): res += i q.put(res) def multi(): q = mp.Queue() m1 = mp.Process(target=function,args=(q,5)) m2 = mp.Process(target=function,args=(q,5)) m1.start() m2.start() m1.join() m2.join() res1 = q.get() res2 = q.get() print('multi:', res1 + res2) multi()
进程池
进程池类似于一个机器学习黑盒子,我们把参数function的参数给黑盒子,黑盒子输出返回值。
导入模块
创建几个进程process
import multiprocessing as mp pool =mp.Pool(process=2)
常见的三种使用进程池的方法:map、apply、apply_async
map
map规定线程池执行的任务
result = pool.map(function,args) • 1
- function:执行的程序函数
- args:传入的参数
apply
apply阻塞主进程, 并且一个一个按顺序地执行子进程, 等到全部子进程都执行完毕后 ,继续执行 apply()后面主进程的代码
什么是主程序和子程序,使用进程封装起来的target封装起来的就是子程序,其余的就是主程序。
import time import multiprocessing def Painting(num): print("Number of paints used by artist", num) time.sleep(1) if __name__ == '__main__': print('Artist starts painting ') # 记录一下开始执行的时间 start_time = time.time() # 创建三个子进程 pool = multiprocessing.Pool(3) for i in range(3): pool.apply(Painting, [i]) print('Artist ends painting',time.time() - start_time) pool.close() pool.join()
apply_async
apply_async() 非阻塞异步的, 他不会等待子进程执行完毕, 主进程会继续执行, 他会根据系统调度来进行进程切换。
CPU在执行第一个子进程的时候, 还没等第一个子进程结束, 系统调度到了按顺序调度到了第二个子进程, 以此类推, 一直调度运行子进程, 一个接一个地结束子进程的运行, 最后运行主进程,
import time import multiprocessing def Painting(num): print("Number of paints used by artist", num) time.sleep(1) if __name__ == '__main__': print('Artist starts painting ') # 记录一下开始执行的时间 start_time = time.time() # 创建三个子进程 pool = multiprocessing.Pool(3) for i in range(3): pool.apply_async(Painting, [i]) print('Artist ends painting',time.time() - start_time) pool.close() pool.join()
多进程多线程对比time
具体用法看下面列子
import multiprocessing as mp import threading as td import time def function(q,num): res =0 for i in range(num): res += i q.put(res) def multi(): q = mp.Queue() m1 = mp.Process(target=function,args=(q,5)) m2 = mp.Process(target=function,args=(q,5)) m1.start() m2.start() m1.join() m2.join() res1 = q.get() res2 = q.get() print('multi:', res1 + res2) def thread(): q = mp.Queue() # thread可放入process同样的queue中 t1 = td.Thread(target=function, args=(q,5)) t2 = td.Thread(target=function, args=(q,5)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('thread:', res1 + res2) def normal(): res = 0 for _ in range(2): for i in range(5): res += i print('normal:',res) def run_time(f): start = time.time() f() end = time.time() print('running time',end-start) if __name__=="__main__": run_time(multi) run_time(thread) run_time(normal)
时间消耗顺序:普通<多线程<多进程
不同的程序任务 消耗的时间可能不同的,谁的消耗时间越短不确定的
subprocess
subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值,大白话说:我们可以通过它,在python的代码中就可以执行linux操作系统的一些命令比如说“ls -la”等。
之前的版本还有subprocess.call(),subprocess…check_call()和subprocess…check_output()这些函数,python3.5版本之后,这些函数都被subprocess.run代替了
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)
args:执行的linux命令。必须是一个字符串,字符串参数列表
stdin、stdout 和 stderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPEsubprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。
timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。
check:如果该参数设置为True,并且进程退出状态码不是 0,则弹出CalledProcessError异常。
encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。
Popen()
Popen 是 subprocess的核心,子进程的创建和管理都靠它处理。
Popen类的常用的方法有:
poll(): 检查进程是否终止,如果终止返回 returncode,否则返回 None。
wait(timeout): 等待子进程终止。
communicate(input,timeout): 和子进程交互,发送和读取数据
send_signal:发送信号到子进程 。
terminate(): 停止子进程,也就是发送SIGTERM信号到子进程。
kill(): 杀死子进程。发送SIGKILL 信号到子进程。
实例
import time import subprocess class TimeoutError(Exception): pass def command(cmd, timeout=60): """执行命令cmd,返回命令输出的内容。 如果超时将会抛出TimeoutError异常。 cmd - 要执行的命令 timeout - 最长等待时间,单位:秒 """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) t_beginning = time.time() seconds_passed = 0 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if seconds_passed > timeout: p.terminate() raise TimeoutError(cmd, timeout) time.sleep(0.1) return p.stdout.read() if __name__ == "__main__": print command(cmd='ping www.baidu.com', timeout=60)