深度学习多进程GPU部署(一)- python多进程多线程

简介: 深度学习多进程GPU部署(一)- python多进程多线程

在深度学习学习中,一般模型的训练和模型部署,都是单模型单卡实现的,如果在业务中同一时间传入到模型的数据很多,一时间模型处理数据预测,通常来说就是一个接一个处理,第一个数据处理完预测,下一个数据进来(队列的形式),


这样的部署,在业务上预测延迟很大的。


在深度学习多进程GPU的部署方法是使用FastAPI,先把程序封装成服务API接口,在用uvicorn实现多进程调用,每一个进程在一个GPU上运行。


再了解如何用深度学习部署多进程之前,先学习一下,python中多进程多线程是如何实现以及原理的


python多进程多线程


如何理解进程和线程的关系呢,以显卡为例子,这是名人堂3080Ti显卡,16线程。显卡在电脑中的作用就是画面显示图像渲染,在一些游戏中我们显示器看到的所有东西都是通过显卡来实现的,说白了显卡就好比画家。

a6667d193d4f4468ac0b0f1f997934e9.png

游戏中每一帧画面,就好比是一个进程(画一张画),那画中的这些内容就是由画家来完成也就是线程来完成,16线程就代表有16个画家一起同时来完成这一帧的画。


c50e1327e9994f95afc36362b9c49c05.png


所以说一个程序至少有一个进程,一个进程至少有一个线程


多线程


python中的多线程主要使用到 threading 这个库


常用的操作方法


获取已激活的线程数


import threading
threading.active_count()
#output:1


查看所有线程信息


threading.enumerate()
#output:[<_MainThread(MainThread, started 14073601193474)>]


返回当前正在运行的线程


threading.current_thread()
• 1


创建线程



import threading
import time
def function(arg):
    time.sleep(1)
    print('thread '+str(arg)+" running....")
if __name__ == '__main__':
  #创建十个线程
    for i in range(10):
        t = threading.Thread(target=function, args=(i,))
        #线程启动start()
        t.start()


Tread函数参数如下:


threading.Thread(self, group=None, target=None, name=None,args=(),kwargs=None, *, daemon=None)


  • group:用于扩展
  • target:线程调用的程序
  • name:设置线程的名字
  • args和kwargs代表调用程序target的参数列表和关键字参数


join


线程和线程之间也可能会出现矛盾的,还是以画画为列子,二个画家同时画一幅画,有一个部分是需要画家A先上完一层颜色,在由画家B在A的基础上完成点缀的,但画家B没等画家A上色完,就已经在点缀了,导致画出来的东西不对。


这种情况,就需要我们去控制线程与线程之间的运行的前后顺序了,就用到Thread类的常用方法:join([timeout])


  • join:调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。

不加join的情况


import time
import threading
def painting():
    time.sleep(3)
    print('Artist A ends painting:',time.strftime('%H:%M:%S'))
    time.sleep(1)
    print('Artist B starts painting', time.strftime('%H:%M:%S'))
    time.sleep(3)
print('Artist A starts painting',time.strftime('%H:%M:%S'))
t = threading.Thread(target=painting)
t.start()
# 确保线程t已经启动
print('Artist B ends painting',time.strftime('%H:%M:%S'))


0f6830a118fa47e487e63774dd24b6ad.png

加join的情况


import time
import threading
def painting():
    time.sleep(3)
    print('Artist A ends painting:',time.strftime('%H:%M:%S'))
    time.sleep(1)
    print('Artist B starts painting', time.strftime('%H:%M:%S'))
    time.sleep(3)
print('Artist A starts painting',time.strftime('%H:%M:%S'))
t = threading.Thread(target=painting)
t.start()
t.join()
# 确保线程t已经启动
print('Artist B ends painting',time.strftime('%H:%M:%S'))

2b652b30b82c469693e25d17eefdcc3e.png


lock


也有可能线程之间,因为同一资料还吵架,比如二个画家同时都要用这个同一种颜料,二个人打起来了。


lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。


不使用lock


import threading
def Artist_A():
    global pigment
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist A',pigment)
def Artist_B():
    global pigment
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist B',pigment)
if __name__== '__main__':
    pigment=0
    t1=threading.Thread(target=Artist_A)
    t2=threading.Thread(target=Artist_B)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

c3570c9e9c284640a509708d38a458e7.png

使用lock


import threading
def Artist_A():
    global pigment,lock
    lock.acquire()
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist A',pigment)
    lock.release()
def Artist_B():
    global pigment,lock
    lock.acquire()
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist B',pigment)
    lock.release()
if __name__== '__main__':
    lock=threading.Lock()
    pigment=0
    t1=threading.Thread(target=Artist_A)
    t2=threading.Thread(target=Artist_B)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

96b749a86bb24dfbbc2f5d5505ec67fd.png

多进程


进程与线程的使用方法类似,都有start(),join(),lock()的调用方法

导入模块


import multiprocessing 
m1 = multiprocessing.Process(target=function,args)


在进程中,如果我们的执行的function有return返回值时,如果存储,如何使用有二种方法:用队列和进程池


队列


队列存储获取,通过put()存储,get()获取


import multiprocessing as mp
import threading as td
def function(q,num):
    res =0
    for i in range(num):
        res += i
    q.put(res)
def multi():
    q = mp.Queue()
    m1 = mp.Process(target=function,args=(q,5))
    m2 = mp.Process(target=function,args=(q,5))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    res1 = q.get()
    res2 = q.get()
    print('multi:', res1 + res2)
multi()


进程池


进程池类似于一个机器学习黑盒子,我们把参数function的参数给黑盒子,黑盒子输出返回值。

导入模块

创建几个进程process


import multiprocessing as mp
pool =mp.Pool(process=2)


常见的三种使用进程池的方法:map、apply、apply_async


map


map规定线程池执行的任务


result = pool.map(function,args)
• 1


  • function:执行的程序函数
  • args:传入的参数


apply


apply阻塞主进程, 并且一个一个按顺序地执行子进程, 等到全部子进程都执行完毕后 ,继续执行 apply()后面主进程的代码


什么是主程序和子程序,使用进程封装起来的target封装起来的就是子程序,其余的就是主程序。


import time
import multiprocessing
def Painting(num):
    print("Number of paints used by artist", num)
    time.sleep(1)
if __name__ == '__main__':
    print('Artist starts painting ')
    # 记录一下开始执行的时间
    start_time = time.time()
    # 创建三个子进程
    pool = multiprocessing.Pool(3)
    for i in range(3):
        pool.apply(Painting, [i])
    print('Artist ends painting',time.time() - start_time)
    pool.close()
    pool.join()

219753e58e034b02988ce03f6cfb23de.png


apply_async


apply_async() 非阻塞异步的, 他不会等待子进程执行完毕, 主进程会继续执行, 他会根据系统调度来进行进程切换。


CPU在执行第一个子进程的时候, 还没等第一个子进程结束, 系统调度到了按顺序调度到了第二个子进程, 以此类推, 一直调度运行子进程, 一个接一个地结束子进程的运行, 最后运行主进程,


import time
import multiprocessing
def Painting(num):
    print("Number of paints used by artist", num)
    time.sleep(1)
if __name__ == '__main__':
    print('Artist starts painting ')
    # 记录一下开始执行的时间
    start_time = time.time()
    # 创建三个子进程
    pool = multiprocessing.Pool(3)
    for i in range(3):
        pool.apply_async(Painting, [i])
    print('Artist ends painting',time.time() - start_time)
    pool.close()
    pool.join()

44ecd0bfffec4a3c9ac7ff1827bbf134.png


多进程多线程对比time


具体用法看下面列子


import multiprocessing as mp
import threading as td
import time
def function(q,num):
    res =0
    for i in range(num):
        res += i
    q.put(res)
def multi():
    q = mp.Queue()
    m1 = mp.Process(target=function,args=(q,5))
    m2 = mp.Process(target=function,args=(q,5))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    res1 = q.get()
    res2 = q.get()
    print('multi:', res1 + res2)
def thread():
    q = mp.Queue() # thread可放入process同样的queue中
    t1 = td.Thread(target=function, args=(q,5))
    t2 = td.Thread(target=function, args=(q,5))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('thread:', res1 + res2)
def normal():
    res = 0
    for _ in range(2):
        for i in range(5):
            res += i     
    print('normal:',res)
def run_time(f):
    start = time.time()
    f()
    end = time.time()
    print('running time',end-start)
if __name__=="__main__":
    run_time(multi)
    run_time(thread)
    run_time(normal)

5b07dd51c035488ba386891a6b7ac09e.png


时间消耗顺序:普通<多线程<多进程

不同的程序任务 消耗的时间可能不同的,谁的消耗时间越短不确定的


subprocess


subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值,大白话说:我们可以通过它,在python的代码中就可以执行linux操作系统的一些命令比如说“ls -la”等。


之前的版本还有subprocess.call(),subprocess…check_call()和subprocess…check_output()这些函数,python3.5版本之后,这些函数都被subprocess.run代替了


subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)


args:执行的linux命令。必须是一个字符串,字符串参数列表

stdin、stdout 和 stderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPEsubprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。

timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。

check:如果该参数设置为True,并且进程退出状态码不是 0,则弹出CalledProcessError异常。

encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。

shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。


Popen()


Popen 是 subprocess的核心,子进程的创建和管理都靠它处理。


Popen类的常用的方法有:

poll(): 检查进程是否终止,如果终止返回 returncode,否则返回 None。

wait(timeout): 等待子进程终止。

communicate(input,timeout): 和子进程交互,发送和读取数据

send_signal:发送信号到子进程 。

terminate(): 停止子进程,也就是发送SIGTERM信号到子进程。

kill(): 杀死子进程。发送SIGKILL 信号到子进程。

实例


import time  
import subprocess  
class TimeoutError(Exception):  
    pass  
def command(cmd, timeout=60):  
    """执行命令cmd,返回命令输出的内容。 
    如果超时将会抛出TimeoutError异常。 
    cmd - 要执行的命令 
    timeout - 最长等待时间,单位:秒 
    """  
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)  
    t_beginning = time.time()  
    seconds_passed = 0  
    while True:  
        if p.poll() is not None:  
            break  
        seconds_passed = time.time() - t_beginning  
        if seconds_passed > timeout:  
            p.terminate()  
            raise TimeoutError(cmd, timeout)  
        time.sleep(0.1)  
    return p.stdout.read()  
if __name__ == "__main__":  
    print command(cmd='ping www.baidu.com', timeout=60) 
相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
7天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
19天前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
43 1
|
23天前
|
数据处理 Apache 数据库
将 Python UDF 部署到 Apache IoTDB 的详细步骤与注意事项
【10月更文挑战第21天】将 Python UDF 部署到 Apache IoTDB 中需要一系列的步骤和注意事项。通过仔细的准备、正确的部署和测试,你可以成功地将自定义的 Python UDF 应用到 Apache IoTDB 中,为数据处理和分析提供更灵活和强大的支持。在实际操作过程中,要根据具体情况进行调整和优化,以确保实现最佳的效果。还可以结合具体的代码示例和实际部署经验,进一步深入了解和掌握这一过程。
20 2
|
25天前
|
机器人 Shell Linux
【Azure Bot Service】部署Python ChatBot代码到App Service中
本文介绍了使用Python编写的ChatBot在部署到Azure App Service时遇到的问题及解决方案。主要问题是应用启动失败,错误信息为“Failed to find attribute &#39;app&#39; in &#39;app&#39;”。解决步骤包括:1) 修改`app.py`文件,添加`init_func`函数;2) 配置`config.py`,添加与Azure Bot Service认证相关的配置项;3) 设置App Service的启动命令为`python3 -m aiohttp.web -H 0.0.0.0 -P 8000 app:init_func`。
|
26天前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
30天前
|
Linux Python
【Azure Function】Python Function部署到Azure后报错No module named '_cffi_backend'
ERROR: Error: No module named '_cffi_backend', Cannot find module. Please check the requirements.txt file for the missing module.
|
1月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3
|
1月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
20 0
|
2月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
57 0
|
2月前
|
数据采集 Linux 调度
Python之多线程与多进程
Python之多线程与多进程