PyTorch中的多进程并行处理

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 这篇文章我们将介绍如何利用torch.multiprocessing模块,在PyTorch中实现高效的多进程处理。

PyTorch是一个流行的深度学习框架,一般情况下使用单个GPU进行计算时是十分方便的。但是当涉及到处理大规模数据和并行处理时,需要利用多个GPU。这时PyTorch就显得不那么方便,所以这篇文章我们将介绍如何利用torch.multiprocessing模块,在PyTorch中实现高效的多进程处理。

多进程是一种允许多个进程并发运行的方法,利用多个CPU内核和GPU进行并行计算。这可以大大提高数据加载、模型训练和推理等任务的性能。PyTorch提供了torch.multiprocessing模块来解决这个问题。

导入库

 import torch
 import torch.multiprocessing as mp
 from torch import nn, optim

对于多进程的问题,我们主要要解决2方面的问题:1、数据的加载;2分布式的训练

数据加载

加载和预处理大型数据集可能是一个瓶颈。使用torch.utils.data.DataLoader和多个worker可以缓解这个问题。

 from torch.utils.data import DataLoader, Dataset
 class CustomDataset(Dataset):
     def __init__(self, data):
         self.data = data
     def __len__(self):
         return len(self.data)
     def __getitem__(self, idx):
         return self.data[idx]
 data = [i for i in range(1000)]
 dataset = CustomDataset(data)
 dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
 for batch in dataloader:
     print(batch)

num_workers=4意味着四个子进程将并行加载数据。这个方法可以在单个GPU时使用,通过增加数据读取进程可以加快数据读取的速度,提高训练效率。

分布式训练

分布式训练包括将训练过程分散到多个设备上。torch.multiprocessing可以用来实现这一点。

我们一般的训练流程是这样的

 class SimpleModel(nn.Module):
     def __init__(self):
         super(SimpleModel, self).__init__()
         self.fc = nn.Linear(10, 1)
 def forward(self, x):
         return self.fc(x)
 def train(rank, model, data, target, optimizer, criterion, epochs):
     for epoch in range(epochs):
         optimizer.zero_grad()
         output = model(data)
         loss = criterion(output, target)
         loss.backward()
         optimizer.step()
         print(f"Process {rank}, Epoch {epoch}, Loss: {loss.item()}")

要修改这个流程,我们首先需要初始和共享模型

 def main():
     num_processes = 4
     data = torch.randn(100, 10)
     target = torch.randn(100, 1)
     model = SimpleModel()
     model.share_memory()  # Share the model parameters among processes
     optimizer = optim.SGD(model.parameters(), lr=0.01)
     criterion = nn.MSELoss()
     processes = []
     for rank in range(num_processes):
         p = mp.Process(target=train, args=(rank, model, data, target, optimizer, criterion, 10))
         p.start()
         processes.append(p)
     for p in processes:
         p.join()
 if __name__ == '__main__':
     main()

上面的例子中四个进程同时运行训练函数,共享模型参数。

多GPU的话则可以使用分布式数据并行(DDP)训练

对于大规模的分布式训练,PyTorch的torch.nn.parallel.DistributedDataParallel(DDP)是非常高效的。DDP可以封装模块并将其分布在多个进程和gpu上,为训练大型模型提供近线性缩放。

 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel as DDP

修改train函数初始化流程组并使用DDP包装模型。

 def train(rank, world_size, data, target, epochs):
     dist.init_process_group("gloo", rank=rank, world_size=world_size)

     model = SimpleModel().to(rank)
     ddp_model = DDP(model, device_ids=[rank])

     optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
     criterion = nn.MSELoss()

     for epoch in range(epochs):
         optimizer.zero_grad()
         output = ddp_model(data.to(rank))
         loss = criterion(output, target.to(rank))
         loss.backward()
         optimizer.step()
         print(f"Process {rank}, Epoch {epoch}, Loss: {loss.item()}")

     dist.destroy_process_group()

修改main函数增加world_size参数并调整进程初始化以传递world_size。

 def main():
     num_processes = 4
     world_size = num_processes
     data = torch.randn(100, 10)
     target = torch.randn(100, 1)
     mp.spawn(train, args=(world_size, data, target, 10), nprocs=num_processes, join=True)
 if __name__ == '__main__':
     mp.set_start_method('spawn')
     main()

这样,就可以在多个GPU上进行训练了

常见问题及解决

1、避免死锁

在脚本的开头使用mp.set_start_method('spawn')来避免死锁。

 if __name__ == '__main__':
     mp.set_start_method('spawn')
     main()

因为多线程需要自己管理资源,所以请确保清理资源,防止内存泄漏。

2、异步执行

异步执行允许进程独立并发地运行,通常用于非阻塞操作。

 def async_task(rank):
     print(f"Starting task in process {rank}")
     # Simulate some work with sleep
     torch.sleep(1)
     print(f"Ending task in process {rank}")
 def main_async():
     num_processes = 4
     processes = []

     for rank in range(num_processes):
         p = mp.Process(target=async_task, args=(rank,))
         p.start()
         processes.append(p)

     for p in processes:
         p.join()
 if __name__ == '__main__':
     main_async()

3、共享内存管理

使用共享内存允许不同的进程在不复制数据的情况下处理相同的数据,从而减少内存开销并提高性能。

 def shared_memory_task(shared_tensor, rank):
     shared_tensor[rank] = shared_tensor[rank] + rank
 def main_shared_memory():
     shared_tensor = torch.zeros(4, 4).share_memory_()
     processes = []

     for rank in range(4):
         p = mp.Process(target=shared_memory_task, args=(shared_tensor, rank))
         p.start()
         processes.append(p)

     for p in processes:
         p.join()
     print(shared_tensor)
 if __name__ == '__main__':
     main_shared_memory()

共享张量shared_tensor可以被多个进程修改

总结

PyTorch中的多线程处理可以显著提高性能,特别是在数据加载和分布式训练时使用torch.multiprocessing模块,可以有效地利用多个cpu,从而实现更快、更高效的计算。无论您是在处理大型数据集还是训练复杂模型,理解和利用多处理技术对于优化PyTorch中的性能都是必不可少的。使用分布式数据并行(DDP)进一步增强了跨多个gpu扩展训练的能力,使其成为大规模深度学习任务的强大工具。

https://avoid.overfit.cn/post/a68990d2d9d14d26a4641bbaf265671e

作者:Ali ABUSALEH

相关实践学习
基于阿里云DeepGPU实例,用AI画唯美国风少女
本实验基于阿里云DeepGPU实例,使用aiacctorch加速stable-diffusion-webui,用AI画唯美国风少女,可提升性能至高至原性能的2.6倍。
目录
相关文章
|
16天前
|
监控 Linux 应用服务中间件
探索Linux中的`ps`命令:进程监控与分析的利器
探索Linux中的`ps`命令:进程监控与分析的利器
|
2天前
|
存储 缓存 安全
【Linux】冯诺依曼体系结构与操作系统及其进程
【Linux】冯诺依曼体系结构与操作系统及其进程
10 1
|
9天前
|
小程序 Linux
【编程小实验】利用Linux fork()与文件I/O:父进程与子进程协同实现高效cp命令(前半文件与后半文件并行复制)
这个小程序是在文件IO的基础上去结合父子进程的一个使用,利用父子进程相互独立的特点实现对数据不同的操作
|
9天前
|
SQL 自然语言处理 网络协议
【Linux开发实战指南】基于TCP、进程数据结构与SQL数据库:构建在线云词典系统(含注册、登录、查询、历史记录管理功能及源码分享)
TCP(Transmission Control Protocol)连接是互联网上最常用的一种面向连接、可靠的、基于字节流的传输层通信协议。建立TCP连接需要经过著名的“三次握手”过程: 1. SYN(同步序列编号):客户端发送一个SYN包给服务器,并进入SYN_SEND状态,等待服务器确认。 2. SYN-ACK:服务器收到SYN包后,回应一个SYN-ACK(SYN+ACKnowledgment)包,告诉客户端其接收到了请求,并同意建立连接,此时服务器进入SYN_RECV状态。 3. ACK(确认字符):客户端收到服务器的SYN-ACK包后,发送一个ACK包给服务器,确认收到了服务器的确
|
15天前
|
Web App开发 运维 监控
深入探索Linux命令pwdx:揭秘进程工作目录的秘密
`pwdx`命令在Linux中用于显示指定进程的工作目录,基于`/proc`文件系统获取实时信息。简单易用,如`pwdx 1234`显示PID为1234的进程目录。结合`ps`和`pgrep`等命令可扩展使用,如查看所有进程或特定进程(如Firefox)的目录。使用时注意权限、进程ID的有效性和与其他命令的配合。查阅`man pwdx`获取更多帮助。
|
17天前
|
存储 Shell Linux
Linux进程概念(下)
本文详细的介绍了环境变量和进程空间的概念及其相关的知识。
21 0
Linux进程概念(下)
|
2天前
|
缓存 Linux 编译器
【Linux】多线程——线程概念|进程VS线程|线程控制(下)
【Linux】多线程——线程概念|进程VS线程|线程控制(下)
6 0
|
2天前
|
存储 Linux 调度
【Linux】多线程——线程概念|进程VS线程|线程控制(上)
【Linux】多线程——线程概念|进程VS线程|线程控制(上)
10 0
|
2天前
|
存储 NoSQL Unix
【Linux】进程信号(下)
【Linux】进程信号(下)
14 0