分布式数据并行#
创建时间:2020 年 1 月 15 日 | 最后更新时间:2024 年 1 月 25 日
警告
torch.nn.parallel.DistributedDataParallel 的实现会随着时间而演变。本文档笔记基于 v1.4 版本时的状态编写。
torch.nn.parallel.DistributedDataParallel (DDP) 可透明地执行分布式数据并行训练。本页将介绍其工作原理并揭示实现细节。
示例#
让我们从一个简单的 torch.nn.parallel.DistributedDataParallel 示例开始。该示例使用 torch.nn.Linear 作为本地模型,用 DDP 包装它,然后对 DDP 模型执行一次前向传播、一次反向传播和一次优化器步骤。之后,本地模型上的参数将被更新,并且不同进程上的所有模型都应该完全相同。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP
def example(rank, world_size):
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
# forward pass
outputs = ddp_model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
optimizer.step()
def main():
world_size = 2
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__=="__main__":
# Environment variables which need to be
# set when using c10d's default "env"
# initialization mode.
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
main()
DDP 与 TorchDynamo 兼容。与 TorchDynamo 一起使用时,在编译模型之前应用 DDP 模型包装器,这样 torchdynamo 就可以根据 DDP 桶大小应用 DDPOptimizer (图中断优化)。(有关更多信息,请参阅 TorchDynamo DDPOptimizer)。
ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)
内部设计#
本节通过深入探讨一次迭代中每个步骤的细节,来揭示 torch.nn.parallel.DistributedDataParallel 内部的工作原理。
先决条件:DDP 依赖 c10d
ProcessGroup进行通信。因此,应用程序必须在构建 DDP 之前创建ProcessGroup实例。构造:DDP 构造函数接收对本地模块的引用,并将 rank 为 0 的进程的
state_dict()广播到组内的所有其他进程,以确保所有模型副本都从完全相同的状态开始。然后,每个 DDP 进程创建一个本地Reducer,该Reducer稍后将负责反向传播期间的梯度同步。为了提高通信效率,Reducer将参数梯度组织成桶,并一次处理一个桶。桶大小可以通过在 DDP 构造函数中设置 bucket_cap_mb 参数来配置。参数梯度到桶的映射是在构造时确定的,基于桶大小限制和参数大小。模型参数被分配到桶中,顺序大致与给定模型的Model.parameters()的反向顺序一致。使用反向顺序的原因是 DDP 期望梯度在反向传播期间大致按该顺序准备好。下图显示了一个示例。请注意,grad0和grad1位于bucket1中,而另外两个梯度位于bucket0中。当然,这个假设可能并不总是成立,当这种情况发生时,可能会损害 DDP 的反向传播速度,因为Reducer无法在最早可能的时间启动通信。除了分桶外,Reducer在构造期间还会注册 autograd 钩子,每个参数一个钩子。这些钩子将在反向传播期间梯度准备就绪时触发。前向传播:DDP 接收输入并将其传递给本地模型,然后分析本地模型的输出(如果
find_unused_parameters设置为True)。此模式允许在模型子图上进行反向传播,DDP 通过从模型输出遍历 autograd 图并标记所有未使用的参数为可约化状态,来找出哪些参数参与了反向传播。在反向传播期间,Reducer只会等待未就绪的参数,但它仍然会约化所有桶。标记参数梯度为就绪状态并不能帮助 DDP 跳过桶,但它会阻止 DDP 在反向传播期间永远等待缺失的梯度。请注意,遍历 autograd 图会引入额外的开销,因此应用程序应仅在必要时将find_unused_parameters设置为True。反向传播:
backward()函数直接在损失Tensor上调用,这超出了 DDP 的控制范围,DDP 使用在构造时注册的 autograd 钩子来触发梯度同步。当一个梯度准备就绪时,其对应的 DDP 钩子将触发,DDP 随后将该参数梯度标记为可约化状态。当一个桶中的所有梯度都准备就绪时,Reducer将启动一个异步allreduce操作来计算所有进程中梯度的平均值。当所有桶都准备就绪时,Reducer将阻塞等待所有allreduce操作完成。完成后,平均梯度将被写入所有参数的param.grad字段。因此,在反向传播之后,不同 DDP 进程上相同对应参数的 grad 字段应该是一致的。优化器步骤:从优化器的角度来看,它正在优化一个本地模型。所有 DDP 进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每次迭代中具有相同的平均梯度。
注意
DDP 要求所有进程上的 Reducer 实例以完全相同的顺序调用 allreduce,这是通过始终按桶索引顺序而不是实际的桶就绪顺序运行 allreduce 来实现的。跨进程的 allreduce 顺序不匹配可能导致错误的结果或 DDP 反向传播挂起。
实现#
以下是指向 DDP 实现组件的链接。堆叠图显示了代码的结构。
ProcessGroup#
ProcessGroup.hpp:包含所有进程组实现的抽象 API。
c10d库开箱即用地提供了 3 种实现,即 ProcessGroupGloo、ProcessGroupNCCL 和 ProcessGroupMPI。DistributedDataParallel在初始化期间使用ProcessGroup::broadcast()将模型状态从 rank 为 0 的进程发送到其他进程,并在计算梯度时使用ProcessGroup::allreduce()来对梯度求和。Store.hpp:协助进程组实例相互查找的集结点服务。
DistributedDataParallel#
distributed.py:是 DDP 的 Python 入口点。它实现了
nn.parallel.DistributedDataParallel模块的初始化步骤和forward函数,这些函数调用 C++ 库。其_sync_param函数在 DDP 进程处理多个设备时执行进程内参数同步,并负责将模型缓冲区从 rank 为 0 的进程广播到所有其他进程。进程间参数同步发生在Reducer.cpp中。comm.h:实现了合并广播辅助函数,该函数在初始化期间用于广播模型状态,并在前向传播之前同步模型缓冲区。
reducer.h:提供了反向传播中梯度同步的核心实现。它有三个入口函数
Reducer:构造函数在distributed.py中被调用,该构造函数将Reducer::autograd_hook()注册到梯度累加器。autograd_hook()函数将在梯度准备就绪时由 autograd 引擎调用。prepare_for_backward()在distributed.py中 DDP 前向传播结束时被调用。当 DDP 构造函数中设置find_unused_parameters为True时,它会遍历 autograd 图以查找未使用的参数。
TorchDynamo DDPOptimizer#
DDP 的性能优势来自于在反向传播期间将 allreduce 集合操作与计算重叠。当与 TorchDynamo 一起用于编译整个前向和后向图时,AotAutograd 会阻止这种重叠,因为 allreduce 操作是由 autograd 钩子在整个优化后的后向计算完成后才启动的。
TorchDynamo 的 DDPOptimizer 通过在反向传播期间的 DDP allreduce 桶的逻辑边界处断开前向图来提供帮助。注意:目标是在反向传播期间断开图,最简单的实现是断开前向图,然后对每个部分调用 AotAutograd 和编译。这使得 DDP 的 allreduce 钩子能够在反向传播的各部分之间触发,并调度通信以与计算重叠。
有关更深入的解释和实验结果,请参阅 这篇博文,或者在 torch/_dynamo/optimizations/distributed.py 阅读文档和代码。
要调试 DDPOptimizer,请设置 TORCH_LOGS=’ddp_graphs’ 以获取完整的图转储。对于不包含图的日志,请将 ‘dynamo’、‘distributed’ 或 ‘dist_ddp’ 添加到 TORCH_LOGS 中(用于获取关于桶边界的基本信息)。要禁用 DDPOptimizer,请设置 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 在没有 DDPOptimizer 的情况下仍然可以正常工作,但性能会下降。