分布式数据并行入门#
创建于: 2019年4月23日 | 最后更新: 2025年9月23日 | 最后验证: 2024年11月5日
作者:Shen Li
编辑者: Joe Zhu, Chirag Pandya
注意
在 github 查看和编辑此教程。
先决条件
DistributedDataParallel (DDP) 是 PyTorch 中一个强大的模块,允许您跨多个机器并行化模型,非常适合大规模深度学习应用。要使用 DDP,您需要启动多个进程,并在每个进程中创建一个 DDP 实例。
但它是如何工作的呢?DDP 使用 torch.distributed 包中的集体通信来跨所有进程同步梯度和缓冲区。这意味着每个进程都有自己的模型副本,但它们会协同工作,就像模型在一个机器上一样进行训练。
为了实现这一点,DDP 为模型中的每个参数注册一个 autograd hook。当运行反向传播时,此 hook 会触发跨所有进程的梯度同步。这确保每个进程都拥有相同的梯度,然后使用这些梯度来更新模型。
有关 DDP 工作原理和如何有效使用它的更多信息,请务必查看 DDP 设计说明。使用 DDP,您可以比以往任何时候都更快、更有效地训练您的模型!
推荐使用 DDP 的方式是为每个模型副本启动一个进程。模型副本可以跨越多个设备。DDP 进程可以位于同一台机器上,也可以跨多台机器。请注意,GPU 设备不能跨 DDP 进程共享(即一个 GPU 对应一个 DDP 进程)。
在本教程中,我们将从一个基本的 DDP 用例开始,然后展示更高级的用例,包括模型检查点和 DDP 与模型并行结合使用。
注意
本教程中的代码在 8-GPU 服务器上运行,但可以轻松推广到其他环境。
DataParallel 与 DistributedDataParallel 的比较#
在我们深入研究之前,让我们澄清一下,尽管 DistributedDataParallel 增加了复杂性,为什么您会考虑使用它而不是 DataParallel。
首先,
DataParallel是单进程、多线程的,但它只能在一台机器上运行。相比之下,DistributedDataParallel是多进程的,支持单机和多机训练。由于线程间的 GIL 争用、每个迭代复制的模型以及输入散射和输出收集引入的额外开销,即使在一台机器上,DataParallel通常也比DistributedDataParallel慢。回顾一下 之前的教程,如果您的模型太大而无法放入单个 GPU,您必须使用 **模型并行** 将其拆分到多个 GPU 上。
DistributedDataParallel可以与 **模型并行** 一起使用,而DataParallel目前则不能。当 DDP 与模型并行结合使用时,每个 DDP 进程将使用模型并行,而所有进程将 collectively 使用数据并行。
基本用例#
要创建一个 DDP 模块,您必须首先正确设置进程组。更多详细信息可以在 使用 PyTorch 编写分布式应用程序 中找到。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# We want to be able to train our model on an `accelerator <https://pytorch.ac.cn/docs/stable/torch.html#accelerators>`__
# such as CUDA, MPS, MTIA, or XPU.
acc = torch.accelerator.current_accelerator()
backend = torch.distributed.get_default_backend_for_device(acc)
# initialize the process group
dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
现在,让我们创建一个玩具模块,用 DDP 包装它,并向其提供一些虚拟输入数据。请注意,由于 DDP 在 DDP 构造函数中将模型状态从 rank 0 进程广播到所有其他进程,因此您无需担心不同的 DDP 进程从不同的初始模型参数值开始。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running basic DDP example on rank {rank}.")
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
如您所见,DDP 封装了低级的分布式通信细节,并提供了清晰的 API,就像一个本地模型一样。梯度同步通信在反向传播过程中发生,并与反向计算重叠。当 backward() 返回时,param.grad 已经包含了同步的梯度张量。对于基本用例,DDP 只需增加几行代码即可设置进程组。当将 DDP 应用于更高级的用例时,一些注意事项需要谨慎。
处理速度不均#
在 DDP 中,构造函数、前向传播和后向传播是分布式同步点。不同的进程应启动相同数量的同步,并以相同的顺序到达这些同步点,并在大致相同的时间进入每个同步点。否则,快速的进程可能会提前到达并因等待滞后者而超时。因此,用户负责平衡进程之间的工作负载分配。有时,由于网络延迟、资源争用或不可预测的工作负载峰值等原因,处理速度不均是不可避免的。为避免这种情况下的超时,请确保在调用 init_process_group 时传递一个足够大的 timeout 值。
保存和加载检查点#
通常使用 torch.save 和 torch.load 在训练过程中为模型创建检查点并从检查点恢复。有关更多详细信息,请参阅 保存和加载模型。使用 DDP 时,一个优化是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写入开销。这是可行的,因为所有进程都从相同的参数开始,并且在后向传播中同步梯度,因此优化器应该持续将参数设置为相同的值。如果您使用此优化(即在一个进程中保存,在所有进程中恢复),请确保没有进程在保存完成之前开始加载。此外,加载模块时,您需要提供适当的 map_location 参数,以防止进程访问其他进程的设备。如果缺少 map_location,torch.load 将首先将模块加载到 CPU,然后将每个参数复制到其保存的位置,这将导致同一台机器上的所有进程使用同一组设备。有关更高级的故障恢复和弹性支持,请参阅 TorchElastic。
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# We want to be able to train our model on an `accelerator <https://pytorch.ac.cn/docs/stable/torch.html#accelerators>`__
# such as CUDA, MPS, MTIA, or XPU.
acc = torch.accelerator.current_accelerator()
# configure map_location properly
map_location = {f'{acc}:0': f'{acc}:{rank}'}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print(f"Finished running DDP checkpoint example on rank {rank}.")
DDP 与模型并行结合#
DDP 也适用于多 GPU 模型。包装多 GPU 模型的 DDP 在训练具有海量数据的大型模型时尤其有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
将多 GPU 模型传递给 DDP 时,绝对不能设置 device_ids 和 output_device。输入和输出数据将由应用程序或模型的 forward() 方法放置在适当的设备上。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running DDP with model parallel example on rank {rank}.")
if __name__ == "__main__":
n_gpus = torch.accelerator.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP#
我们可以利用 PyTorch Elastic 来简化 DDP 代码并更轻松地初始化作业。我们仍然使用 Toymodel 示例并创建一个名为 elastic_ddp.py 的文件。
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
torch.accelerator.set_device_index(int(os.environ["LOCAL_RANK"]))
acc = torch.accelerator.current_accelerator()
backend = torch.distributed.get_default_backend_for_device(acc)
dist.init_process_group(backend)
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.accelerator.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
print(f"Finished running basic DDP example on rank {rank}.")
if __name__ == "__main__":
demo_basic()
然后,可以在所有节点上运行 torch elastic/torchrun 命令来初始化上面创建的 DDP 作业。
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
在上面的示例中,我们在两个主机上运行 DDP 脚本,并且每个主机运行 8 个进程。也就是说,我们在 16 个 GPU 上运行此作业。请注意,$MASTER_ADDR 在所有节点上必须相同。
torchrun 将启动 8 个进程,并在每个进程上调用 elastic_ddp.py,但在节点上调用。然而,用户还需要应用集群管理工具(如 slurm)来实际在 2 个节点上运行此命令。
例如,在启用了 SLURM 的集群上,我们可以编写一个脚本来运行上述命令并将 MASTER_ADDR 设置为
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然后我们可以使用 SLURM 命令运行此脚本: srun --nodes=2 ./torchrun_script.sh。
这只是一个示例;您可以选择自己的集群调度工具来启动 torchrun 作业。
有关 Elastic run 的更多信息,请参阅 快速入门文档。