[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
0x00 摘要
从本文开始,我们介绍 PyTorch 的数据并行,本文是第一篇,介绍 DataPrallel,因为字数太多(1万两千多字,因此拆分成两篇文章发布)。
本系列其他文章如下:
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
注 : 本文深度借鉴了以下两篇文章,特此深表感谢。
Distributed data parallel training using Pytorch on AWS
PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
0x01 综述
我们首先从各个角度来看看DataParallel。
1.1 从流程上看
DataParallel 从流程上来看,是通过将整个小批次(minibatch)数据加载到主线程上,然后将子小批次(ub-minibatches)数据分散到整个GPU网络中来工作。
- 把 minibatch 数据从page-locked memory 传输到 GPU 0(master),Master GPU 也持有模型,其他GPU拥有模型的 stale copy。
- 在 GPUs 之间 scatter minibatch 数据。具体是将输入一个 minibatch 的数据均分成多份,分别送到对应的 GPU 进行计算。
- 在 GPUs 之间复制模型。与 Module 相关的所有数据也都会复制多份。
- 在每个GPU之上运行前向传播,计算输出。PyTorch 使用多线程来并行前向传播,每个 GPU 在单独的线程上将针对各自的输入数据独立并行地进行 forward 计算。
- 在 master GPU 之上收集(gather)输出,计算损失。即通过将网络输出与批次中每个元素的真实数据标签进行比较来计算损失函数值。
- 把损失在 GPUs 之间 scatter,在各个GPU之上运行后向传播,计算参数梯度。
- 在 GPU 0 之上归并梯度。
- 更新梯度参数。
- 进行梯度下降,并更新主GPU上的模型参数。
- 由于模型参数仅在主GPU上更新,而其他从属GPU此时并不是同步更新的,所以需要将更新后的模型参数复制到剩余的从属 GPU 中,以此来实现并行。
1.2 从模式角度看
首先我们先给出一个技术上的概括,从模式角度看:
- DP 可以被认为是类似参数服务器的应用。
- DDP 可以被认为是集合通讯的应用。
参数服务器大致可以分为 master 和 worker,而DP 基于单机多卡,所以对应关系如下:
- worker :所有GPU(包括GPU 0)都是worker,都负责计算和训练网络。
- master :GPU 0(并非 GPU 真实标号,而是输入参数 device_ids 的首位)也负责整合梯度,更新参数。
所以我们重点看看 GPU 0。
DataParallel会将网络模型默认放在GPU 0上,然后把模型从GPU 0 拷贝到其他的GPU,各个GPU开始并行训练,接着 GPU 0 作为master来进行梯度的汇总和模型的更新,最后将计算任务下发给其他GPU。这非常类似参数服务器的机制。
从官方图也可以看到同样的信息。
1.3 从操作系统角度看
从操作系统角度看,DP 和 DDP 有如下不同(我们属于提前剧透):
- DataParallel 是单进程,多线程的并行训练方式,并且只能在单台机器上运行。
- DistributedDataParallel 是多进程,并且适用于单机和多机训练。DistributedDataParallel 还预先复制模型,而不是在每次迭代时复制模型,并避免了全局解释器锁定。
1.4 低效率
DP 有如下缺陷:
- 冗余数据副本
- 数据先从主机复制到主GPU,然后将微批次( sub-minibatches)在其他GPU之间发布(scatter)。
- 在前向传播之前需要跨GPU进行模型复制。
- 由于模型参数是在主GPU上更新的,因此模型必须在每次正向传播开始时重新同步。
- 每个batch都会有线程创建/销毁开销。
- 并行前向传播是在多个线程中实现的(这可能只是PyTorch的一个issue)。
- 有一个把梯度规约流水线化的机会但是没有利用。
- 在Pytorch 1.0.1数据并行实现中,梯度下降发生在反向传播的末尾,这可以进行流水线化。
- 在主GPU上不必要地收集模型输出output。
- GPU利用率不均,负载不均衡。主GPU的内存和使用率会比其他显卡的高,因为:
- 在主GPU上执行损失loss计算。
- 梯度规约和更新参数均发生在主GPU之上。
0x02 综述
2.1 示例
我们使用一个例子来看看,具体逻辑是:
-
给本程序设置可见GPU。
- 对应代码就是使用 args.gpu_id="2,7" 和 os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id 来配置 gpu 序号,其实目的就是设置 os.environ['CUDA_VISIBLE_DEVICES'] = "2,7",这样 device_ids[0]对应的就是物理上第2号卡,device_ids[1]对应的就是物理上第7号卡。
- 也可以在运行时临时指定,比如:CUDA_VISIBLE_DEVICES='2,7' Python train.py。
-
把模型参数和缓冲区放在device_ids[0]上,在运行DataParallel模块前,并行化模块必须在device_ids [0]上具有其参数和缓冲区。
- 代码就是 model=model.cuda() 。
-
构建DP模型。DP 的好处是使用起来非常方便,只需要将原来单卡的 module 用 DP 改成多卡。
- 代码就是 model=torch.nn.DaraParallel(model)。
- 实际上 DP 是一个Pytorch的nn.Module,所以模型和优化器都需要使用.module来得到实际的模型和优化器。
-
把数据载入到主GPU。
- data,label= data.cuda(),label.cuda()
-
进行前向传播。
- DP 会把模型module 在每个device上复制一份。
- DP 会把输入数据再切分为多个小块,把这些小块数据分发到不同的GPU之中进行计算,每个模型只需要处理自己分配到的数据。
-
进行后向传播。
- DP 会把每个GPU 计算出来的梯度累加到GPU 0之中进行汇总。
具体代码如下:
args.gpu_id="2,7" ; #指定gpu id
args.cuda = not args.no_cuda and torch.cuda.is_available() #是否使用cpu
# 配置环境 也可以在运行时临时指定,比如:CUDA_VISIBLE_DEVICES='2,7' Python train.py
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id # 赋值必须是字符串
device_ids=range(torch.cuda.device_count()) #torch.cuda.device_count()=2
# device_ids=[0,1] ---- 也可以这么使用。这里的0 就是上述指定 2,是主gpu, 1就是7,模型和数据由主gpu分发
if arg.cuda:
model=model.cuda() #将模型复制到gpu ,默认是cuda('0'),即转到第一个GPU 2
if len(device_id)>1:
model=torch.nn.DataParallel(model);#构建DP,前提是model已经.cuda()了
optimizer = torch.optim.SGD(model.parameters(), args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay)
#前向传播时,数据也要执行cuda(),即把数据复制到主gpu里
for batch_idx, (data, label) in pbar:
if args.cuda:
data,label= data.cuda(),label.cuda(); # 数据放到了默认GPU
data_v = Variable(data)
target_var = Variable(label)
prediction= model(data_v,target_var,args)
#这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
#前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果归并到主gpu里
#prediction的长度等于batch_size
criterion = nn.CrossEntropyLoss()
loss = criterion(prediction,target_var) # 在默认GPU之上计算loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
2.2 相关知识
DP 在每次网络传播开始前,会把master节点上的parameters和buffer广播给其他节点,以此来维持状态的统一。这部分相关知识主要是如何把模型拷贝到GPU之上以及如何调用GPU核函数,具体可以参见前文 [源码解析] PyTorch 如何使用GPU。
0x03 定义
3.1 定义
我们通过 DataParallel 的初始化函数来看看 DataParallel 的结构。
__init__
三个输入参数定义如下:
- module : 模型,
- device_ids :训练的device,
- output_device :保存输出结果的device。默认是在device_ids[0],即第一块卡。
代码如下:
import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
_get_all_device_indices,
_get_available_device_type,
_get_device_index,
_get_devices_properties
)
class DataParallel(Module):
# TODO: update notes/cuda.rst when this class handles 8+ GPUs well
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
# 得到可用的GPU
device_type = _get_available_device_type()
if device_type is None:
self.module = module
self.device_ids = []
return
# 没有输入的情况下,使用所有可见的GPU
if device_ids is None:
device_ids = _get_all_device_indices()
# 把GPU列表上第一个作为输出,也会作为master
if output_device is None:
output_device = device_ids[0]
self.dim = dim
self.module = module
self.device_ids = [_get_device_index(x, True) for x in device_ids]
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
# 检查负载均衡
_check_balance(self.device_ids)
# 单卡就直接使用
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
3.2 负载均衡
虽然输入数据是均等划分并且并行分配,但是output loss每次都会在第一块GPU聚合相加计算,所以第一块GPU的内存负载和使用率会大于其他显卡。
_check_balance 函数会检查负载是否平衡, 如果内存或者处理器 max/min > 0.75 会有警告。
def _check_balance(device_ids):
imbalance_warn = """
There is an imbalance between your GPUs. You may want to exclude GPU {} which
has less than 75% of the memory or cores of GPU {}. You can do so by setting
the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
environment variable."""
device_ids = [_get_device_index(x, True) for x in device_ids]
dev_props = _get_devices_properties(device_ids)
def warn_imbalance(get_prop):
values = [get_prop(props) for props in dev_props]
min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
if min_val / max_val < 0.75:
warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
return True
return False
if warn_imbalance(lambda props: props.total_memory):
return
if warn_imbalance(lambda props: props.multi_processor_count):
return
0x04 前向传播
DataParallel并行计算只存在在前向传播过程之中。
4.1 总述
之前示例之中已经用 cuda() 函数来把模型放到 GPU[0] 之上,GPU[0] 这里已经有了模型的parameters 和 buffers。
model=model.cuda()
所以forward函数之中,就不用作这一步,而是从分发模型和数据开始,需要注意的是:每次前向传播的时候都会分发模型。具体分为几个步骤。
- 验证:遍历module的parameters和buffers,看看是否都在GPU[0]之上,如果不在,报错。
- 分发((Scatter)输入数据:将输入数据根据其第一个维度(一般是 batch 大小)划分多份,传送到多个 GPU;
- 复制(Replicate)模型:将模型分别拷贝到多个 GPU;
- 并行应用(parallel_apply):在多个模型之上并行进行前向传播。因为 GPU device_ids[0] 和 base parallelized module 共享存储,所以在device[0] 上的 in-place 更新也会被保留下来,其他的GPU则不会。
- 收集(Gather):收集从多个 GPU 上传送回来的数据;
具体代码如下:
def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DataParallel.forward"):
# 如果机器上没有GPU,则直接用CPU运行
if not self.device_ids:
return self.module(*inputs, **kwargs)
# 遍历module的parameters和buffers,看看是否都在GPU[0]之上,如果不在,报错。
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
# 现在GPU[0]上有了模型,开始训练
# 首先分发输入
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs = ((),)
kwargs = ({},)
# 如果只有单卡,直接使用
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)
# 把前向传播的结果收集到master
return self.gather(outputs, self.output_device)
4.2 分发(输入)
上面代码之中,如下语句完成了数据分发操作。
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
对应我们传播图是:
所以我们先看看如何分发。
scatter 实际就是 scatter_kwargs 的封装,所以我们直接看 scatter_kwargs。
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
4.2.1 scatter_kwargs
scatter_kwargs 调用了 scatter 分别对input和 kwargs 进行分发。
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
r"""Scatter with support for kwargs dictionary"""
# 分发input
inputs = scatter(inputs, target_gpus, dim) if inputs else []
# 分发kwargs
kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
# 用空项补齐,这样可以让 inputs 和 kwargs 长度相等
if len(inputs) < len(kwargs):
inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
elif len(kwargs) < len(inputs):
kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
# 返回 tuple
inputs = tuple(inputs)
kwargs = tuple(kwargs)
return inputs, kwargs
4.2.2 scatter
从注释中可以知道,tensor 会切分成大致相等的块,然后在给定的GPU之间分配。就是将一个 batch 数据近似等分成更小的 batch。对于其他类型的变量,会根据不同类型进行不同操作,比如调用 scatter_map 对其内部进行递归处理。
def scatter(inputs, target_gpus, dim=0):
r"""
Slices tensors into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not tensors.
"""
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
# 针对张量会调用Scatter.apply处理
return Scatter.apply(target_gpus, None, dim, obj)
if is_namedtuple(obj):
# 调用 scatter_map 对其子模块进行递归处理。
return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
if isinstance(obj, tuple) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return [list(i) for i in zip(*map(scatter_map, obj))]
if isinstance(obj, dict) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
return [obj for targets in target_gpus]
# After scatter_map is called, a scatter_map cell will exist. This cell
# has a reference to the actual function scatter_map, which has references
# to a closure that has a reference to the scatter_map cell (because the
# fn is recursive). To avoid this reference cycle, we set the function to
# None, clearing the cell
try:
res = scatter_map(inputs)
finally:
scatter_map = None
return res
4.2.3 Scatter
前面提到了 Scatter.apply 处理张量,我们就接着看看。Scatter 拓展了 Function,逻辑如下:
- 如果 cuda 可用,则得到 streams 列表,这样可以在后台流进行 CPU 到 GPU 的拷贝。
- 调用 comm.scatter 进行分发。
- 调用 wait_stream 和 record_stream 对拷贝流进行同步。
class Scatter(Function):
@staticmethod
def forward(ctx, target_gpus, chunk_sizes, dim, input):
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.dim = dim
ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
streams = None
# 对于cuda,进行处理
if torch.cuda.is_available() and ctx.input_device == -1:
# Perform CPU to GPU copies in a background stream
streams = [_get_stream(device) for device in target_gpus]
# 调用C++进行操作
outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
# Synchronize with the copy stream
if streams is not None:
for i, output in enumerate(outputs):
with torch.cuda.device(target_gpus[i]):
main_stream = torch.cuda.current_stream()
main_stream.wait_stream(streams[i]) # 同步
output.record_stream(main_stream) # 同步
return outputs
@staticmethod
def backward(ctx, *grad_output):
return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
4.2.4 comm.scatter
该函数主要是调用 torch._C._scatter
,这样就进入了C++世界。
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
"""Scatters tensor across multiple GPUs. """
tensor = _handle_complex(tensor)
if out is None:
devices = [_get_device_index(d) for d in devices]
return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
else:
return tuple(torch._C._scatter_out(tensor, out, dim, streams))
4.2.5 C++
在转换文件之中,可以看到 scatter 是我们想分析的目标。
.def(
"_scatter",
[](at::Tensor& tensor,
std::vector<int64_t>& devices,
c10::optional<std::vector<int64_t>> chunk_sizes,
int64_t dim,
c10::optional<py::object> py_streams) {
c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>> streams;
if (py_streams) {
py::handle handle = *py_streams;
streams = THPUtils_PySequence_to_CUDAStreamList(handle.ptr());
}
// Note: We're holding the GIL up to here.
pybind11::gil_scoped_release no_gil;
// 实际需要看这里
return scatter(tensor, devices, chunk_sizes, dim, streams);
},
py::arg("tensor"),
py::arg("devices"),
py::arg("chunk_sizes"),
py::arg("dim"),
py::arg("streams"))
在 scatter 之中可以看到,scatter就是把数据分布到各个GPU之上,逻辑如下:
- 首先调用 split_with_sizes 或者chunk 把tensor分割成 chunks。
- 其次把 chunks 分布到各个GPU之上,具体是通过 to 分发完成的。
std::vector<at::Tensor> scatter(
const at::Tensor& tensor,
at::IntArrayRef devices,
const c10::optional<std::vector<int64_t>>& chunk_sizes,
int64_t dim,
const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
streams) {
dim = at::maybe_wrap_dim(dim, tensor);
// 首先把tensor分割成 chunks
std::vector<at::Tensor> chunks = chunk_sizes
? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
: tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
at::cuda::OptionalCUDAStreamGuard cuda_guard;
// 其次把 chunks 分布到各个GPU之上
for (size_t i = 0; i < chunks.size(); ++i) {
const auto device_index = static_cast<int16_t>(devices[i]);
if (device_index != tensor.get_device()) {
if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
cuda_guard.reset_stream(*(*streams)[i]);
}
chunks[i] = chunks[i].to( // 拷贝
{DeviceType::CUDA, device_index},
/*non_blocking=*/true,
/*copy=*/false,
/*memory_format=*/at::MemoryFormat::Preserve);
}
}
return chunks; // 返回结果
}
4.3 复制(模型)
目前,我们已经使用 Scatter 函数将数据从 device[0] 分配并复制到不同的卡,下面会用 Replicate 函数将模型从 device[0] 复制到不同的卡。
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
对应我们传播图是:
replicate 只是转发,我们还需要接着看。
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
4.3.1 replicate
replicate 具体逻辑是:
使用 _replicatable_module 看看是否可以安全的复制模型。
看看有多少个GPU,需要复制多少份。
-
复制操作。
- 复制 parameters。
- 使用 _broadcast_coalesced_reshape 来把parameters拷贝到各个GPU。
- 复制buffers。
- 首先统计一下buffers。
- 记录需要求导的 buffer 的 index。
- 记录不需要求导的 buffer 的 index。
- 对于两种buffers分别使用_broadcast_coalesced_reshape拷贝到各个GPU。
- 复制模型。
- modules()返回一个包含当前模型所有模块的迭代器。转变成list,可以认为把模型打平了。
- 遍历modules,往每个module_copies里面添加模型的每一层。
- 最终,module_copies[j] 里面包含了模型的每一层,即
module_copies[j][i]
就是模型的第 i 层。
- 复制 parameters。
-
配置操作。
- 就是配置模型网络,把GPU中数据的 reference 配置到 modules 数组的每一个module 之中,这样这些 module 就是完备模型了。
- 因为之前是把嵌套的模型网络打散了分别拷贝到GPU:buffers和parameters也分别拷贝到了GPU。现在需要把它们重新配置到浅拷贝的模型之中,这样就把模型逻辑补齐了。
- 遍历模型每个子模块,只配置需要的部分参数。
- 处理 其子
_modules_
。 - 处理 其_parameters。
- 处理 其 _buffers。
- 处理 其子
后续并行操作时候,每一个 worker 会得到 modules 数组的每一个module,就在这个 module 之上进行训练。
具体代码如下:
def replicate(network, devices, detach=False):
if not _replicatable_module(network):
raise RuntimeError("Cannot replicate network where python modules are "
"childrens of ScriptModule")
if not devices:
return []
# 看看有多少个GPU,需要复制多少份
devices = [_get_device_index(x, True) for x in devices]
num_replicas = len(devices) # 复制这些份
# 1)复制操作
# 复制参数 parameters
params = list(network.parameters())
param_indices = {param: idx for idx, param in enumerate(params)}
# 拷贝到各个GPU,我们随后会讲解_broadcast_coalesced_reshape
param_copies = _broadcast_coalesced_reshape(params, devices, detach)
# 复制buffers
# 首先统计一下buffers
buffers = list(network.buffers())
buffers_rg = [] # 需要求导的
buffers_not_rg = [] # 不需要求导的
for buf in buffers:
if buf.requires_grad and not detach:
buffers_rg.append(buf)
else:
buffers_not_rg.append(buf)
# 记录需要求导的 buffer 的 index
buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
# 记录不需要求导的 buffer 的 index
buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}
# 对于两种buffers分别拷贝到各个GPU
buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)
# 准备拷贝模型网络
modules = list(network.modules()) # modules()返回一个包含当前模型所有模块的迭代器。转变成list,可以认为把模型打平了
module_copies = [[] for device in devices] # 为各个GPU准备好空list
module_indices = {}
# 得到模型的浅拷贝列表
for i, module in enumerate(modules): # 遍历模型 list
module_indices[module] = i
for j in range(num_replicas):
replica = module._replicate_for_data_parallel() # 获取浅拷贝
# This is a temporary fix for DDP. DDP needs to access the
# replicated model parameters. It used to do so through
# `mode.parameters()`. The fix added in #33907 for DP stops the
# `parameters()` API from exposing the replicated parameters.
# Hence, we add a `_former_parameters` dict here to support DDP.
replica._former_parameters = OrderedDict()
module_copies[j].append(replica) # 往每个module_copies里面添加模型的每一层
# 最终,module_copies[j] 里面包含了模型的每一层,即module_copies[j][i] 就是模型的第 i 层
# 2)配置操作
# 这一步的目的是:把GPU中数据的reference赋值到浅拷贝之中,变成完备模型。因为之前是把嵌套的模型网络打散了分别拷贝到GPU,buffers和parameters也分别拷贝到了GPU,现在把他们构建到浅拷贝的模型之中,把模型逻辑补齐。
for i, module in enumerate(modules): # 遍历模型每个子模块,只赋值需要的部分参数
# 处理其子_modules
for key, child in module._modules.items():
if child is None:
for j in range(num_replicas):
replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
replica._modules[key] = None
else:
module_idx = module_indices[child]
for j in range(num_replicas):
replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
setattr(replica, key, module_copies[j][module_idx]) # 设置第j个模型的对应部分,下同
# 处理_parameters
for key, param in module._parameters.items():
if param is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._parameters[key] = None
else:
param_idx = param_indices[param]
for j in range(num_replicas):
replica = module_copies[j][i]
param = param_copies[j][param_idx]
# parameters in replicas are no longer leaves,
# so setattr them as non-parameter attributes
setattr(replica, key, param)
# expose the parameter for DDP
replica._former_parameters[key] = param
# 处理 _buffers
for key, buf in module._buffers.items():
if buf is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._buffers[key] = None
else:
if buf.requires_grad and not detach:
buffer_copies = buffer_copies_rg
buffer_idx = buffer_indices_rg[buf]
else:
buffer_copies = buffer_copies_not_rg
buffer_idx = buffer_indices_not_rg[buf]
for j in range(num_replicas):
replica = module_copies[j][i]
setattr(replica, key, buffer_copies[j][buffer_idx])
return [module_copies[j][0] for j in range(num_replicas)]
4.3.2 检查拷贝
_replicatable_module 用来检查模型是否可以安全拷贝。
# Check if we can safely replicate the module.
# there are two types of module:
# 1. python modules
# 2. ScriptModule
#
# currently a module cannot be replicated properly if the descendants of
# any ScriptModule contains python module (type 1 above)
def _replicatable_module(module, memo=None):
# module.modules() contains module itself as the first element
def descendant_modules(module):
gen = module.modules()
next(gen)
return gen
if not _is_jit_enabled():
return True
if memo is None:
memo = set()
# memoize visited modules
memo.add(module)
if _is_script_module(module):
memo.update(descendant_modules(module))
return all(_is_script_module(descendant) for
descendant in descendant_modules(module))
for child in module.children():
# since any unreplicatable module will cause the check to return
# False early, visited modules here can be safely ignored.
if child in memo:
continue
if not _replicatable_module(child, memo):
return False
return True
4.3.3 共享拷贝
在 PyTorch 之中,有浅拷贝和深拷贝之分。
假定模型内部是一系列参数矩阵,model这个对象实际上是指向各个参数矩阵。
- 浅拷贝(shadow copy) 则只是拷贝最外层的数值和指针,不拷贝更深层次的对象,就是只拷贝了父对象。model.state_dict()也是浅拷贝,如果令param=model.state_dict(),那么当你修改param,相应地也会修改model的参数。
- 与之对应,深拷贝(deepcopy):拷贝数值、指针和指针指向的深层次内存空间,即拷贝了父对象及其子对象。
比如:
import torch
import copy
# a引用指向某块内存空间
a = torch.nn.Linear(in_features=5, out_features=1, bias=True)
# 浅拷贝相当于拷贝一个引用,所以他们指向的内存空间是一样的
b = copy.copy(a)
# state_dict is shadow copy
p = a.state_dict()
print(id(a.state_dict()) == id(p)) # False,这两个不相等
# 通过引用p去修改内存空间
print(a.weight)
p['weight'][0][0] = 8.8888
# 可以看到a指向的内存空间也被修改了
print(a.weight)
输出如下:
False
Parameter containing:
tensor([[-0.2253, 0.0802, 0.3984, -0.1208, 0.3796]], requires_grad=True)
Parameter containing:
tensor([[ 8.8888, 0.0802, 0.3984, -0.1208, 0.3796]], requires_grad=True)
具体回到我们的分析,在 module类中,有 _replicate_for_data_parallel 方法,其用来返回一个副本,这些副本和原始模型共享存储,就是浅拷贝。
def _replicate_for_data_parallel(self):
replica = self.__new__(type(self))
replica.__dict__ = self.__dict__.copy()
# replicas do not have parameters themselves, the replicas reference the original
# module.
replica._parameters = OrderedDict()
replica._buffers = replica._buffers.copy() # 浅拷贝
replica._modules = replica._modules.copy() # 浅拷贝模型内部的子模块
replica._is_replica = True
return replica
可以认为,在设置操作之前,拷贝如下:
+---------------------------------------------------------------+
| +----------------------+ |
| CPU | Module | |
| | | |
| | _parameters | |
| | | |
| +--------------> _buffers <-------------+ |
| | | | | |
| | +-------> _modules <----------+ | |
| | | | | | | |
| | | +----------------------+ | | |
| +---------------------+ | +----------------------+ | | |
| | module_copies[0] | | | | module_copies[1] | | | |
| | | | | | | | | |
| | _parameters | | | | _parameters | | | |
| | | | | | | | | |
| | _buffers +----+ | | | _buffers +--------------+ |
| | | | | | | |
| | _modules +-------->+ | _modules +--------->+ |
| | | | | |
| +---------------------+ +----------------------+ |
+---------------------------------------------------------------+
+---------------------+ +----------------------+
| GPU 0 | | GPU 1 |
| | | |
| _parameters | | _parameters |
| | | |
| _buffers | | _buffers |
| | | |
| | | |
| | | |
+---------------------+ +----------------------+
在设置操作之后,则如下:
+-----------------------------------------------------------------+
| CPU +----------------------+ |
| | Module | |
| | | |
| | _parameters | |
| | | |
| | _buffers | |
| | | |
| | _modules | |
| | | |
| +----------------------+ |
| +---------------------+ +----------------------+ |
| | module_copies[0] | | module_copies[1] | |
| | | | | |
+---------+ _parameters | | _parameters +-----------+ |
| | | | | | | |
| | | _buffers +------------+ | _buffers +-----------+ | |
| | | | | | | | | |
| | | _modules | | | _modules | | | |
| | | | | | | | | |
| | +---------------------+ | +----------------------+ | | |
| +-----------------------------------------------------------------+
| | | |
| +---------------------+ | +----------------------+ | |
| | GPU 0 | | | GPU 1 | | |
| | | | | | | |
+---------> _parameters | | | _parameters <----------+
| | | | | |
| _buffers <----------+ | _buffers <--------+
| | | |
| | | |
| | | |
+---------------------+ +----------------------+
4.3.4 拷贝操作
4.3.4.1 _broadcast_coalesced_reshape
拷贝参数都用到了_broadcast_coalesced_reshape。
def _broadcast_coalesced_reshape(tensors, devices, detach=False):
from ._functions import Broadcast
if detach:
# 如果是detach,就直接调用
return comm.broadcast_coalesced(tensors, devices)
else:
# Use the autograd function to broadcast if not detach
if len(tensors) > 0:
# 否则先用Broadcast过度一下,最后还是调用broadcast_coalesced
tensor_copies = Broadcast.apply(devices, *tensors)
return [tensor_copies[i:i + len(tensors)]
for i in range(0, len(tensor_copies), len(tensors))]
else:
return []
4.3.4.2 Broadcast
使用 Broadcast 过度一下的原因是:因为张量不是 detached,所以除了广播之外,还需要在上下文中设置哪些不需要梯度。在某些情况下,用户自定义的Function可能需要知道此情况。
class Broadcast(Function):
@staticmethod
def forward(ctx, target_gpus, *inputs):
assert all(i.device.type != 'cpu' for i in inputs), (
'Broadcast function not implemented for CPU tensors'
)
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.target_gpus = target_gpus
if len(inputs) == 0:
return tuple()
ctx.num_inputs = len(inputs)
# input 放在 device[0]
ctx.input_device = inputs[0].get_device()
# 和 detach 的情形一样
outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
non_differentiables = []
# 在上下文中设置哪些不需要梯度
for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
if not input_requires_grad:
for output in outputs:
non_differentiables.append(output[idx])
ctx.mark_non_differentiable(*non_differentiables)
return tuple([t for tensors in outputs for t in tensors])
@staticmethod
def backward(ctx, *grad_outputs):
return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
其中,mark_non_differentiable 定义在 torch/csrc/autograd/custom_function.cpp,这里会在 AutogradContext 配置非微分的变量。
void AutogradContext::mark_non_differentiable(const variable_list &outputs) {
non_differentiable_.clear();
non_differentiable_.reserve(outputs.size());
for(auto& var : outputs) {
non_differentiable_.insert(var.unsafeGetTensorImpl());
}
}
4.3.4.3 broadcast_coalesced
broadcast_coalesced 会跳转到 C++世界。
def broadcast_coalesced(tensors, devices, buffer_size=10485760):
"""Broadcasts a sequence tensors to the specified GPUs.
Small tensors are first coalesced into a buffer to reduce the number
of synchronizations.
Args:
tensors (sequence): tensors to broadcast. Must be on the same device,
either CPU or GPU.
devices (Iterable[torch.device, str or int]): an iterable of GPU
devices, among which to broadcast.
buffer_size (int): maximum size of the buffer used for coalescing
Returns:
A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
"""
devices = [_get_device_index(d) for d in devices]
tensors = [_handle_complex(t) for t in tensors]
return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
4.3.4.4 C++
从初始化代码中可以看到,具体在 broadcast_coalesced 完成。
auto m = py::cast<py::module>(module);
m.def(
"_broadcast_coalesced",
[](std::vector<at::Tensor>& tensors,
std::vector<int64_t> devices,
size_t buffer_size) {
return broadcast_coalesced(tensors, devices, buffer_size);
},
py::arg("tensors"),
py::arg("devices"),
py::arg("buffer_size"),
py::call_guard<py::gil_scoped_release>())
具体代码位于 torch/csrc/cuda/comm.cpp。我们研究一下其注释。
broadcast_coalesced 会把变量分发给所有GPU。在broadcast_coalesced中,多个变量可以合并成一个大变量,然后广播到其他设备,然后会根据原始形状进行拆分(split)。
拆分(split)时,视图操作将使所有变量一起广播以共享一个版本计数器,因为它们都是大变量的视图。但是,该大变量会立即被丢弃,并且所有这些变量根本不共享存储。
例如,当两个缓冲区在“DataParallel”中一起广播,其中一个在“forward”期间执行in-place操作,而另一个在backward中被使用,autograd引擎将发出抱怨。因此,我们在广播后重新包装这些变量,并为它们提供单独的版本计数器。
// broadcast_coalesced
// ~~~~~~~~~~~~~~~~~~~
//
// In broadcast_coalesced, multiple variables may be coalesced into a single
// large one, broadcast to other devices, and the get split according to the
// original shapes.
//
// When splitting, the view operations will make all Variables broadcast
// together to share a single version counter, because they are all views of the
// large Variable. However, that large Variable is immediately discarded and all
// these Variables do not share storage at all.
//
// For example, when two buffers are broadcast together in `DataParallel` and
// one of them is modified in-place during `forward` but the other is needed in
// backward, autograd engine will complain.
//
// We thus re-wrap these Variables after broadcasting (i.e., effectively doing
// what is equivalent to .data in Python), and give them individual version
// counters.
broadcast_coalesced 方法的具体参数解释如下:
- tensors 必须在同一个设备,CPU 或者 GPU;
- devices 即是要拷贝到的设备;
- buffer_size 则是最大的buffer。这里用到 buffer 将小张量合并到缓冲区以减少同步次数;
tensor_list2d broadcast_coalesced(
TensorList tensors,
IntArrayRef devices,
size_t buffer_size) {
TORCH_CHECK(
std::all_of(
tensors.begin(),
tensors.end(),
[&](const at::Tensor& t) { return t.get_device() == devices[0]; }),
"All tensors must be on devices[0]: ",
devices[0]);
#ifdef USE_NCCL
buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
#endif
tensor_list2d outputs(devices.size());
outputs[0] = tensors.vec();
for (auto& o : outputs)
o.reserve(tensors.size());
unique_type_checker type_checker;
at::cuda::CUDAGuard device_guard(devices[0]);
for (auto& chunk : utils::take_tensors(tensors, buffer_size)) {
auto type_id = chunk.type_id();
type_checker.show(type_id);
std::vector<at::Tensor> results;
if (chunk.options().is_sparse()) {
auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
auto broadcast_indices = broadcast(flat_tuple.first, devices); //这里进行广播
auto broadcast_values = broadcast(flat_tuple.second, devices); //这里进行广播
results.reserve(devices.size());
for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
device_guard.set_index(devices[i]);
auto& device_outputs = outputs[i];
auto& inds = broadcast_indices[i];
auto& vals = broadcast_values[i];
for (auto& t :
utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
Variable var = t;
device_outputs.push_back(make_variable(var.tensor_data(), false));
}
}
} else {
auto results = // 这里进行广播
broadcast(utils::flatten_dense_tensors(chunk.tensors), devices);
for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
device_guard.set_index(devices[i]);
auto& device_outputs = outputs[i];
for (auto& t :
utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
Variable var = t;
device_outputs.push_back(make_variable(var.tensor_data(), false));
}
}
}
}
// If we only saw a single tensor type, then we can skip expensive reordering
if (!type_checker.unique) {
for (auto& o : outputs)
utils::reorder_tensors_like(o, tensors);
}
return outputs;
}
broadcast 方法如下:
std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
std::vector<Tensor> diff_device_dst_tensors;
diff_device_dst_tensors.reserve(devices.size());
for (auto device : devices) {
if (device != tensor.get_device()) {
diff_device_dst_tensors.push_back(at::empty(
tensor.sizes(),
tensor.options().device(
at::Device(DeviceType::CUDA, device)))); // preserve memory format
}
}
// 继续调用操作
_broadcast_out_impl(tensor, diff_device_dst_tensors);
std::vector<Tensor> dst_tensors;
dst_tensors.reserve(devices.size());
auto it = diff_device_dst_tensors.begin();
for (auto device : devices) {
if (device != tensor.get_device()) {
dst_tensors.push_back(*it++);
} else {
dst_tensors.push_back(tensor);
}
}
TORCH_INTERNAL_ASSERT(it == diff_device_dst_tensors.end());
return dst_tensors;
}
最终调用到 _broadcast_out_impl,把源张量 (CPU or CUDA) 广播到一个CUDA设备列表上,其调用了nccl::broadcast(nccl_list)。
static inline std::vector<Tensor>& _broadcast_out_impl(
const Tensor& tensor,
std::vector<Tensor>& out_tensors) {
#ifdef USE_NCCL
std::vector<Tensor> nccl_list;
nccl_list.reserve(out_tensors.size() + 1);
nccl_list.push_back(tensor);
for (auto& out_tensor : out_tensors) {
nccl_list.push_back(out_tensor);
}
if (nccl::is_available(nccl_list)) {
nccl::broadcast(nccl_list); // 这里调用了 NCCL 操作
} else {
#else
{
#endif
for (auto& out_tensor : out_tensors) {
out_tensor.copy_(tensor, /*non_blocking=*/true);
}
}
return out_tensors;
}
至此,我们已经把数据和模型都分布到其他 GPU 之上。我们把目前的前向图先构建出来,大家可以有一个清晰的理解,replicate 调用了Broadcast.forward,同时往其context 存储了input_device和num_inputs。接下来可以进行前行传播。
+----------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| |
| replicate +---------------> parallel_apply gather |
| |
+----------------------------------------------------------------------------------------+
+---------------------------+
| Broadcast |
| |
| |
| |
| forward() +----------->
| |
| |
| +---------------------+ |
| | ctx | |
| | input_device | |
| | | |
| | num_inputs | |
| | | |
| +---------------------+ |
| |
| |
| |
| |
| |
| |
+---------------------------+
因为篇幅所限,下一篇我们从并行操作(前向传播)开始继续分析。
0xFF 参考
PyTorch 源码解读之 torch.optim:优化算法接口详解
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20
[原创][深度][PyTorch] DDP系列第二篇:实现原理与源代码解析
Pytorch踩坑记:赋值、浅拷贝、深拷贝三者的区别以及model.state_dict()和model.load_state_dict()的坑点