Brief overview
The data-parallel class officially provided by PyTorch is:

```python
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
```
Given a model, its main job is to split the input along the batch dimension and distribute the chunks to the specified devices, while every other object is copied to each device. During the forward pass, the module is replicated onto every device and each replica processes its slice of the input. During the backward pass, the gradients from each replica are accumulated into the original module (usually the one on GPU 0).
One thing to note: the official documentation recommends DistributedDataParallel instead, because DistributedDataParallel is multi-process while DataParallel is multi-threaded. If you use DistributedDataParallel, you need to start the program with torch.distributed.launch; see Distributed Communication Package - Torch.Distributed.
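For context, a minimal DistributedDataParallel setup might look roughly like the sketch below. The script name train.py, the nccl backend, and the exact way the local rank is passed are my assumptions; the launch flags depend on your PyTorch version.

```python
# Hypothetical train.py: a minimal DistributedDataParallel sketch, not code
# from this article. Launched (older-style launcher) with:
#   python -m torch.distributed.launch --nproc_per_node=NUM_GPUS train.py
import os
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    # The launcher passes the rank via the LOCAL_RANK env var
    # (or a --local_rank argument, depending on the version).
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    dist.init_process_group(backend="nccl")  # one process per GPU
    torch.cuda.set_device(local_rank)

    model = nn.Linear(10, 10).cuda(local_rank)
    model = DDP(model, device_ids=[local_rank])

    x = torch.randn(8, 10).cuda(local_rank)
    loss = model(x).sum()
    loss.backward()  # gradients are all-reduced across processes

if __name__ == "__main__":
    main()
```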
The batch size must be larger than the number of GPUs; in practice I usually set it to a multiple of the number of GPUs. When the data is distributed across devices, anything you could pass to the original module can also be passed to the DataParallel wrapper (the parallelized module), but tensors are split across devices along dim=0 by default, tuple, list, and dict inputs are shallow-copied, and any other type is shared among the worker threads (and can therefore be corrupted if the forward pass writes to it).
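As a rough illustration (not from the original post), torch.chunk mimics how a tensor batch is divided along dim 0 before being sent to the GPUs:

```python
import torch

batch = torch.randn(30, 5)   # batch size 30, feature size 5
num_gpus = 4                 # assume 4 visible GPUs

# DataParallel splits the input along dim 0 into roughly equal chunks,
# one per GPU; with 30 samples and 4 GPUs the split is 8 + 8 + 8 + 6.
chunks = torch.chunk(batch, num_gpus, dim=0)
print([c.shape[0] for c in chunks])  # [8, 8, 8, 6]
```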
Before wrapping a module with DataParallel, the module must already have its own parameters (so the model's parameters can be accessed), and its parameters and buffers must already be on device_ids[0] (otherwise a runtime error is raised).
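A minimal sketch of satisfying this requirement (my own example, assuming two GPUs):

```python
import torch
import torch.nn as nn

model = nn.Linear(5, 2)

# Parameters and buffers must live on device_ids[0] before wrapping;
# moving the model to cuda:0 first avoids the RuntimeError raised in forward().
model = model.cuda(0)
parallel_model = nn.DataParallel(model, device_ids=[0, 1])
```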
During each forward pass, the module is replicated onto every device, so any updates made during the forward pass are lost. For example, if the module has a counter attribute that is incremented on every forward pass, it will stay at its initial value: the update happens on a replica, and the replicas are destroyed as soon as the forward pass finishes. However, in DataParallel the replica on device[0] shares its parameters and buffers with the original parallelized module, so updates made on device[0] are preserved. The returned result is the output gathered from all devices, by default along dim 0. This is worth keeping in mind when handling sequential data with RNNs; see the FAQ entry "My recurrent network doesn't work with data parallelism".
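Here is a small sketch, assuming two GPUs, that illustrates the lost-update behaviour described above (the CountingModel class is made up for illustration):

```python
import torch
import torch.nn as nn

class CountingModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(5, 2)
        self.counter = 0  # plain Python attribute, not a buffer

    def forward(self, x):
        # This increment happens on a replica, which is thrown away after
        # the forward pass, so the original module never sees it.
        self.counter += 1
        return self.fc(x)

model = CountingModel().cuda(0)
parallel_model = nn.DataParallel(model, device_ids=[0, 1])
parallel_model(torch.randn(8, 5).cuda(0))
print(model.counter)  # still 0 when more than one GPU is used
```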
```python
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
```
torch.nn.DataParallel() takes three main arguments: module, device_ids, and output_device. module is the module to parallelize. device_ids is a list of devices, defaulting to all usable devices. output_device is the GPU on which the outputs are gathered, defaulting to device_ids[0].
A simple example:

```python
>>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
>>> output = net(input_var)  # input_var can be on any device, including CPU
```
Source code walkthrough
The source of data_parallel.py is at: https://github.com/pytorch/pytorch/blob/master/torch/nn/parallel/data_parallel.py
Annotated source code
```python
import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
    _get_all_device_indices,
    _get_available_device_type,
    _get_device_index,
    _get_devices_properties
)


def _check_balance(device_ids):
    imbalance_warn = """
    There is an imbalance between your GPUs. You may want to exclude GPU {} which
    has less than 75% of the memory or cores of GPU {}. You can do so by setting
    the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
    environment variable."""
    device_ids = [_get_device_index(x, True) for x in device_ids]
    dev_props = _get_devices_properties(device_ids)

    def warn_imbalance(get_prop):
        values = [get_prop(props) for props in dev_props]
        min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
        max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
        if min_val / max_val < 0.75:
            warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
            return True
        return False

    if warn_imbalance(lambda props: props.total_memory):
        return
    if warn_imbalance(lambda props: props.multi_processor_count):
        return
```
Initialization of the DataParallel class:
```python
class DataParallel(Module):
    # TODO: update notes/cuda.rst when this class handles 8+ GPUs well

    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

        # _get_available_device_type() returns "cuda" or None, based on
        # torch.cuda.is_available().
        device_type = _get_available_device_type()
        if device_type is None:  # no GPU available
            # Without GPUs the module cannot be parallelized: store it as-is
            # and leave the device id list empty.
            self.module = module
            self.device_ids = []
            return

        if device_ids is None:
            # No GPUs specified: default to all available devices
            # (a list of device ids).
            device_ids = _get_all_device_indices()

        if output_device is None:  # output device not specified
            output_device = device_ids[0]  # default to the first device

        self.dim = dim
        self.module = module  # the module passed in
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])

        _check_balance(self.device_ids)

        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)
```
Forward pass
```python
    def forward(self, *inputs, **kwargs):
        # No usable GPU: fall back to the original module.
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        # Check that all parameters and buffers are on src_device_obj (device_ids[0]).
        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))

        # Scatter the inputs roughly evenly across the GPUs.
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        # for forward function without any inputs, empty list and dict will be created
        # so the module can be executed on one device which is the first one in device_ids
        if not inputs and not kwargs:
            inputs = ((),)
            kwargs = ({},)

        if len(self.device_ids) == 1:
            # With a single GPU, just call the unparallelized module directly.
            return self.module(*inputs[0], **kwargs[0])
        # replicate copies the model onto each GPU.
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        # Run the replicas on their GPUs in parallel.
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        # Gather the outputs onto output_device, by default along dim 0.
        return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
```
The scatter function:
```python
def scatter(inputs, target_gpus, dim=0):
    r"""
    Slices tensors into approximately equal chunks and
    distributes them across given GPUs. Duplicates
    references to objects that are not tensors.
    """
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            return Scatter.apply(target_gpus, None, dim, obj)
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            return list(map(list, zip(*map(scatter_map, obj))))
        if isinstance(obj, dict) and len(obj) > 0:
            return list(map(type(obj), zip(*map(scatter_map, obj.items()))))
        return [obj for targets in target_gpus]

    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell
    try:
        res = scatter_map(inputs)
    finally:
        scatter_map = None
    return res
```
During the forward pass, the inputs are distributed to the GPUs by the scatter function, which lives in scatter_gather.py. Tensors are split across the given GPUs by Scatter.apply; if an input is a tuple, list, or dict, scatter_map recurses into it so that any tensors it contains get scattered, while all other objects are simply duplicated for every GPU, and the per-GPU results are returned.
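As a rough sketch of that behaviour, the public torch.nn.parallel.scatter helper can be called directly on a nested input (my own example, assuming two visible GPUs):

```python
import torch
from torch.nn.parallel import scatter

inputs = {
    "x": torch.randn(6, 5),   # tensors get split along dim 0
    "lengths": [1, 2, 3],     # non-tensor containers are rebuilt per GPU
}

# One output per target GPU; each is a dict holding a 3-row slice of "x"
# and its own copy of the "lengths" list.
scattered = scatter(inputs, target_gpus=[0, 1])
print(scattered[0]["x"].shape, scattered[0]["x"].device)  # torch.Size([3, 5]) cuda:0
```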
The replicate function:
The replicate function copies the model to every GPU. If the model you define is a ScriptModule, that is, you subclassed ScriptModule rather than nn.Module when writing it, it cannot be replicated and an error is raised. The function essentially copies the model's parameters, buffers, and other shared state onto each GPU; read the source if you are interested.
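For illustration only, the public torch.nn.parallel.replicate helper can also be called by hand (a sketch assuming two GPUs):

```python
import torch
import torch.nn as nn
from torch.nn.parallel import replicate

model = nn.Linear(5, 2).cuda(0)

# One replica per device; each replica's parameters are broadcast copies
# living on the corresponding GPU.
replicas = replicate(model, devices=[0, 1])
print(replicas[1].weight.device)  # cuda:1
```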
The data_parallel function:
```python
def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
    r"""Evaluates module(input) in parallel across the GPUs given in device_ids.

    This is the functional version of the DataParallel module.

    Args:
        module (Module): the module to evaluate in parallel
        inputs (Tensor): inputs to the module
        device_ids (list of int or torch.device): GPU ids on which to replicate module
        output_device (list of int or torch.device): GPU location of the output
            Use -1 to indicate the CPU. (default: device_ids[0])
    Returns:
        a Tensor containing the result of module(input) located on output_device
    """
    if not isinstance(inputs, tuple):
        inputs = (inputs,) if inputs is not None else ()

    device_type = _get_available_device_type()

    if device_ids is None:
        device_ids = _get_all_device_indices()

    if output_device is None:
        output_device = device_ids[0]

    device_ids = [_get_device_index(x, True) for x in device_ids]
    output_device = _get_device_index(output_device, True)
    src_device_obj = torch.device(device_type, device_ids[0])

    for t in chain(module.parameters(), module.buffers()):
        if t.device != src_device_obj:
            raise RuntimeError("module must have its parameters and buffers "
                               "on device {} (device_ids[0]) but found one of "
                               "them on device: {}".format(src_device_obj, t.device))

    inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
    # for module without any inputs, empty list and dict will be created
    # so the module can be executed on one device which is the first one in device_ids
    if not inputs and not module_kwargs:
        inputs = ((),)
        module_kwargs = ({},)

    if len(device_ids) == 1:
        return module(*inputs[0], **module_kwargs[0])
    used_device_ids = device_ids[:len(inputs)]
    replicas = replicate(module, used_device_ids)
    outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
    return gather(outputs, output_device, dim)
```
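As a usage note (my own sketch, not from the source), the functional form runs the whole scatter/replicate/apply/gather pipeline in a single call instead of constructing a DataParallel object:

```python
import torch
import torch.nn as nn
from torch.nn.parallel import data_parallel

model = nn.Linear(5, 2).cuda(0)
x = torch.randn(8, 5).cuda(0)

# Roughly equivalent to nn.DataParallel(model, device_ids=[0, 1])(x):
# scatter -> replicate -> parallel_apply -> gather in one shot.
out = data_parallel(model, x, device_ids=[0, 1], output_device=0)
print(out.shape)  # torch.Size([8, 2])
```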
With the replicated model and the scattered data in hand, the remaining step is to run the computation on them in parallel.
The parallel_apply function:
```python
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    # The number of replicas must match the number of input chunks.
    assert len(modules) == len(inputs)
    if kwargs_tup is not None:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = list(map(lambda x: _get_device_index(x, True), devices))
    lock = threading.Lock()
    results = {}
    grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()

    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                results[i] = output
        except Exception:
            with lock:
                results[i] = ExceptionWrapper(
                    where="in replica {} on device {}".format(i, device))

    if len(modules) > 1:
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, ExceptionWrapper):
            output.reraise()
        outputs.append(output)
    return outputs
```
It first checks that the numbers of modules, inputs, kwargs, and devices match, then runs each replica in its own thread. Finally (back in forward), all outputs are gathered together, by default along dim 0.
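Putting the pieces together by hand makes the pipeline explicit; the following is just a sketch of mine, assuming two GPUs:

```python
import torch
import torch.nn as nn
from torch.nn.parallel import replicate, scatter, parallel_apply, gather

model = nn.Linear(5, 2).cuda(0)
x = torch.randn(8, 5).cuda(0)
device_ids = [0, 1]

inputs = scatter(x, device_ids)             # split the batch: 4 + 4
replicas = replicate(model, device_ids)     # one copy of the model per GPU
outputs = parallel_apply(replicas, inputs)  # run each replica in its own thread
result = gather(outputs, target_device=0)   # concatenate along dim 0 on GPU 0
print(result.shape)  # torch.Size([8, 2])
```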
Example
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len


class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input):
        return self.sigmoid(self.fc(input))


if __name__ == '__main__':
    # Parameters and DataLoaders
    input_size = 5
    output_size = 1
    batch_size = 30
    data_size = 100

    rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                             batch_size=batch_size, shuffle=True)

    model = Model(input_size, output_size)
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        # Wrap the model; each batch from rand_loader is split across the GPUs.
        model = nn.DataParallel(model)
    if torch.cuda.is_available():
        model = model.cuda()

    optimizer = optim.SGD(params=model.parameters(), lr=1e-3)
    cls_criterion = nn.BCELoss()

    for data in rand_loader:
        # Random binary targets for the BCE loss.
        targets = torch.empty(data.size(0)).random_(2).view(-1, 1)

        if torch.cuda.is_available():
            data = data.cuda()
            targets = targets.cuda()

        output = model(data)

        optimizer.zero_grad()
        loss = cls_criterion(output, targets)
        loss.backward()
        optimizer.step()
```