PyTorch并行与分布式(三)DataParallel原理、源码解析、举例实战

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: PyTorch并行与分布式(三)DataParallel原理、源码解析、举例实战

简要概览

  pytorch官方提供的数据并行类为:

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
• 1

  当给定model时,主要实现功能是将input数据依据batch的这个维度,将数据划分到指定的设备上。其他的对象(objects)复制到每个设备上。在前向传播的过程中,module被复制到每个设备上,每个复制的副本处理一部分输入数据。在反向传播过程中,每个副本module的梯度被汇聚到原始的module上计算(一般为第0GPU)。

并且这里要注意的一点是,这里官方推荐是用DistributedDataParallel,因为DistributedDataParallel使用的是多进程方式,而DataParallel使用的是多线程的方式。如果使用的是DistributedDataParallel,你需要使用torch.distributed.launch去launch程序,参考Distributed Communication Package - Torch.Distributed

  batch size的大小一定要大于GPU的数量,我在实践过程中batch size的大小一般设置为GPU块数的倍数。在数据分配到不同的机器上的时候,传入module的数据同样都可以传入DataParallel(并行之后的module类型)中,但是tensor默认按照dim=0分配到不同的机器上,tuple, listdict类型的数据被浅拷贝到不同的GPU上,其它类型的数据将会被分配到不同的进程中。

  在调用DataParallel之前,module必须要具有他自己的参数(能获取到模型的参数),还需要在指定的GPU上具有buffer(不然会报内存出错)。

在前向传播的过程中,module被复制到每个设备上,因此在前线传播过程中的任何更新都会丢失。举例来说,如果module有一个counter属性,在每次前线传播过程中都会加1,它将会保留在初始值状态,因为更新在副本上,但是副本前线传播完就被销毁了。然而在DataParallel中,device[0]上的副本将其参数和内存数据与并行的module共享,因此在device[0]上更新数据将会被记录。

返回的结果是来自各个device上的数据的汇总。默认是dim 0维度上的汇总。因此在处理RNN时序数据时就需要注意这一点。My recurrent network doesn’t work with data parallelism

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
• 1

  torch.nn.DataParallel()函数的参数主要有moduledevice_idsoutput_device这三个。

  1. module为需要并行的module
  2. device_ids为一个list,默认为所有可操作的devices
  3. output_device为需要输出汇总的指定GPU,默认为device_ids[0]号。

  简单的举例为:

>>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
>>> output = net(input_var)  # input_var can be on any device, including CPU

源码解析

  data_parallel.py的源码地址为:https://github.com/pytorch/pytorch/blob/master/torch/nn/parallel/data_parallel.py

  源码注释

import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
    _get_all_device_indices,
    _get_available_device_type,
    _get_device_index,
    _get_devices_properties
)
def _check_balance(device_ids):
    imbalance_warn = """
    There is an imbalance between your GPUs. You may want to exclude GPU {} which
    has less than 75% of the memory or cores of GPU {}. You can do so by setting
    the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
    environment variable."""
    device_ids = [_get_device_index(x, True) for x in device_ids]
    dev_props = _get_devices_properties(device_ids)
    def warn_imbalance(get_prop):
        values = [get_prop(props) for props in dev_props]
        min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
        max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
        if min_val / max_val < 0.75:
            warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
            return True
        return False
    if warn_imbalance(lambda props: props.total_memory):
        return
    if warn_imbalance(lambda props: props.multi_processor_count):
        return

DataParallel类初始化:

class DataParallel(Module):
    # TODO: update notes/cuda.rst when this class handles 8+ GPUs well
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()
    # 通过调用torch.cuda.is_available()判断是返回“cuda”还是None。
        device_type = _get_available_device_type() 
        if device_type is None: # 检查是否有GPU
          # 如果没有GPU的话,module就不能够并行,直接赋值,设备id置空
            self.module = module
            self.device_ids = []
            return
        if device_ids is None: # 如果没有指定GPU,则默认使用所有可用的GPU
          # 获取所有可用的设备ID,为一个list。
            device_ids = _get_all_device_indices()
        if output_device is None: # 判断输出设备是否指定
            output_device = device_ids[0] # 默认为指定设备的第一个
        self.dim = dim
        self.module = module # self.module就是传入的module。
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])
        _check_balance(self.device_ids)
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

前向传播

def forward(self, *inputs, **kwargs):
      # 如果没有可用的GPU则使用原来的module来计算
        if not self.device_ids:
            return self.module(*inputs, **kwargs)
    # 这里应该是判断模型的参数和buffer都要有。
        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))
        # 用scatter函数将input平均分配到每个GPU上
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids) 
        # for forward function without any inputs, empty list and dict will be created
        # so the module can be executed on one device which is the first one in device_ids
        if not inputs and not kwargs:
            inputs = ((),)
            kwargs = ({},)
        if len(self.device_ids) == 1: # 只有一个给定的GPU的话,就直接调用未并行的module,否者进入下一步
            return self.module(*inputs[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # replicate函数主要讲模型复制到多个GPU上
        outputs = self.parallel_apply(replicas, inputs, kwargs) # 并行地在多个GPU上计算模型。
        return self.gather(outputs, self.output_device) # 将数据聚合到一起,传送到output_device上,默认也是dim 0维度聚合。
    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())
    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
  • scatter函数:
def scatter(inputs, target_gpus, dim=0):
    r"""
    Slices tensors into approximately equal chunks and
    distributes them across given GPUs. Duplicates
    references to objects that are not tensors.
    """
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            return Scatter.apply(target_gpus, None, dim, obj)
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            return list(map(list, zip(*map(scatter_map, obj))))
        if isinstance(obj, dict) and len(obj) > 0:
            return list(map(type(obj), zip(*map(scatter_map, obj.items()))))
        return [obj for targets in target_gpus]
    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell
    try:
        res = scatter_map(inputs)
    finally:
        scatter_map = None
    return res

  在前向传播中,数据需要通过scatter函数分配到每个GPU上,代码在scatter_gather.py文件下,如果输入的类型不是tensor的话,会依据数据类型处理一下变成tensor,再递归调用scatter_map,最后调用Scatter.apply方法将数据依据给定的GPU给划分好返回。

  • replicate函数:

  replicate函数需要将模型给复制到每个GPU上。如果你定义的模型是ScriptModule的话,也就是在编写自己model的时候不是继承的nn.Module,而是继承的nn.ScriptModule,就不能复制,会报错。

  这个函数主要就是将模型参数、buffer等需要共享的信息,复制到每个GPU上,感兴趣的自己看吧。

data_parallel

def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
    r"""Evaluates module(input) in parallel across the GPUs given in device_ids.
    This is the functional version of the DataParallel module.
    Args:
        module (Module): the module to evaluate in parallel
        inputs (Tensor): inputs to the module
        device_ids (list of int or torch.device): GPU ids on which to replicate module
        output_device (list of int or torch.device): GPU location of the output  Use -1 to indicate the CPU.
            (default: device_ids[0])
    Returns:
        a Tensor containing the result of module(input) located on
        output_device
    """
    if not isinstance(inputs, tuple):
        inputs = (inputs,) if inputs is not None else ()
    device_type = _get_available_device_type()
    if device_ids is None:
        device_ids = _get_all_device_indices()
    if output_device is None:
        output_device = device_ids[0]
    device_ids = [_get_device_index(x, True) for x in device_ids]
    output_device = _get_device_index(output_device, True)
    src_device_obj = torch.device(device_type, device_ids[0])
    for t in chain(module.parameters(), module.buffers()):
        if t.device != src_device_obj:
            raise RuntimeError("module must have its parameters and buffers "
                               "on device {} (device_ids[0]) but found one of "
                               "them on device: {}".format(src_device_obj, t.device))
    inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
    # for module without any inputs, empty list and dict will be created
    # so the module can be executed on one device which is the first one in device_ids
    if not inputs and not module_kwargs:
        inputs = ((),)
        module_kwargs = ({},)
    if len(device_ids) == 1:
        return module(*inputs[0], **module_kwargs[0])
    used_device_ids = device_ids[:len(inputs)]
    replicas = replicate(module, used_device_ids)
    outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
    return gather(outputs, output_device, dim)

  并行的模型也有了,数据也有了,之后就是利用并行的模型和并行的数据来做计算了。

  • parallel_apply函数:
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
  # 判断模型数和输入数据数是否相等
    assert len(modules) == len(inputs)
    if kwargs_tup is not None:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = list(map(lambda x: _get_device_index(x, True), devices))
    lock = threading.Lock()
    results = {}
    grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()
    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                results[i] = output
        except Exception:
            with lock:
                results[i] = ExceptionWrapper(
                    where="in replica {} on device {}".format(i, device))
    if len(modules) > 1:
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, ExceptionWrapper):
            output.reraise()
        outputs.append(output)
    return outputs

  先判断一下数据的长度是否符合要求。之后利用多线程来处理数据。最后将所有的数据gather在一起,默认是从第0个维度gather在一起。

实例

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)
    def __getitem__(self, index):
        return self.data[index]
    def __len__(self):
        return self.len
class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
        self.sigmoid = nn.Sigmoid()
        # self.modules = [self.fc, self.sigmoid]
    def forward(self, input):
        return self.sigmoid(self.fc(input))
if __name__ == '__main__':
    # Parameters and DataLoaders
    input_size = 5
    output_size = 1
    batch_size = 30
    data_size = 100
    rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                             batch_size=batch_size, shuffle=True)
    model = Model(input_size, output_size)
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        model = nn.DataParallel(model).cuda()
    optimizer = optim.SGD(params=model.parameters(), lr=1e-3)
    cls_criterion = nn.BCELoss()
    for data in rand_loader:
        targets = torch.empty(data.size(0)).random_(2).view(-1, 1)
        if torch.cuda.is_available():
            input = Variable(data.cuda())
            with torch.no_grad():
                targets = Variable(targets.cuda())
        else:
            input = Variable(data)
            with torch.no_grad():
                targets = Variable(targets)
        output = model(input)
        optimizer.zero_grad()
        loss = cls_criterion(output, targets)
        loss.backward()
        optimizer.step()


相关文章
|
2天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
4月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
131 2
基于Redis的高可用分布式锁——RedLock
|
22天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
65 5
|
26天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
55 8
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
59 16
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
43 5