评价此页

DistributedDataParallel#

class torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, init_sync=True, process_group=None, bucket_cap_mb=None, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False, delay_all_reduce_named_params=None, param_to_hook_all_reduce=None, mixed_precision=None, device_mesh=None, skip_all_reduce_unused_params=False)[source]#

在模块级别实现基于 torch.distributed 的分布式数据并行。

此容器通过在每个模型副本之间同步梯度来实现数据并行。用于同步的设备通过输入的 process_group 指定,默认情况下为整个通信组。请注意,DistributedDataParallel 不会对输入进行分块或分片到参与的 GPU 上;用户有责任定义如何做到这一点,例如通过使用 DistributedSampler

另请参阅:基础知识使用 nn.parallel.DistributedDataParallel 而非 multiprocessing 或 nn.DataParallel。与 torch.nn.DataParallel 中的输入限制相同。

创建此类需要 torch.distributed 已被初始化,通过调用 torch.distributed.init_process_group()

DistributedDataParallel 在单节点多 GPU 数据并行训练中,已被证明比 torch.nn.DataParallel 快得多。

要在具有 N 个 GPU 的主机上使用 DistributedDataParallel,您应该启动 N 个进程,确保每个进程专门处理 0 到 N-1 的单个 GPU。这可以通过为每个进程设置 CUDA_VISIBLE_DEVICES 来实现,或者通过为 GPU 调用以下 API 来实现:

>>> torch.cuda.set_device(i)

或者为 加速器 调用统一 API:

>>> torch.accelerator.set_device_index(i)

其中 i 从 0 到 N-1。在每个进程中,您应该参考以下内容来构建此模块:

>>> if torch.accelerator.is_available():
>>>     device_type = torch.accelerator.current_accelerator().type
>>>     vendor_backend = torch.distributed.get_default_backend_for_device(device_type)
>>>
>>> torch.distributed.init_process_group(
>>>     backend=vendor_backend, world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

或者您可以使用最新的初始化 API:

>>> torch.distributed.init_process_group(device_id=i)

为了在每个节点上启动多个进程,您可以使用 torch.distributed.launchtorch.multiprocessing.spawn

注意

有关分布式训练所有功能的简要介绍,请参阅 PyTorch 分布式概述

注意

DistributedDataParallel 可以与 torch.distributed.optim.ZeroRedundancyOptimizer 结合使用,以减少每个副本的优化器状态内存占用。有关更多详细信息,请参阅 ZeroRedundancyOptimizer 教程

注意

nccl 后端是目前使用 GPU 时最快且强烈推荐的后端。这适用于单节点和多节点分布式训练。

注意

此模块还支持混合精度分布式训练。这意味着您的模型可以具有不同类型的参数,例如 fp16fp32 的混合类型,这些混合类型参数的梯度归约将正常工作。

注意

如果您在一个进程中使用 torch.save 来保存检查点模块,并在其他进程中使用 torch.load 来恢复它,请确保为每个进程正确配置了 map_location。如果没有 map_locationtorch.load 会将模块恢复到模块被保存的设备上。

注意

当一个模型在 M 个节点上以 batch=N 进行训练时,如果损失是跨批次中的实例求和(而非通常的平均),则梯度将是单节点以 batch=M*N 训练的同等模型梯度的 M 倍小(因为不同节点之间的梯度被平均了)。当您想要获得与本地训练对等体在数学上等效的训练过程时,您应该考虑这一点。但在大多数情况下,您可以将 DistributedDataParallel 包装的模型、DataParallel 包装的模型以及单 GPU 上的普通模型视为相同(例如,使用相同的学习率来获得等效的批次大小)。

注意

参数永远不会在进程之间广播。该模块会在梯度上执行 all-reduce 步骤,并假定它们将在所有进程中以相同的方式被优化器修改。在每次迭代中,缓冲区(例如 BatchNorm 统计量)会从 rank 0 进程中的模块广播到系统中的所有其他副本。

注意

如果您将 DistributedDataParallel分布式 RPC 框架 结合使用,您应该始终使用 torch.distributed.autograd.backward() 来计算梯度,并使用 torch.distributed.optim.DistributedOptimizer 来优化参数。

示例

>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, target)
>>>     dist_autograd.backward(context_id, [loss])
>>>     dist_optim.step(context_id)

注意

DistributedDataParallel 目前对使用 torch.utils.checkpoint() 的梯度检查点支持有限。如果检查点使用 use_reentrant=False(推荐),DDP 将按预期工作,没有任何限制。然而,如果检查点使用 use_reentrant=True(默认值),并且模型中没有未使用的参数,并且每个层最多只检查点一次(确保您没有将 find_unused_parameters=True 传递给 DDP),DDP 将按预期工作。我们目前不支持检查点多次层或检查点模型中存在未使用的参数的情况。

注意

为了让非 DDP 模型加载 DDP 模型的 state dict,需要应用 consume_prefix_in_state_dict_if_present() 来剥离 DDP state dict 中的前缀“module.”,然后再加载。

警告

构造函数、前向方法以及输出(或此模块输出的函数)的微分是分布式同步点。在不同进程可能执行不同代码的情况下,请予以考虑。

警告

此模块假定所有参数在创建时都已在模型中注册。之后不应添加或删除参数。缓冲区也是如此。

警告

此模块假定每个分布式进程的模型中注册的所有参数顺序相同。该模块本身将按照模型注册参数的逆序进行梯度 allreduce。换句话说,用户有责任确保每个分布式进程拥有完全相同的模型,从而拥有完全相同的参数注册顺序。

警告

此模块允许使用非行主序连续步幅的参数。例如,您的模型可能包含一些 torch.memory_formattorch.contiguous_format,而其他参数的格式为 torch.channels_last。但是,不同进程中对应的参数必须具有相同的步幅。

警告

此模块不适用于 torch.autograd.grad()(即,它仅在梯度要累积到参数的 .grad 属性中时才有效)。

警告

如果您计划将此模块与 nccl 后端或 gloo 后端(使用 Infiniband)结合使用,并结合使用多工作进程的 DataLoader,请将多进程启动方法更改为 forkserver(仅限 Python 3)或 spawn。不幸的是,Gloo(使用 Infiniband)和 NCCL2 不是 fork 安全的,如果您不更改此设置,很可能会遇到死锁。

警告

您永远不应尝试在用 DistributedDataParallel 包装模型后更改模型的参数。因为,在用 DistributedDataParallel 包装模型时,DistributedDataParallel 的构造函数将在构造时为模型本身的所有参数注册额外的梯度归约函数。如果您之后更改了模型的参数,梯度归约函数将不再匹配正确的参数集。

警告

DistributedDataParallel分布式 RPC 框架 结合使用是实验性的,并可能发生更改。

参数
  • module (Module) – 要并行化的模块

  • device_ids (list of int or torch.device) –

    CUDA 设备。1) 对于单设备模块,device_ids 必须包含一个设备 ID,该 ID 代表该进程对应的输入模块所在的唯一 CUDA 设备。或者,device_ids 也可以是 None。2) 对于多设备模块和 CPU 模块,device_ids 必须是 None

    当两种情况下的 device_ids 均为 None 时,前向传递的输入数据和实际模块都必须放置在正确的设备上。(默认:对于单设备模块为 None

  • output_device (int or torch.device) – 单设备 CUDA 模块的输出设备位置。对于多设备模块和 CPU 模块,它必须是 None,并且模块本身决定输出位置。(默认:单设备模块为 device_ids[0]

  • broadcast_buffers (bool) – 在 forward 函数开始时同步(广播)模块缓冲区的标志。(默认:True

  • init_sync (bool) – 初始化时是否同步以验证参数形状并广播参数和缓冲区。警告:如果设置为 False,用户必须自行确保所有 rank 上的权重是相同的。(默认:True

  • process_group – 用于分布式数据 all-reduction 的进程组。如果为 None,则使用默认进程组,该进程组由 torch.distributed.init_process_group() 创建。(默认:None

  • bucket_cap_mbDistributedDataParallel 将参数分桶到多个桶中,以便每个桶的梯度归约可以与后向计算重叠。bucket_cap_mb 控制以兆字节 (MiB) 为单位的桶大小。如果为 None,则使用默认大小 25 MiB。(默认:None

  • find_unused_parameters (bool) – 从包装模块的 forward 函数返回值中包含的所有张量开始遍历 autograd 图。未作为此图一部分接收梯度的参数将被预先标记为准备好进行归约。此外,可能已在包装模块的 forward 函数中使用但未包含在损失计算中因此也不会接收梯度的参数将被预先标记为准备好进行归约。(默认:False

  • check_reduction – 此参数已弃用。

  • gradient_as_bucket_view (bool) – 设置为 True 时,梯度将是视图,指向 allreduce 通信桶的不同偏移量。这可以减少峰值内存使用量,节省的内存大小等于总梯度大小。此外,它避免了在梯度和 allreduce 通信桶之间复制的开销。当梯度是视图时,不能在梯度上调用 detach_()。如果遇到此类错误,请参考 torch/optim/optimizer.py 中的 zero_grad() 函数作为解决方案。请注意,梯度将在第一次迭代后成为视图,因此峰值内存节省应在第一次迭代后进行检查。

  • static_graph (bool) –

    设置为 True 时,DDP 知道训练图是静态的。静态图意味着 1) 使用和未使用的参数集在整个训练循环中不会改变;在这种情况下,用户设置 find_unused_parameters = True 与否无关紧要。2) 图的训练方式在整个训练循环中不会改变(即,没有依赖于迭代的控制流)。当 static_graph 设置为 True 时,DDP 将支持过去无法支持的场景:1) 可重入后向传播。2) 激活检查点多次。3) 模型具有未使用的参数时进行激活检查点。4) 模型参数存在于前向函数之外。5) 当存在未使用的参数时,可能提高性能,因为当 static_graph 设置为 True 时,DDP 不会在每次迭代中搜索图来检测未使用的参数。要检查您是否可以将 static_graph 设置为 True,一种方法是检查您之前的模型训练结束时的 ddp 日志数据,如果 ddp_logging_data.get("can_set_static_graph") == True,那么您很可能也可以设置 static_graph = True

    示例:
    >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
    >>> # Training loop
    >>> ...
    >>> ddp_logging_data = model_DDP._get_ddp_logging_data()
    >>> static_graph = ddp_logging_data.get("can_set_static_graph")
    

  • delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter) – 一个命名参数列表,当 param_to_hook_all_reduce 中指定的参数的梯度就绪时,它们的 all reduce 将被延迟。DDP 缩减器将忽略此参数中指定的命名参数,因此 DDP 的其他参数不适用于它们。

  • param_to_hook_all_reduce (torch.nn.Parameter) – 一个参数,用于挂钩 delay_all_reduce_named_params 中指定的参数的延迟 all reduce。

  • skip_all_reduce_unused_params – 设置为 True 时,DDP 将跳过未使用的参数的归约。这要求未使用的参数在整个训练过程中在所有 rank 上保持不变。如果此条件不满足,可能会导致不同步并导致训练挂起。

变量

module (Module) – 要并行化的模块。

示例

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model)
join(divide_by_initial_world_size=True, enable=True, throw_on_early_termination=False)[source]#

用于在 DDP 中处理进程间输入不均的训练的上下文管理器。

此上下文管理器将跟踪已加入的 DDP 进程,并通过插入集体通信操作来“镜像”前向和后向传播,以匹配已加入的 DDP 进程创建的操作。这将确保每个集体调用都有一个已加入的 DDP 进程的相应调用,从而防止在处理进程间输入不均的训练时出现的挂起或错误。或者,如果指定 throw_on_early_termination 标志为 True,所有训练器将在一个 rank 用完输入时抛出错误,从而允许根据应用程序逻辑捕获和处理这些错误。

一旦所有 DDP 进程都已加入,上下文管理器将把最后一个加入进程的模型广播到所有进程,以确保模型在所有进程中都相同(这由 DDP 保证)。

要使用此功能来实现在进程间输入不均的训练,只需将此上下文管理器包装在您的训练循环周围即可。无需对模型或数据加载进行进一步修改。

警告

如果模型或训练循环被此上下文管理器包装,并且具有额外的分布式集体操作(例如模型前向传播中的 SyncBatchNorm),则必须启用 throw_on_early_termination 标志。这是因为此上下文管理器不知道非 DDP 集体通信。此标志将导致所有 rank 在任何一个 rank 用完输入时抛出异常,从而允许跨所有 rank 捕获和恢复这些错误。

参数
  • divide_by_initial_world_size (bool) – 如果为 True,则将梯度除以 DDP 启动时的初始 world_size。如果为 False,则将梯度除以有效世界大小(即尚未用完输入的 rank 数量),这会在 allreduce 期间完成。将 divide_by_initial_world_size=True 设置为确保每个输入样本(包括不均的输入)在对全局梯度的贡献方面具有相等的权重。这是通过即使在遇到不均的输入时也始终将梯度除以初始 world_size 来实现的。如果您将其设置为 False,我们将梯度除以剩余的节点数。这确保了与使用较小的 world_size 进行训练的对等性,尽管这也意味着不均的输入对全局梯度的贡献会更大。通常,您希望将其设置为 True,用于训练作业的最后几个输入不均的情况。在极端情况下,当输入数量存在很大差异时,将此设置为 False 可能会提供更好的结果。

  • enable (bool) – 是否启用不均输入检测。在您知道输入在参与进程之间均匀的情况下,传入 enable=False 来禁用。默认值为 True

  • throw_on_early_termination (bool) – 当至少有一个 rank 用完输入时,是抛出错误还是继续训练。如果为 True,将在第一个 rank 到达数据末尾时抛出。如果为 False,将继续以较小的有效世界大小进行训练,直到所有 rank 都加入。请注意,如果指定了此标志,则将忽略 divide_by_initial_world_size 标志。默认值为 False

示例

>>> import torch
>>> import torch.distributed as dist
>>> import os
>>> import torch.multiprocessing as mp
>>> import torch.nn as nn
>>> # On each spawned worker
>>> def worker(rank):
>>>     dist.init_process_group("nccl", rank=rank, world_size=2)
>>>     torch.cuda.set_device(rank)
>>>     model = nn.Linear(1, 1, bias=False).to(rank)
>>>     model = torch.nn.parallel.DistributedDataParallel(
>>>         model, device_ids=[rank], output_device=rank
>>>     )
>>>     # Rank 1 gets one more input than rank 0.
>>>     inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>     with model.join():
>>>         for _ in range(5):
>>>             for inp in inputs:
>>>                 loss = model(inp).sum()
>>>                 loss.backward()
>>>     # Without the join() API, the below synchronization will hang
>>>     # blocking for rank 1's allreduce to complete.
>>>     torch.cuda.synchronize(device=rank)
join_hook(**kwargs)[source]#

DDP join hook 通过在前向和后向传播中镜像通信来实现对不均输入的训练。

参数

kwargs (dict) – 一个包含任何关键字参数的 dict,用于在运行时修改 join hook 的行为;共享同一 join 上下文管理器的所有 Joinable 实例都会收到相同的 kwargs 值。

此 hook 支持以下关键字参数:
divide_by_initial_world_size (bool, optional)

如果为 True,则将梯度除以 DDP 启动时的初始 world size。如果为 False,则将梯度除以有效 world size(即未加入进程的数量),这意味着不均的输入对全局梯度的贡献更大。通常,如果差异程度较小,应将其设置为 True,但在极端情况下可以设置为 False 以获得可能更好的结果。默认为 True

no_sync()[source]#

禁用 DDP 进程之间梯度同步的上下文管理器。

在此上下文中,梯度将累积在模块变量上,稍后将在退出上下文的第一个前向-后向传播中同步。

示例

>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>>     for input in inputs:
>>>         ddp(input).backward()  # no synchronization, accumulate grads
>>> ddp(another_input).backward()  # synchronize grads

警告

前向传播应包含在上下文管理器内,否则梯度仍将被同步。

register_comm_hook(state, hook)[source]#

为用户定义的 DDP 跨多个工作进程的梯度聚合注册通信 hook。

此 hook 对于研究人员尝试新想法非常有用。例如,此 hook 可用于实现 GossipGrad 和梯度压缩等算法,这些算法在运行 Distributed DataParallel 训练时涉及不同的参数同步通信策略。

参数
  • state (object) –

    传递给 hook,用于在训练过程中维护任何状态信息。示例包括梯度压缩中的错误反馈,GossipGrad 中下一个要通信的对等方等。

    它由每个工作进程本地存储,并由工作进程上的所有梯度张量共享。

  • hook (Callable) –

    可调用签名如下:hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]

    当 bucket 准备好时,将调用此函数。hook 可以执行所需的任何处理,并返回一个 Future 来指示异步工作的完成(例如:allreduce)。如果 hook 不执行任何通信,它仍然必须返回一个已完成的 Future。Future 应包含 grad bucket 张量的新值。一旦 bucket 准备好,c10d 缩减器将调用此 hook,并使用从 Future 返回的张量将 grad 复制到各个参数。请注意,Future 的返回类型必须是单个张量。

    我们还提供了一个名为 get_future 的 API 来检索与 c10d.ProcessGroup.Work 完成相关的 Future。get_future 目前支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但不包括点对点操作(send/recv)。

警告

Grad bucket 的张量不会按 world_size 进行预除法。用户负责在进行 allreduce 等操作时进行 world_size 的除法。

警告

DDP 通信 hook 只能注册一次,并且应在调用 backward 之前注册。

警告

hook 返回的 Future 对象应包含一个与 grad bucket 中的张量形状相同的单个张量。

警告

get_future API 支持 NCCL,以及部分 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回一个 torch.futures.Future

示例:

以下是一个 noop hook 的示例,它返回相同的张量。

>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     fut = torch.futures.Future()
>>>     fut.set_result(bucket.buffer())
>>>     return fut
>>> ddp.register_comm_hook(state=None, hook=noop)
示例:

以下是一个 Parallel SGD 算法的示例,其中梯度在 allreduce 之前进行编码,然后在 allreduce 之后进行解码。

>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     encoded_tensor = encode(bucket.buffer())  # encode gradients
>>>     fut = torch.distributed.all_reduce(encoded_tensor).get_future()
>>>     # Define the then callback to decode.
>>>     def decode(fut):
>>>         decoded_tensor = decode(fut.value()[0])  # decode gradients
>>>         return decoded_tensor
>>>     return fut.then(decode)
>>> ddp.register_comm_hook(state=None, hook=encode_and_decode)