深度学习多进程GPU部署(一)- python多进程多线程

简介: 深度学习多进程GPU部署(一)- python多进程多线程

在深度学习学习中,一般模型的训练和模型部署,都是单模型单卡实现的,如果在业务中同一时间传入到模型的数据很多,一时间模型处理数据预测,通常来说就是一个接一个处理,第一个数据处理完预测,下一个数据进来(队列的形式),


这样的部署,在业务上预测延迟很大的。


在深度学习多进程GPU的部署方法是使用FastAPI,先把程序封装成服务API接口,在用uvicorn实现多进程调用,每一个进程在一个GPU上运行。


再了解如何用深度学习部署多进程之前,先学习一下,python中多进程多线程是如何实现以及原理的


python多进程多线程


如何理解进程和线程的关系呢,以显卡为例子,这是名人堂3080Ti显卡,16线程。显卡在电脑中的作用就是画面显示图像渲染,在一些游戏中我们显示器看到的所有东西都是通过显卡来实现的,说白了显卡就好比画家。

a6667d193d4f4468ac0b0f1f997934e9.png

游戏中每一帧画面,就好比是一个进程(画一张画),那画中的这些内容就是由画家来完成也就是线程来完成,16线程就代表有16个画家一起同时来完成这一帧的画。


c50e1327e9994f95afc36362b9c49c05.png


所以说一个程序至少有一个进程,一个进程至少有一个线程


多线程


python中的多线程主要使用到 threading 这个库


常用的操作方法


获取已激活的线程数


import threading
threading.active_count()
#output:1


查看所有线程信息


threading.enumerate()
#output:[<_MainThread(MainThread, started 14073601193474)>]


返回当前正在运行的线程


threading.current_thread()
• 1


创建线程



import threading
import time
def function(arg):
    time.sleep(1)
    print('thread '+str(arg)+" running....")
if __name__ == '__main__':
  #创建十个线程
    for i in range(10):
        t = threading.Thread(target=function, args=(i,))
        #线程启动start()
        t.start()


Tread函数参数如下:


threading.Thread(self, group=None, target=None, name=None,args=(),kwargs=None, *, daemon=None)


  • group:用于扩展
  • target:线程调用的程序
  • name:设置线程的名字
  • args和kwargs代表调用程序target的参数列表和关键字参数


join


线程和线程之间也可能会出现矛盾的,还是以画画为列子,二个画家同时画一幅画,有一个部分是需要画家A先上完一层颜色,在由画家B在A的基础上完成点缀的,但画家B没等画家A上色完,就已经在点缀了,导致画出来的东西不对。


这种情况,就需要我们去控制线程与线程之间的运行的前后顺序了,就用到Thread类的常用方法:join([timeout])


  • join:调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。

不加join的情况


import time
import threading
def painting():
    time.sleep(3)
    print('Artist A ends painting:',time.strftime('%H:%M:%S'))
    time.sleep(1)
    print('Artist B starts painting', time.strftime('%H:%M:%S'))
    time.sleep(3)
print('Artist A starts painting',time.strftime('%H:%M:%S'))
t = threading.Thread(target=painting)
t.start()
# 确保线程t已经启动
print('Artist B ends painting',time.strftime('%H:%M:%S'))


0f6830a118fa47e487e63774dd24b6ad.png

加join的情况


import time
import threading
def painting():
    time.sleep(3)
    print('Artist A ends painting:',time.strftime('%H:%M:%S'))
    time.sleep(1)
    print('Artist B starts painting', time.strftime('%H:%M:%S'))
    time.sleep(3)
print('Artist A starts painting',time.strftime('%H:%M:%S'))
t = threading.Thread(target=painting)
t.start()
t.join()
# 确保线程t已经启动
print('Artist B ends painting',time.strftime('%H:%M:%S'))

2b652b30b82c469693e25d17eefdcc3e.png


lock


也有可能线程之间,因为同一资料还吵架,比如二个画家同时都要用这个同一种颜料,二个人打起来了。


lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。


不使用lock


import threading
def Artist_A():
    global pigment
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist A',pigment)
def Artist_B():
    global pigment
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist B',pigment)
if __name__== '__main__':
    pigment=0
    t1=threading.Thread(target=Artist_A)
    t2=threading.Thread(target=Artist_B)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

c3570c9e9c284640a509708d38a458e7.png

使用lock


import threading
def Artist_A():
    global pigment,lock
    lock.acquire()
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist A',pigment)
    lock.release()
def Artist_B():
    global pigment,lock
    lock.acquire()
    for i in range(10):
        pigment+=1
        print('Number of paints used by artist B',pigment)
    lock.release()
if __name__== '__main__':
    lock=threading.Lock()
    pigment=0
    t1=threading.Thread(target=Artist_A)
    t2=threading.Thread(target=Artist_B)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

96b749a86bb24dfbbc2f5d5505ec67fd.png

多进程


进程与线程的使用方法类似,都有start(),join(),lock()的调用方法

导入模块


import multiprocessing 
m1 = multiprocessing.Process(target=function,args)


在进程中,如果我们的执行的function有return返回值时,如果存储,如何使用有二种方法:用队列和进程池


队列


队列存储获取,通过put()存储,get()获取


import multiprocessing as mp
import threading as td
def function(q,num):
    res =0
    for i in range(num):
        res += i
    q.put(res)
def multi():
    q = mp.Queue()
    m1 = mp.Process(target=function,args=(q,5))
    m2 = mp.Process(target=function,args=(q,5))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    res1 = q.get()
    res2 = q.get()
    print('multi:', res1 + res2)
multi()


进程池


进程池类似于一个机器学习黑盒子,我们把参数function的参数给黑盒子,黑盒子输出返回值。

导入模块

创建几个进程process


import multiprocessing as mp
pool =mp.Pool(process=2)


常见的三种使用进程池的方法:map、apply、apply_async


map


map规定线程池执行的任务


result = pool.map(function,args)
• 1


  • function:执行的程序函数
  • args:传入的参数


apply


apply阻塞主进程, 并且一个一个按顺序地执行子进程, 等到全部子进程都执行完毕后 ,继续执行 apply()后面主进程的代码


什么是主程序和子程序,使用进程封装起来的target封装起来的就是子程序,其余的就是主程序。


import time
import multiprocessing
def Painting(num):
    print("Number of paints used by artist", num)
    time.sleep(1)
if __name__ == '__main__':
    print('Artist starts painting ')
    # 记录一下开始执行的时间
    start_time = time.time()
    # 创建三个子进程
    pool = multiprocessing.Pool(3)
    for i in range(3):
        pool.apply(Painting, [i])
    print('Artist ends painting',time.time() - start_time)
    pool.close()
    pool.join()

219753e58e034b02988ce03f6cfb23de.png


apply_async


apply_async() 非阻塞异步的, 他不会等待子进程执行完毕, 主进程会继续执行, 他会根据系统调度来进行进程切换。


CPU在执行第一个子进程的时候, 还没等第一个子进程结束, 系统调度到了按顺序调度到了第二个子进程, 以此类推, 一直调度运行子进程, 一个接一个地结束子进程的运行, 最后运行主进程,


import time
import multiprocessing
def Painting(num):
    print("Number of paints used by artist", num)
    time.sleep(1)
if __name__ == '__main__':
    print('Artist starts painting ')
    # 记录一下开始执行的时间
    start_time = time.time()
    # 创建三个子进程
    pool = multiprocessing.Pool(3)
    for i in range(3):
        pool.apply_async(Painting, [i])
    print('Artist ends painting',time.time() - start_time)
    pool.close()
    pool.join()

44ecd0bfffec4a3c9ac7ff1827bbf134.png


多进程多线程对比time


具体用法看下面列子


import multiprocessing as mp
import threading as td
import time
def function(q,num):
    res =0
    for i in range(num):
        res += i
    q.put(res)
def multi():
    q = mp.Queue()
    m1 = mp.Process(target=function,args=(q,5))
    m2 = mp.Process(target=function,args=(q,5))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    res1 = q.get()
    res2 = q.get()
    print('multi:', res1 + res2)
def thread():
    q = mp.Queue() # thread可放入process同样的queue中
    t1 = td.Thread(target=function, args=(q,5))
    t2 = td.Thread(target=function, args=(q,5))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('thread:', res1 + res2)
def normal():
    res = 0
    for _ in range(2):
        for i in range(5):
            res += i     
    print('normal:',res)
def run_time(f):
    start = time.time()
    f()
    end = time.time()
    print('running time',end-start)
if __name__=="__main__":
    run_time(multi)
    run_time(thread)
    run_time(normal)

5b07dd51c035488ba386891a6b7ac09e.png


时间消耗顺序:普通<多线程<多进程

不同的程序任务 消耗的时间可能不同的,谁的消耗时间越短不确定的


subprocess


subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值,大白话说:我们可以通过它,在python的代码中就可以执行linux操作系统的一些命令比如说“ls -la”等。


之前的版本还有subprocess.call(),subprocess…check_call()和subprocess…check_output()这些函数,python3.5版本之后,这些函数都被subprocess.run代替了


subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)


args:执行的linux命令。必须是一个字符串,字符串参数列表

stdin、stdout 和 stderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPEsubprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。

timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。

check:如果该参数设置为True,并且进程退出状态码不是 0,则弹出CalledProcessError异常。

encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。

shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。


Popen()


Popen 是 subprocess的核心,子进程的创建和管理都靠它处理。


Popen类的常用的方法有:

poll(): 检查进程是否终止,如果终止返回 returncode,否则返回 None。

wait(timeout): 等待子进程终止。

communicate(input,timeout): 和子进程交互,发送和读取数据

send_signal:发送信号到子进程 。

terminate(): 停止子进程,也就是发送SIGTERM信号到子进程。

kill(): 杀死子进程。发送SIGKILL 信号到子进程。

实例


import time  
import subprocess  
class TimeoutError(Exception):  
    pass  
def command(cmd, timeout=60):  
    """执行命令cmd,返回命令输出的内容。 
    如果超时将会抛出TimeoutError异常。 
    cmd - 要执行的命令 
    timeout - 最长等待时间,单位:秒 
    """  
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)  
    t_beginning = time.time()  
    seconds_passed = 0  
    while True:  
        if p.poll() is not None:  
            break  
        seconds_passed = time.time() - t_beginning  
        if seconds_passed > timeout:  
            p.terminate()  
            raise TimeoutError(cmd, timeout)  
        time.sleep(0.1)  
    return p.stdout.read()  
if __name__ == "__main__":  
    print command(cmd='ping www.baidu.com', timeout=60) 
相关实践学习
在云上部署ChatGLM2-6B大模型(GPU版)
ChatGLM2-6B是由智谱AI及清华KEG实验室于2023年6月发布的中英双语对话开源大模型。通过本实验,可以学习如何配置AIGC开发环境,如何部署ChatGLM2-6B大模型。
相关文章
|
1月前
|
Java 测试技术 API
【JUC】(1)带你重新认识进程与线程!!让你深层次了解线程运行的睡眠与打断!!
JUC是什么?你可以说它就是研究Java方面的并发过程。本篇是JUC专栏的第一章!带你了解并行与并发、线程与程序、线程的启动与休眠、打断和等待!全是干货!快快快!
377 2
|
1月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
127 1
|
1月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
144 1
|
7月前
|
Kubernetes 安全 异构计算
K8S 部署 Deepseek 要 3 天?别逗了!Ollama+GPU Operator 1 小时搞定
最近一年我都在依赖大模型辅助工作,比如 DeepSeek、豆包、Qwen等等。线上大模型确实方便,敲几个字就能生成文案、写代码、做表格,极大提高了效率。但对于企业来说:公司内部数据敏感、使用外部大模型会有数据泄露的风险。
K8S 部署 Deepseek 要 3 天?别逗了!Ollama+GPU Operator 1 小时搞定
|
6月前
|
人工智能 并行计算 开发者
CUDA重大更新:原生Python可直接编写高性能GPU程序
NVIDIA在2025年GTC大会上宣布CUDA并行计算平台正式支持原生Python编程,消除了Python开发者进入GPU加速领域的技术壁垒。这一突破通过重新设计CUDA开发模型,引入CUDA Core、cuPyNumeric、NVMath Python等核心组件,实现了Python与GPU加速的深度集成。开发者可直接用Python语法进行高性能并行计算,显著降低门槛,扩展CUDA生态,推动人工智能、科学计算等领域创新。此更新标志着CUDA向更包容的语言生态系统转型,未来还将支持Rust、Julia等语言。
458 3
CUDA重大更新:原生Python可直接编写高性能GPU程序
|
7月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
280 67
|
6月前
|
人工智能 并行计算 监控
在AMD GPU上部署AI大模型:从ROCm环境搭建到Ollama本地推理实战指南
本文详细介绍了在AMD硬件上构建大型语言模型(LLM)推理环境的全流程。以RX 7900XT为例,通过配置ROCm平台、部署Ollama及Open WebUI,实现高效本地化AI推理。尽管面临技术挑战,但凭借高性价比(如700欧元的RX 7900XT性能接近2200欧元的RTX 5090),AMD方案成为经济实用的选择。测试显示,不同规模模型的推理速度从9到74 tokens/秒不等,满足交互需求。随着ROCm不断完善,AMD生态将推动AI硬件多元化发展,为个人与小型组织提供低成本、低依赖的AI实践路径。
2303 1
在AMD GPU上部署AI大模型:从ROCm环境搭建到Ollama本地推理实战指南
|
5月前
|
调度 开发工具 Android开发
【HarmonyOS Next】鸿蒙应用进程和线程详解
进程的定义: 进程是系统进行资源分配的基本单位,是操作系统结构的基础。 在鸿蒙系统中,一个应用下会有三类进程:
190 0
|
8月前
|
存储 人工智能 固态存储
轻量级AI革命:无需GPU就能运算的DeepSeek-R1-1.5B模型及其低配部署指南
随着AI技术发展,大语言模型成为产业智能化的关键工具。DeepSeek系列模型以其创新架构和高效性能备受关注,其中R1-1.5B作为参数量最小的版本,适合资源受限场景。其部署仅需4核CPU、8GB RAM及15GB SSD,适用于移动对话、智能助手等任务。相比参数更大的R1-35B与R1-67B+,R1-1.5B成本低、效率高,支持数学计算、代码生成等多领域应用,是个人开发者和初创企业的理想选择。未来,DeepSeek有望推出更多小型化模型,拓展低资源设备的AI生态。
1603 8
|
8月前
|
SQL 监控 网络协议
YashanDB进程线程体系
YashanDB进程线程体系

热门文章

最新文章

推荐镜像

更多