评价此页

分布式通信包 - torch.distributed#

创建于: 2017年7月12日 | 最后更新于: 2026年1月8日

注意

有关分布式训练所有功能的简要介绍,请参阅 PyTorch分布式概述

后端#

torch.distributed 支持四种内置后端,每种后端都有不同的功能。下表显示了每个后端对 CPU 或 GPU 可用的函数。对于 NCCL,GPU 指的是 CUDA GPU;对于 XCCL,GPU 指的是 XPU GPU。

仅当用于构建 PyTorch 的实现支持 CUDA 时,MPI 才支持 CUDA。

后端

gloo

mpi

nccl

xccl

设备

CPU

GPU

CPU

GPU

CPU

GPU

CPU

GPU

send

?

recv

?

broadcast

?

all_reduce

?

reduce

?

all_gather

?

gather

?

scatter

?

reduce_scatter

all_to_all

?

barrier

?

PyTorch 自带的后端#

PyTorch 分布式包支持 Linux (稳定版)、macOS (稳定版) 和 Windows (原型版)。默认情况下,对于 Linux,Gloo 和 NCCL 后端将在 PyTorch 分布式中构建并包含 (NCCL 仅在与 CUDA 构建时包含)。MPI 是一个可选后端,只有在从源代码构建 PyTorch 时才能包含 (例如,在具有已安装 MPI 的主机上构建 PyTorch)。

注意

截至 PyTorch v1.8,Windows 支持所有集体通信后端,但不包括 NCCL。如果 init_method 参数的 init_process_group() 指向一个文件,则该文件必须遵循以下模式

  • 本地文件系统, init_method="file:///d:/tmp/some_file"

  • 共享文件系统, init_method="file://////{machine_name}/{share_folder_name}/some_file"

与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。

选择哪个后端?#

过去,我们经常被问到:“应该使用哪个后端?”。

  • 经验法则

    • 使用 NCCL 后端进行分布式训练,搭配 CUDA **GPU**。

    • 使用 XCCL 后端进行分布式训练,搭配 XPU **GPU**。

    • 使用 Gloo 后端进行分布式训练,搭配 **CPU**。

  • 具有 InfiniBand 互连的 GPU 主机

    • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。

  • 具有以太网互连的 GPU 主机

    • 使用 NCCL,因为它目前提供最佳的分布式 GPU 训练性能,尤其适用于多进程单节点或多节点分布式训练。如果您在使用 NCCL 时遇到任何问题,可以使用 Gloo 作为备用选项。(请注意,Gloo 目前在 GPU 上的运行速度比 NCCL 慢)。

  • 具有 InfiniBand 互连的 CPU 主机

    • 如果您的 InfiniBand 已启用 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在未来的版本中为 Gloo 添加 InfiniBand 支持。

  • 具有以太网互连的 CPU 主机

    • 使用 Gloo,除非您有特定理由使用 MPI。

常用环境变量#

选择要使用的网络接口#

默认情况下,NCCL 和 Gloo 后端都会尝试查找正确的网络接口。如果自动检测到的接口不正确,您可以使用以下环境变量覆盖它 (适用于相应后端)

  • NCCL_SOCKET_IFNAME,例如 export NCCL_SOCKET_IFNAME=eth0

  • GLOO_SOCKET_IFNAME,例如 export GLOO_SOCKET_IFNAME=eth0

如果您使用 Gloo 后端,可以通过逗号分隔来指定多个接口,例如: export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将在这些接口之间以轮询方式分派操作。所有进程都必须在此变量中指定相同数量的接口。

其他 NCCL 环境变量#

调试 - 如果 NCCL 失败,您可以设置 NCCL_DEBUG=INFO 来打印明确的警告消息以及 NCCL 的基本初始化信息。

您还可以使用 NCCL_DEBUG_SUBSYS 来获取 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 将打印集体调用的日志,这在调试挂起时可能很有帮助,特别是那些由集体类型或消息大小不匹配引起的挂起。如果拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH 来检查详细的检测结果并保存以备将来寻求 NCCL 团队的帮助将很有用。

性能调优 - NCCL 根据其拓扑检测自动进行调优,以节省用户的调优工作。在某些基于套接字的系统上,用户仍然可以尝试调优 NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD 来增加套接字网络带宽。这两个环境变量已经由 NCCL 为某些云提供商 (如 AWS 或 GCP) 预先调优。

有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 官方文档

您可以使用 torch.distributed.ProcessGroupNCCL.NCCLConfigtorch.distributed.ProcessGroupNCCL.Options 进一步调优 NCCL 通信器。在解释器中使用 help (例如 help(torch.distributed.ProcessGroupNCCL.NCCLConfig)) 来了解更多信息。

Copy Engine Collectives#

注意

Copy Engine Collectives 需要 NCCL 2.28 或更高版本,以及具有点对点 (P2P) 访问权限的 GPU。

Copy Engine (CE) Collectives 是 NCCL 集体操作的优化,它将数据移动卸载到 GPU 的复制引擎 (DMA 引擎),而不是使用 CUDA 流多处理器 (SM)。这使得 SM 可以用于计算工作,从而在分布式训练期间更好地重叠通信和计算。

要使用 CE Collectives,您需要

  1. 使用零-CTA 策略配置 NCCL 进程组

  2. 使用 NCCL 后端设置对称内存

  3. 使用对称内存分配张量

  4. 通过 rendezvous 注册对称内存中的张量

设置完成后,像 all_gather_into_tensor()all_to_all_single() 这样的标准集体函数在操作对称内存张量时会自动使用复制引擎。

示例

import torch
import torch.distributed as dist
import torch.distributed._symmetric_memory as symm_mem

# Initialize process group with zero-CTA policy for CE collectives
opts = dist.ProcessGroupNCCL.Options()
opts.config.cta_policy = dist.ProcessGroupNCCL.NCCL_CTA_POLICY_ZERO
device = torch.device("cuda", rank)
dist.init_process_group(backend="nccl", pg_options=opts, device_id=device)

# Set up symmetric memory with NCCL backend
symm_mem.set_backend("NCCL")
group_name = dist.group.WORLD.group_name
symm_mem.enable_symm_mem_for_group(group_name)

# Allocate tensors using symmetric memory
numel = 1024 * 1024
inp = symm_mem.empty(numel, device=device)
out = symm_mem.empty(numel * world_size, device=device)

# Register tensors for symmetric memory operations
symm_mem.rendezvous(inp, group=group_name)
symm_mem.rendezvous(out, group=group_name)

# Perform collective operation using copy engines
# This now runs on DMA engines instead of SMs
work = dist.all_gather_into_tensor(out, inp, async_op=True)
work.wait()

优点

  • SM 卸载:通信在复制引擎上运行,使 SM 可用于计算

  • 更好的重叠:实现更高效的计算/通信重叠

  • 透明 API:使用相同的集体 API,只是使用对称内存张量

要求和限制

  • NCCL 版本 2.28 或更高版本

  • GPU 必须启用点对点 (P2P) 访问

  • 张量必须使用 torch.distributed._symmetric_memory() 分配并通过 rendezvous 注册

  • NCCL 进程组必须配置为 NCCL_CTA_POLICY_ZERO 或环境变量 NCCL_CTA_POLICY 设置为 2

  • 从 NCCL 2.28 开始,CE Collectives 不能与默认流一起运行,因此您需要使用 async_op=True 标志来激活 ProcessGroupNCCL 的内部流,或者自己创建一个侧流。

基础知识#

torch.distributed 包为跨越多台机器上运行的多个计算节点提供 PyTorch 支持和通信原语。 torch.nn.parallel.DistributedDataParallel() 类在此功能的基础上构建,作为任何 PyTorch 模型的包装器,提供同步分布式训练。这与 多进程包 - torch.multiprocessingtorch.nn.DataParallel() 提供的并行类型不同,因为它支持多台网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。

在单机同步情况下,torch.distributedtorch.nn.parallel.DistributedDataParallel() 包装器可能仍比其他数据并行方法具有优势,包括 torch.nn.DataParallel()

  • 每个进程都维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经跨进程收集并平均,因此对每个进程都相同,但这表示不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。

  • 每个进程包含一个独立的 Python 解释器,消除了来自驱动单个 Python 进程的多个执行线程、模型副本或 GPU 的额外解释器开销和“GIL 争用”。这对于大量使用 Python 运行时,包括使用循环层或许多小组件的模型尤其重要。

初始化#

在调用任何其他方法之前,必须使用 torch.distributed.init_process_group()torch.distributed.device_mesh.init_device_mesh() 函数初始化包。两者都会阻塞直到所有进程都加入。

警告

初始化不是线程安全的。进程组创建应从单个线程执行,以防止跨 rank 的不一致“UUID”分配,并防止初始化期间可能导致挂起的竞争。

torch.distributed.is_available()[源代码]#

返回分布式包是否可用。

否则,torch.distributed 不会暴露任何其他 API。目前,torch.distributed 在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,请设置 USE_DISTRIBUTED=1 来启用它。目前,Linux 和 Windows 的默认值是 USE_DISTRIBUTED=1,macOS 的默认值是 USE_DISTRIBUTED=0

返回类型:

布尔值

torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None, _ranks=None)[源代码]#

初始化默认分布式进程组。

这还将初始化分布式包。

初始化进程组主要有两种方式
  1. 显式指定 storerankworld_size

  2. 指定 init_method (一个 URL 字符串),它指示在哪里/如何发现对等节点。可选地指定 rankworld_size,或者将所有必需的参数编码在 URL 中并省略它们。

如果两者均未指定,则假定 init_method 为 “env://”。

参数:
  • backend (strBackend, 可选) – 要使用的后端。根据构建时的配置,有效值包括 mpi, gloo, nccl, ucc, xccl 或由第三方插件注册的后端。自 2.6 版本起,如果未提供 backend,c10d 将使用为 device_id 关键字参数指示的设备类型注册的后端 (如果提供了)。目前已知的默认注册是:cudancclcpuglooxpuxccl。如果未提供 backenddevice_id,c10d 将检测运行时机器上的加速器并使用为该检测到的加速器 (或 cpu) 注册的后端。此字段可以作为小写字符串 (例如,"gloo") 提供,也可以通过 Backend 属性 (例如 Backend.GLOO) 访问。如果使用 nccl 后端的多进程/多节点,每个进程必须对它使用的每个 GPU 具有独占访问权,因为进程之间共享 GPU 可能导致死锁或 NCCL 错误使用。ucc 后端是实验性的。设备默认后端可以使用 get_default_backend_for_device() 查询。

  • init_method (str, 可选) – 用于初始化进程组的 URL。如果未指定 init_methodstore,则默认为 “env://”。与 store 互斥。

  • world_size (int, 可选) – 作业中参与的进程数。如果指定了 store,则为必需。

  • rank (int, 可选) – 当前进程的 rank (它应该是一个介于 0 和 world_size-1 之间的数字)。如果指定了 store,则为必需。

  • store (Store, 可选) – 所有工作进程都可以访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。

  • timeout (timedelta, 可选) – 对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是集合操作将被异步中止且进程将崩溃的最长时间。这样做是因为 CUDA 执行是异步的,继续执行用户代码可能不安全,因为失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行。当设置了 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。

  • group_name (str, 可选, 已弃用) – 组名。此参数将被忽略

  • pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传递哪些附加选项。目前,我们支持的唯一选项是 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便 NCCL 后端在有计算内核等待时选择高优先级 CUDA 流。有关配置 NCCL 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • device_id (torch.device | int, 可选) – 此进程将工作的单个、特定设备,允许后端进行特定优化。目前这有两个效果,仅在 NCCL 下:通信器立即形成 (调用 ncclCommInit* 而不是正常的懒惰调用),并且子组将在可能时使用 ncclCommSplit 来避免不必要的组创建开销。如果您想尽早了解 NCCL 初始化错误,您也可以使用此字段。如果提供了 int,则 API 假定在编译时使用该加速器的类型。

  • _ranks (list[int] | None) – 进程组中的 rank。如果提供,进程组名称将是组中所有 rank 的哈希值。

注意

要启用 backend == Backend.MPI,PyTorch 需要从支持 MPI 的系统上进行源代码构建。

注意

对多个后端的支持是实验性的。目前,当未指定后端时,将创建 gloonccl 后端。gloo 后端将用于 CPU 张量的集体操作,而 nccl 后端将用于 CUDA 张量的集体操作。可以通过传递格式为“<device_type>:<backend_name>,<device_type>:<backend_name>”的字符串来指定自定义后端,例如“cpu:gloo,cuda:custom_backend”。

torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[源代码]#

基于 device_typemesh_shapemesh_dim_names 参数初始化 DeviceMesh

这会创建一个具有 n 维数组布局的 DeviceMesh,其中 nmesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度都将标记为 mesh_dim_names[i]

注意

init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序将在集群的所有进程/rank 上运行。确保 mesh_shape (描述设备布局的 nD 数组的维度) 在所有 rank 上都相同。不一致的 mesh_shape 可能导致挂起。

注意

如果找不到进程组,init_device_mesh 将在后台初始化分布式通信所需的进程组。

参数:
  • device_type (str) – Mesh 的设备类型。目前支持:“cpu”、“cuda/cuda-like”、“xpu”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。

  • mesh_shape (Tuple[int]) – 一个元组,定义描述设备布局的多维数组的维度。

  • mesh_dim_names (tuple[str, ...], 可选) – 一个元组,包含要分配给描述设备布局的多维数组每个维度的 mesh 维度名称。其长度必须与 mesh_shape 的长度匹配。 mesh_dim_names 中的每个字符串都必须是唯一的。

  • backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 对将为每个 mesh 维度创建的某些或所有 ProcessGroups 的覆盖。每个键可以是维度的索引或其名称 (如果提供了 mesh_dim_names)。每个值可以是包含后端名称及其选项的元组,或仅包含其中一个组件 (在这种情况下,另一个将设置为其默认值)。

返回:

一个 DeviceMesh 对象,表示设备布局。

返回类型:

DeviceMesh

示例

>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed.is_initialized()[源代码]#

检查默认进程组是否已初始化。

返回类型:

布尔值

torch.distributed.is_mpi_available()[源代码]#

检查 MPI 后端是否可用。

返回类型:

布尔值

torch.distributed.is_nccl_available()[源代码]#

检查 NCCL 后端是否可用。

返回类型:

布尔值

torch.distributed.is_gloo_available()[源代码]#

检查 Gloo 后端是否可用。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_xccl_available()[源代码]#

检查 XCCL 后端是否可用。

返回类型:

布尔值

torch.distributed.distributed_c10d.batch_isend_irecv(p2p_op_list)[源代码]#

异步发送或接收张量批次并返回请求列表。

处理 p2p_op_list 中的每个操作,并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。

参数:

p2p_op_list (list[P2POp]) – 点对点操作列表(每个操作符的类型是 torch.distributed.P2POp)。列表中 isend/irecv 的顺序很重要,它需要与远程端的相应 isend/irecv 匹配。

返回:

通过调用 op_list 中的相应操作返回的分布式请求对象列表。

返回类型:

list[Work]

示例

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size)
>>> recv_op = dist.P2POp(
...     dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size
... )
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则将导致意外的挂起问题。

此外,如果此 API 是传递给 dist.P2POpgroup 中的第一个集合调用,则 group 的所有 rank 必须参与此 API 调用;否则,行为是未定义的。如果此 API 调用不是 group 中的第一个集合调用,则允许仅涉及 group 的部分 rank 的批量 P2P 操作。

torch.distributed.distributed_c10d.destroy_process_group(group=None)[source]#

销毁给定的进程组,并反初始化分布式包。

参数:

group (ProcessGroup, optional) – 要销毁的进程组,如果给定 group.WORLD,则将销毁所有进程组,包括默认进程组。

torch.distributed.distributed_c10d.is_backend_available(backend)[source]#

检查后端可用性。

检查给定的后端是否可用,并通过函数 Backend.register_backend 支持内置后端或第三方后端。

参数:

backend (str) – 后端名称。

返回:

如果后端可用,则返回 true,否则返回 false。

返回类型:

布尔值

torch.distributed.distributed_c10d.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]#

异步接收张量。

警告

tag 参数不适用于 NCCL 后端。

与阻塞的 recv 不同,irecv 允许 src == dst rank,即自接收。

参数:
  • tensor (Tensor) – 用于填充接收数据的张量。

  • src (int, optional) – 全局进程组中的源 rank(忽略 group 参数)。如果未指定,则从任何进程接收。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于将 recv 与远程 send 匹配的标签。

  • group_src (int, optional) – group 中的目标 rank。指定 srcgroup_src 是无效的。

返回:

分布式请求对象。如果不是组的一部分,则为 None。

返回类型:

Work | None

torch.distributed.distributed_c10d.is_gloo_available()[source]#

检查 Gloo 后端是否可用。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_initialized()[source]#

检查默认进程组是否已初始化。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_mpi_available()[source]#

检查 MPI 后端是否可用。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_nccl_available()[source]#

检查 NCCL 后端是否可用。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_torchelastic_launched()[source]#

检查此进程是否使用 torch.distributed.elastic(也称为 torchelastic)启动。

环境变量 TORCHELASTIC_RUN_ID 的存在被用作代理来确定当前进程是否是通过 torchelastic 启动的。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,它始终是一个非空值,表示用于对等发现的作业 id。

返回类型:

布尔值

torch.distributed.distributed_c10d.is_ucc_available()[source]#

检查 UCC 后端是否可用。

返回类型:

布尔值

torch.distributed.is_torchelastic_launched()[source]#

检查此进程是否使用 torch.distributed.elastic(也称为 torchelastic)启动。

环境变量 TORCHELASTIC_RUN_ID 的存在被用作代理来确定当前进程是否是通过 torchelastic 启动的。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,它始终是一个非空值,表示用于对等发现的作业 id。

返回类型:

布尔值

torch.distributed.get_default_backend_for_device(device)[source]#

返回给定设备的默认后端。

参数:

device (Union[str, torch.device]) – 要获取默认后端的设备。

返回:

给定设备的默认后端,以小写字符串形式返回。

返回类型:

str


目前支持三种初始化方法。

TCP 初始化#

有两种使用 TCP 初始化。两者都需要一个所有进程都可以访问的网络地址和一个期望的 world_size。第一种方法需要指定属于 rank 0 进程的地址。此初始化方法要求所有进程都已手动指定 rank。

请注意,最新的分布式包不再支持多播地址。group_name 也已弃用。

import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

共享文件系统初始化#

另一种初始化方法利用了组内所有机器都可以访问和可见的文件系统,以及期望的 world_size。URL 应以 file:// 开头,并包含共享文件系统上一个不存在的文件(在现有目录中)的路径。如果文件不存在,文件系统初始化将自动创建该文件,但不会删除该文件。因此,您有责任确保在下一次使用相同文件路径/名称调用 init_process_group() 之前清理该文件。

请注意,最新的分布式包不再支持自动 rank 分配,并且 group_name 也已弃用。

警告

此方法假定文件系统支持使用 fcntl 进行锁定 - 大多数本地系统和 NFS 都支持它。

警告

此方法将始终创建文件,并在程序结束时尽力清理和删除该文件。换句话说,每次使用文件初始化方法进行初始化都需要一个全新的空文件,以便初始化成功。如果使用了前一次初始化(碰巧未被清理)的同一个文件,这是意外行为,通常会导致死锁和故障。因此,即使此方法会尽力清理文件,如果自动删除未能成功,您也有责任确保在训练结束时删除该文件,以防止下次使用同一文件。如果您计划在同一文件名上多次调用 init_process_group(),这一点尤其重要。换句话说,如果文件未被删除/清理,并且您在此文件上再次调用 init_process_group(),则预期会出现故障。这里的经验法则是,确保每次调用 init_process_group() 时文件都不存在或为空。

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

环境变量初始化#

此方法将从环境变量中读取配置,允许完全自定义信息的获取方式。需要设置的变量是:

  • MASTER_PORT - 必需;必须是 rank 0 机器上的一个空闲端口。

  • MASTER_ADDR - 必需(rank 0 除外);rank 0 节点的地址。

  • WORLD_SIZE - 必需;可以在此处设置,或在 init 函数调用中设置。

  • RANK - 必需;可以在此处设置,或在 init 函数调用中设置。

rank 0 的机器将用于设置所有连接。

这是默认方法,意味着不需要指定 init_method(或者可以是 env://)。

提高初始化时间#

  • TORCH_GLOO_LAZY_INIT - 延迟建立连接,而不是使用完整的网状连接,这可以大大提高非 all2all 操作的初始化时间。

初始化后#

一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()

class torch.distributed.Backend(name)[source]#

后端的枚举类。

可用后端:GLOO、NCCL、UCC、MPI、XCCL 以及其他已注册的后端。

此类的值是小写字符串,例如 "gloo"。它们可以作为属性访问,例如 Backend.NCCL

此类可以直接调用以解析字符串,例如 Backend(backend_str) 将检查 backend_str 是否有效,如果有效则返回解析后的小写字符串。它也接受大写字符串,例如 Backend("GLOO") 返回 "gloo"

注意

条目 Backend.UNDEFINED 存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假定它的存在。

classmethod register_backend(name, func, extended_api=False, devices=None)[source]#

使用给定的名称和实例化函数注册一个新后端。

此类方法由第三方 ProcessGroup 扩展用于注册新后端。

参数:
  • name (str) – ProcessGroup 扩展的后端名称。它应与 init_process_group() 中的名称匹配。

  • func (function) – 实例化后端的函数句柄。该函数应在后端扩展中实现,并接受四个参数,包括 storerankworld_sizetimeout

  • extended_api (bool, optional) – 后端是否支持扩展参数结构。默认为 False。如果设置为 True,后端将获得 c10d::DistributedBackendOptions 的实例,以及由后端实现定义的进程组选项对象。

  • device (str or list of str, optional) – 此后端支持的设备类型,例如 "cpu"、"cuda" 等。如果为 None,则假定支持 "cpu" 和 "cuda"。

注意

对第三方后端的此支持是实验性的,可能会发生变化。

torch.distributed.get_backend(group=None)[source]#

返回给定进程组的后端。

参数:

group (ProcessGroup, optional) – 要操作的进程组。默认是通用的主进程组。如果指定了另一个特定组,调用进程必须是 group 的一部分。

返回:

给定进程组的后端,以小写字符串形式返回。

返回类型:

后端

torch.distributed.get_rank(group=None)[source]#

返回当前进程在提供的 group 中的 rank,否则返回默认值。

Rank 是在分布式进程组中分配给每个进程的唯一标识符。它们始终是从 0 到 world_size 的连续整数。

参数:

group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

返回:

进程组的 rank,如果不是组的一部分,则为 -1。

返回类型:

int

torch.distributed.get_world_size(group=None)[source]#

返回当前进程组中的进程数。

参数:

group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

返回:

进程组的世界大小,如果不是组的一部分,则为 -1。

返回类型:

int

关闭#

在退出时通过调用 destroy_process_group() 来清理资源非常重要。

最简单的遵循模式是在训练脚本中不再需要通信时(通常在 main() 的末尾附近),调用 destroy_process_group() 并将 group 参数设置为默认值 None,以销毁每个进程组和后端。该调用应每 trainer-process 调用一次,而不是在外层 process-launcher 级别调用。

如果在超时时间内,并非所有 rank 都在 destroy_process_group() 中为 pg 调用了该函数,尤其是在应用程序中有多个进程组的情况下(例如,用于 N-D 并行),则可能在退出时发生挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,这必须集体调用,但如果由 Python 的 GC 调用 ProcessGroupNCCL 析构函数的顺序是不确定的。调用 destroy_process_group() 可以帮助确保 ncclCommAbort 在 rank 之间以一致的顺序调用,并避免在 ProcessGroupNCCL 的析构函数中调用 ncclCommAbort。

重新初始化#

destroy_process_group 也可以用于销毁单个进程组。一种用例可能是容错训练,其中进程组可以在运行时销毁然后重新初始化。在这种情况下,在调用 destroy 之后和在后续初始化之前,使用除 torch.distributed 外部的某种方式同步 trainer 进程至关重要。由于实现这种同步的困难,此行为当前不受支持/未经验证,并被视为已知问题。如果您有此用例并且它阻碍了您,请提交一个 github issue 或 RFC。


#

默认情况下,集合操作在默认组(也称为 world)上进行,并且要求所有进程进入分布式函数调用。但是,一些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。可以使用 new_group() 函数来创建新组,其中包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数传递给所有集合(集合是用于在某些众所周知的编程模式中交换信息的分布式函数)。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]#

创建一个新的分布式组。

此函数要求主组中的所有进程(即,所有参与分布式作业的进程)都进入此函数,即使它们不是组的成员。此外,组应该在所有进程中以相同的顺序创建。

警告

安全的并发使用:当使用多个进程组和 NCCL 后端时,用户必须确保跨 rank 的集合体执行顺序全局一致。

如果进程内的多个线程发出集合体,则需要显式同步以确保一致的顺序。

当使用 torch.distributed 通信 API 的异步变体时,会返回一个 work 对象,并且通信内核会被排队到单独的 CUDA 流中,从而允许通信和计算的重叠。一旦在一个进程组上发出了一个或多个异步操作,就必须在调用 work.wait() 后才能使用另一个进程组,通过与其它 cuda 流同步。

有关更多详细信息,请参阅并发使用多个 NCCL 通信器 <https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently>

参数:
  • ranks (list[int]) – 成员的 ranks 列表。如果为 None,则设置为所有 ranks。默认为 None

  • timeout (timedelta, optional) – 详细信息和默认值请参阅 init_process_group

  • backend (str or Backend, optional) – 要使用的后端。根据构建时的配置,有效值为 gloonccl。默认为与全局组相同的后端。此字段应作为小写字符串(例如,"gloo")给出,也可以通过 Backend 属性(例如,Backend.GLOO)访问。如果传入 None,则使用与默认进程组对应的后端。默认为 None

  • pg_options (ProcessGroupOptions, optional) – 进程组选项,指定在构造特定进程组时需要传入哪些附加选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以选择高优先级 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional):在进程组创建结束时执行组本地屏障。这与非成员 ranks 不需要调用 API 且不加入屏障不同。

  • group_desc (str, optional) – 描述进程组的字符串。

  • device_id (torch.device, optional) – 一个单一的、特定的设备,用于“绑定”此进程。如果提供了此字段,new_group 调用将尝试立即为该设备初始化通信后端。

返回:

分布式组的句柄,可以传递给集合调用,或者如果 rank 不属于 ranks,则传递给 GroupMember.NON_GROUP_MEMBER。

注意:use_local_synchronization 与 MPI 不兼容。

注意:虽然对于大型集群和小型进程组,use_local_synchronization=True 可能速度显著更快,但必须小心,因为它会改变集群行为,因为非成员 ranks 不会加入组屏障()。

注意:use_local_synchronization=True 在每个 rank 创建多个重叠进程组时可能导致死锁。为避免这种情况,请确保所有 ranks 都遵循相同的全局创建顺序。

torch.distributed.distributed_c10d.shrink_group(ranks_to_exclude, group=None, shrink_flags=0, pg_options=None)[source]#

通过排除指定的 ranks 来缩小进程组。

创建并返回一个新的、更小的进程组,仅包含原始组中不在 ranks_to_exclude 列表中的 ranks。

参数:
  • ranks_to_exclude (List[int]) – 要从新组中排除的原始 group 的 ranks 列表。

  • group (ProcessGroup, optional) – 要缩小的进程组。如果为 None,则使用默认进程组。默认为 None

  • shrink_flags (int, optional) – 控制缩小行为的标志。可以是 SHRINK_DEFAULT (默认) 或 SHRINK_ABORTSHRINK_ABORT 将尝试在缩小之前终止父通信器中的正在进行的 operasi。默认为 SHRINK_DEFAULT

  • pg_options (ProcessGroupOptions, optional) – 应用于缩小后的进程组的后端特定选项。如果提供了,后端将在创建新组时使用这些选项。如果省略,新组将继承父组的默认设置。

返回:

由剩余 ranks 组成的同一新组。如果缩小了默认组,返回的组将成为新的默认组。

返回类型:

ProcessGroup

抛出:
  • TypeError – 如果组的后端不支持缩小。

  • ValueError – 如果 ranks_to_exclude 无效(为空、越界、

  • 重复或排除了所有 ranks)。

  • RuntimeError – 如果被排除的 rank 调用此函数或后端

  • 操作失败。

注意事项

  • 只有非排除的 ranks 应该调用此函数;被排除的 ranks 不能参与缩小操作。

  • 缩小默认组会销毁所有其他进程组,因为 rank 重排使其不一致。

torch.distributed.get_group_rank(group, global_rank)[source]#

将全局 rank 转换为组 rank。

global_rank 必须是 group 的一部分,否则将引发 RuntimeError。

参数:
  • group (ProcessGroup) – 用于查找相对 rank 的 ProcessGroup。

  • global_rank (int) – 要查询的全局 rank。

返回:

相对于 groupglobal_rank 的组 rank。

返回类型:

int

注意:在默认进程组上调用此函数会返回自身。

torch.distributed.get_global_rank(group, group_rank)[source]#

将组 rank 转换为全局 rank。

group_rank 必须是 group 的一部分,否则将引发 RuntimeError。

参数:
  • group (ProcessGroup) – 用于从中查找全局 rank 的 ProcessGroup。

  • group_rank (int) – 要查询的组 rank。

返回:

相对于 groupgroup_rank 的全局 rank。

返回类型:

int

注意:在默认进程组上调用此函数会返回自身。

torch.distributed.get_process_group_ranks(group)[source]#

获取与 group 相关联的所有 ranks。

参数:

group (Optional[ProcessGroup]) – 要从中获取所有 ranks 的 ProcessGroup。如果为 None,则使用默认进程组。

返回:

按组 rank 排序的全局 ranks 列表。

返回类型:

list[int]

DeviceMesh#

DeviceMesh 是一个管理进程组(或 NCCL communicators)的更高级别抽象。它允许用户轻松创建跨节点和节点内进程组,而无需担心如何为不同的子进程组正确设置 ranks,并且它有助于轻松管理那些分布式进程组。可以使用 init_device_mesh() 函数创建一个新的 DeviceMesh,其中包含描述设备拓扑的 mesh 形状。

class torch.distributed.device_mesh.DeviceMesh(device_type, mesh=None, *, mesh_dim_names=None, backend_override=None, _init_backend=True, _rank=None, _layout=None, _rank_map=None, _root_mesh=None)[source]#

DeviceMesh 表示设备的网格,其中设备的布局可以表示为 n 维数组,n 维数组的每个值是默认进程组 rank 的全局 ID。

DeviceMesh 可用于在集群中设置 N 维设备连接,并管理 N 维并行性的 ProcessGroups。通信可以在 DeviceMesh 的每个维度上分别进行。DeviceMesh 尊重用户已选择的设备(即,如果用户在 DeviceMesh 初始化之前调用 torch.cuda.set_device),并且如果用户尚未预先设置设备,则会为当前进程选择/设置设备。请注意,手动设备选择应在 DeviceMesh 初始化之前进行。

DeviceMesh 也可以与 DTensor API 一起使用作为上下文管理器。

注意

DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群的所有进程/ranks 上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有 ranks 之间是相同的。不一致的 mesh 将导致静默挂起。

参数:
  • device_type (str) – 网格的设备类型。当前支持:“cpu”,“cuda/cuda-like”。

  • mesh (ndarray) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。

  • _rank (int) – (实验性/内部) 当前进程的全局 rank。如果未提供,将从默认进程组推断。

返回:

一个 DeviceMesh 对象,表示设备布局。

返回类型:

DeviceMesh

以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。沿 mesh 的第一个维度的归约将跨列 (0, 4) 等和 (3, 7) 进行归约,沿第二个维度的归约将跨行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 进行归约。

示例

>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
property device_type: str#

返回网格的设备类型。

static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]#

使用 device_type 从现有 ProcessGroup 或一组现有的 ProcessGroup 构建 DeviceMesh

构造的设备网格的维度数量等于传入组的数量。例如,如果传入单个进程组,则生成的 DeviceMesh 是一个 1D 网格。如果传入 2 个进程组的列表,则生成的 DeviceMesh 是一个 2D 网格。

如果传入多个组,则必须提供 meshmesh_dim_names 参数。传入进程组的顺序决定了网格的拓扑。例如,第一个进程组将是 DeviceMesh 的第 0 个维度。传入的 mesh 张量必须具有与传入进程组数量相同的维度,并且 mesh 张量中的维度顺序必须与传入进程组中的顺序匹配。

参数:
  • group (ProcessGroup or list[ProcessGroup]) – 现有的 ProcessGroup 或一系列现有的 ProcessGroups。

  • device_type (str) – 网格的设备类型。当前支持:“cpu”,“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。

  • mesh (torch.Tensor or ArrayLike, optional) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。默认为 None。

  • mesh_dim_names (tuple[str, ...], optional) – 分配给描述设备布局的多维数组的每个维度的网格维度名称的元组。其长度必须与 mesh_shape 的长度匹配。 mesh_dim_names 中的每个字符串必须是唯一的。默认为 None。

返回:

一个 DeviceMesh 对象,表示设备布局。

返回类型:

DeviceMesh

get_all_groups()[source]#

返回所有网格维度的 ProcessGroups 列表。

返回:

ProcessGroup 对象的列表。

返回类型:

list[ProcessGroup]

get_coordinate()[source]#

返回此 rank 相对于网格所有维度的相对索引。如果此 rank 不属于网格,则返回 None。

返回类型:

list[int] | None

get_group(mesh_dim=None)[source]#

返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是 1 维的,则返回网格中唯一的 ProcessGroup。

参数:
  • mesh_dim (str/python:int, optional) – 它可以是网格维度的名称或索引

  • None. (网格维度的。默认为) –

返回:

ProcessGroup 对象。

返回类型:

ProcessGroup

get_local_rank(mesh_dim=None)[source]#

返回 DeviceMesh 指定的 mesh_dim 的本地 rank。

参数:
  • mesh_dim (str/python:int, optional) – 它可以是网格维度的名称或索引

  • None. (网格维度的。默认为) –

返回:

表示本地 rank 的整数。

返回类型:

int

以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在 rank 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 0。在 rank 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 1。在 rank 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 0。在 rank 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 1。在 rank 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 2。在 rank 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 3。

示例

>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
get_rank()[source]#

返回当前全局 rank。

返回类型:

int

property mesh: Tensor#

返回表示设备布局的张量。

property mesh_dim_names: tuple[str, ...] | None#

返回网格维度的名称。

点对点通信#

torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]#

同步发送张量。

警告

tag 参数不适用于 NCCL 后端。

参数:
  • tensor (Tensor) – 要发送的张量。

  • dst (int) – 全局进程组上的目标 rank (与 group 参数无关)。目标 rank 不能与当前进程的 rank 相同。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于将发送与远程接收匹配的标签。

  • group_dst (int, optional) – group 上的目标 rank。指定 dstgroup_dst 均无效。

torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source]#

同步接收张量。

警告

tag 参数不适用于 NCCL 后端。

参数:
  • tensor (Tensor) – 用于填充接收数据的张量。

  • src (int, optional) – 全局进程组中的源 rank(忽略 group 参数)。如果未指定,则从任何进程接收。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于将 recv 与远程 send 匹配的标签。

  • group_src (int, optional) – group 中的目标 rank。指定 srcgroup_src 是无效的。

返回:

发送 rank -1,如果不是组的一部分。

返回类型:

int

isend()irecv() 在使用时返回分布式请求对象。通常,此对象的类型未指定,因为它们绝不应手动创建,但它们保证支持两种方法

  • is_completed() - 返回 True,如果 operasi 已完成。

  • wait() - 将阻塞进程直到 operasi 完成。is_completed() 保证在返回后返回 True。

torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]#

异步发送张量。

警告

在请求完成之前修改 tensor 会导致未定义行为。

警告

tag 参数不适用于 NCCL 后端。

与阻塞的 send 不同,isend 允许 src == dst rank,即发送到自身。

参数:
  • tensor (Tensor) – 要发送的张量。

  • dst (int) – 全局进程组上的目标 rank (与 group 参数无关)。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于将发送与远程接收匹配的标签。

  • group_dst (int, optional) – group 上的目标 rank。指定 dstgroup_dst 均无效。

返回:

分布式请求对象。如果不是组的一部分,则为 None。

返回类型:

Work | None

torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]#

异步接收张量。

警告

tag 参数不适用于 NCCL 后端。

与阻塞的 recv 不同,irecv 允许 src == dst rank,即自接收。

参数:
  • tensor (Tensor) – 用于填充接收数据的张量。

  • src (int, optional) – 全局进程组中的源 rank(忽略 group 参数)。如果未指定,则从任何进程接收。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于将 recv 与远程 send 匹配的标签。

  • group_src (int, optional) – group 中的目标 rank。指定 srcgroup_src 是无效的。

返回:

分布式请求对象。如果不是组的一部分,则为 None。

返回类型:

Work | None

torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]#

同步发送 object_list 中的可序列化对象。

类似于 send(),但可以传递 Python 对象。请注意,object_list 中的所有对象都必须是可序列化的才能发送。

参数:
  • object_list (List[Any]) – 要发送的输入对象列表。每个对象必须是可序列化的。接收者必须提供大小相等的列表。

  • dst (int) – 将 object_list 发送到目标 rank。目标 rank 基于全局进程组(与 group 参数无关)。

  • group (ProcessGroup | None) – (ProcessGroup, optional): 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则对象将被序列化并转换为张量,然后发送前移至 device。默认为 None

  • group_dst (int, optional) – group 上的目标 rank。必须指定 dstgroup_dst 中的一个,但不能同时指定两者。

  • use_batch (bool, optional) – 如果为 True,则使用批处理 p2p 操作而不是常规的 send 操作。这可以避免初始化 2-rank 通信器,并使用现有的整个 group 通信器。有关用法和假设,请参阅 batch_isend_irecv。默认为 False

返回:

.

注意

对于基于 NCCL 的进程组,必须在通信发生之前将对象的内部张量表示移至 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,用户有责任确保它已设置,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

send_object_list() 隐式使用了 pickle 模块,众所周知,该模块不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅使用您信任的数据调用此函数。

警告

使用 GPU 张量调用 send_object_list() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量会被 pickling。请考虑使用 send()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]#

同步接收 object_list 中的可 pickling 对象。

recv() 类似,但可以接收 Python 对象。

参数:
  • object_list (List[Any]) – 要接收到其中的对象列表。必须提供大小与要发送的列表大小相等的列表。

  • src (int, optional) – 从哪个 rank 接收 object_list。源 rank 基于全局进程组(与 group 参数无关)。如果设置为 None,则从任何 rank 接收。默认为 None

  • group (ProcessGroup | None) – (ProcessGroup, optional): 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则在该设备上接收。默认为 None

  • group_src (int, optional) – group 中的目标 rank。指定 srcgroup_src 是无效的。

  • use_batch (bool, optional) – 如果为 True,则使用批处理 p2p 操作而不是常规的 send 操作。这可以避免初始化 2-rank 通信器,并使用现有的整个 group 通信器。有关用法和假设,请参阅 batch_isend_irecv。默认为 False

返回:

发送方 rank。如果 rank 不属于 group,则为 -1。如果 rank 属于 group,则 object_list 将包含来自 src rank 的发送对象。

注意

对于基于 NCCL 的进程组,必须在通信发生之前将对象的内部张量表示移至 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,用户有责任确保它已设置,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

recv_object_list() 隐式使用了 pickle 模块,众所周知,该模块不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅使用您信任的数据调用此函数。

警告

使用 GPU 张量调用 recv_object_list() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量会被 pickling。请考虑使用 recv()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.batch_isend_irecv(p2p_op_list)[source]#

异步发送或接收张量批次并返回请求列表。

处理 p2p_op_list 中的每个操作,并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。

参数:

p2p_op_list (list[P2POp]) – 点对点操作列表(每个操作符的类型是 torch.distributed.P2POp)。列表中 isend/irecv 的顺序很重要,它需要与远程端的相应 isend/irecv 匹配。

返回:

通过调用 op_list 中的相应操作返回的分布式请求对象列表。

返回类型:

list[Work]

示例

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size)
>>> recv_op = dist.P2POp(
...     dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size
... )
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则将导致意外的挂起问题。

此外,如果此 API 是传递给 dist.P2POpgroup 中的第一个集合调用,则 group 的所有 rank 必须参与此 API 调用;否则,行为是未定义的。如果此 API 调用不是 group 中的第一个集合调用,则允许仅涉及 group 的部分 rank 的批量 P2P 操作。

class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source]#

一个用于为 batch_isend_irecv 构建点对点操作的类。

此类构建 P2P 操作的类型、通信缓冲区、对等 rank、进程组和标签。此类实例将传递给 batch_isend_irecv 进行点对点通信。

参数:
  • op (Callable) – 用于向对等进程发送数据或从对等进程接收数据的函数。op 的类型为 torch.distributed.isendtorch.distributed.irecv

  • tensor (Tensor) – 要发送或接收的张量。

  • peer (int, optional) – 目标或源 rank。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • tag (int, optional) – 用于匹配发送与接收的标签。

  • group_peer (int, optional) – 目标或源 rank。

同步和异步集体操作#

每个集体操作函数都支持以下两种操作,具体取决于传递给集体的 async_op 标志的设置。

同步操作 - 默认模式,当 async_op 设置为 False 时。函数返回时,保证集体操作已执行。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何使用集体调用输出的后续函数调用都将按预期工作。对于 CUDA 集体操作,在同一 CUDA 流上使用输出的函数调用将按预期工作。用户在运行于不同流下的场景中必须小心同步。有关 CUDA 语义(如流同步)的详细信息,请参阅 CUDA 语义。请参阅下面的脚本,了解 CPU 和 CUDA 操作在这些语义上的差异示例。

异步操作 - 当 async_op 设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您不需要手动创建它,并且它保证支持两个方法:

  • is_completed() - 对于 CPU 集体操作,如果已完成,则返回 True。对于 CUDA 操作,如果操作已成功入队到 CUDA 流,并且输出可以在默认流上使用而无需进一步同步,则返回 True

  • wait() - 对于 CPU 集体操作,将阻塞进程直到操作完成。对于 CUDA 集体操作,将阻塞当前活动的 CUDA 流直到操作完成(但不会阻塞 CPU)。

  • get_future() - 返回 torch._C.Future 对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但不包括点对点操作。注意:随着我们继续采用 Futures 并合并 API,get_future() 调用可能会变得多余。

示例

以下代码可以作为参考,说明在使用分布式集体时 CUDA 操作的语义。它显示了在使用不同 CUDA 流上的集体输出时显式同步的必要性。

# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)

集体函数#

torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]#

将张量广播到整个组。

tensor 在参与集体的所有进程中必须具有相同数量的元素。

参数:
  • tensor (Tensor) – 如果 src 是当前进程的 rank,则为要发送的数据;否则为用于保存接收数据的张量。

  • src (int) – 全局进程组上的源 rank(与 group 参数无关)。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

  • group_src (int) – group 上的源 rank。必须指定 group_srcsrc 中的一个,但不能同时指定两者。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]#

object_list 中的可 pickling 对象广播到整个组。

broadcast() 类似,但可以传递 Python 对象。请注意,object_list 中的所有对象都必须是可 pickling 的才能进行广播。

参数:
  • object_list (List[Any]) – 要广播的输入对象列表。每个对象都必须是可 pickling 的。只有 src rank 上的对象才会被广播,但每个 rank 都必须提供大小相等的列表。

  • src (int) – 从哪个 rank 广播 object_list。源 rank 基于全局进程组(与 group 参数无关)。

  • group (ProcessGroup | None) – (ProcessGroup, optional): 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则对象将被序列化并转换为张量,然后广播前移至 device。默认为 None

  • group_src (int) – group 上的源 rank。不得同时指定 group_srcsrc 中的一个。

返回:

None。如果 rank 属于 group,则 object_list 将包含来自 src rank 的广播对象。

注意

对于基于 NCCL 的进程组,必须在通信发生之前将对象的内部张量表示移至 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,用户有责任确保它已设置,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

注意

请注意,此 API 与 broadcast() 集体操作略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

broadcast_object_list() 隐式使用了 pickle 模块,众所周知,该模块不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅使用您信任的数据调用此函数。

警告

使用 GPU 张量调用 broadcast_object_list() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量会被 pickling。请考虑使用 broadcast()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     objects = [None, None, None]
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#

在所有机器上规约张量数据,以便所有机器都获得最终结果。

调用后,tensor 在所有进程中都将是按位相同的。

支持复杂张量。

参数:
  • tensor (Tensor) – 集体操作的输入和输出。该函数就地操作。

  • op (optional) – torch.distributed.ReduceOp 枚举中的一个值。指定用于逐元素规约的操作。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

示例

>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor(
...     [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]#

规约所有机器上的张量数据。

只有 rank 为 dst 的进程才会收到最终结果。

参数:
  • tensor (Tensor) – 集体操作的输入和输出。该函数就地操作。

  • dst (int) – 全局进程组上的目标 rank (与 group 参数无关)。

  • op (optional) – torch.distributed.ReduceOp 枚举中的一个值。指定用于逐元素规约的操作。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

  • group_dst (int) – group 上的目标 rank。必须指定 group_dstdst 中的一个,但不能同时指定两者。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]#

将整个组中的张量收集到一个列表中。

支持复杂和不等大的张量。

参数:
  • tensor_list (list[Tensor]) – 输出列表。它应该包含正确大小的张量,用于集体操作的输出。支持不等大的张量。

  • tensor (Tensor) – 要从当前进程广播的张量。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

示例

>>> # All tensors below are of torch.int64 dtype.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_list = [
...     torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> # We have 2 process groups, 2 ranks.
>>> tensor_list = [
...     torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor(
...     [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]#

从所有 rank 收集张量并将它们放入单个输出张量中。

此函数要求所有进程中的张量大小相同。

参数:
  • output_tensor (Tensor) – 用于容纳所有 rank 的张量元素的输出张量。它必须具有正确的大小,形状为以下两种之一:(i) 沿主维度连接所有输入张量;“连接”的定义参见 torch.cat();(ii) 沿主维度堆叠所有输入张量;“堆叠”的定义参见 torch.stack()。下面的示例可以更好地解释支持的输出形式。

  • input_tensor (Tensor) – 要从当前 rank 收集的张量。与 all_gather API 不同,此 API 中的输入张量在所有 rank 之间的大小必须相同。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # Output in concatenation form
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # Output in stack form
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1') # Rank 1
torch.distributed.all_gather_object(object_list, obj, group=None)[source]#

将整个组中的可 pickling 对象收集到一个列表中。

all_gather() 类似,但可以传递 Python 对象。请注意,对象必须是可 pickling 的才能进行收集。

参数:
  • object_list (list[Any]) – 输出列表。它的大小应等于该集体操作的 group 大小,并将包含输出。

  • obj (Any) – 要从当前进程广播的可 pickling Python 对象。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

返回:

None。如果调用 rank 属于此 group,则集体操作的输出将填充到输入的 object_list 中。如果调用 rank 不属于此 group,则传入的 object_list 将保持不变。

注意

请注意,此 API 与 all_gather() 集体操作略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。

注意

对于基于 NCCL 的进程组,必须在通信发生之前将对象的内部张量表示移至 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,用户有责任确保它已设置,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

all_gather_object() 隐式使用了 pickle 模块,众所周知,该模块不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅使用您信任的数据调用此函数。

警告

使用 GPU 张量调用 all_gather_object() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量会被 pickling。请考虑使用 all_gather()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source]#

将张量列表收集到一个进程中。

此函数要求所有进程中的张量大小相同。

参数:
  • tensor (Tensor) – 输入张量。

  • gather_list (list[Tensor], optional) – 用于收集数据的、尺寸适当的、大小相同的张量列表(默认为 None,必须在目标 rank 上指定)。

  • dst (int, optional) – 全局进程组上的目标 rank(与 group 参数无关)。(如果 dstgroup_dst 都为 None,则默认为全局 rank 0)。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

  • group_dst (int, optional) – group 上的目标 rank。指定 dstgroup_dst 均无效。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

注意

请注意,gather_list 中的所有张量都必须具有相同的大小。

示例:
>>> # We have 2 process groups, 2 ranks.
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> if dist.get_rank() == 0:
>>>     gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>>     gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 gets gathered data.
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None                                                                   # Rank 1
torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source]#

将整个组的可序列化对象收集到一个进程中。

gather() 类似,但可以传递 Python 对象。请注意,对象必须是可序列化的才能被收集。

参数:
  • obj (Any) – 输入对象。必须是可序列化的。

  • object_gather_list (list[Any]) – 输出列表。在 dst rank 上,它应该正确地填充为该集合通信的大小,并将包含输出。在非-dst rank 上必须为 None。(默认为 None)。

  • dst (int, optional) – 全局进程组上的目标 rank(与 group 参数无关)。(如果 dstgroup_dst 都为 None,则默认为全局 rank 0)。

  • group (ProcessGroup | None) – (ProcessGroup, optional): 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

  • group_dst (int, optional) – group 上的目标 rank。指定 dstgroup_dst 均无效。

返回:

None。在 dst rank 上,object_gather_list 将包含集合通信的输出。

注意

请注意,此 API 与 gather 集合通信略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。

注意

对于基于 NCCL 的进程组,必须在通信发生之前将对象的内部张量表示移至 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,用户有责任确保它已设置,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

gather_object() 隐式使用 pickle 模块,该模块已知是不安全的。可以构造恶意 pickle 数据,该数据在反序列化时会执行任意代码。仅将此函数与您信任的数据一起调用。

警告

使用 GPU 张量调用 gather_object() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被序列化。请考虑使用 gather() 代替。

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
...     gather_objects[dist.get_rank()],
...     output if dist.get_rank() == 0 else None,
...     dst=0
... )
>>> # On rank 0
>>> output
['foo', 12, {1: 2}]
torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source]#

将张量列表散布到组中的所有进程。

每个进程将接收一个张量,并将其数据存储在 tensor 参数中。

支持复杂张量。

参数:
  • tensor (Tensor) – 输出张量。

  • scatter_list (list[Tensor]) – 要散布的张量列表(默认为 None,必须在源 rank 上指定)。

  • src (int) – 全局进程组上的源 rank(与 group 参数无关)。(如果 srcgroup_src 都为 None,则默认为全局 rank 0)。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

  • group_src (int, optional) – group 上的源 rank。指定 srcgroup_src 是无效的。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

注意

请注意,scatter_list 中的所有张量都必须具有相同的大小。

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     # Only tensors, all of which must be the same size.
>>>     t_ones = torch.ones(tensor_size, device=device)
>>>     t_fives = torch.ones(tensor_size, device=device) * 5
>>>     scatter_list = [t_ones, t_fives]
>>> else:
>>>     scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i gets scatter_list[i].
>>> output_tensor
tensor([1., 1.], device='cuda:0') # Rank 0
tensor([5., 5.], device='cuda:1') # Rank 1
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[source]#

scatter_object_input_list 中的可序列化对象散布到整个组。

scatter() 类似,但可以传递 Python 对象。在每个 rank 上,散布的对象将存储在 scatter_object_output_list 的第一个元素中。请注意,scatter_object_input_list 中的所有对象都必须是可序列化的才能被散布。

参数:
  • scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储散布到此 rank 的对象。

  • scatter_object_input_list (List[Any], optional) – 要散布的输入对象列表。每个对象都必须是可序列化的。只有 src rank 上的对象才会被散布,并且对于非-src rank,该参数可以为 None

  • src (int) – 要从中散布 scatter_object_input_list 的源 rank。源 rank 基于全局进程组(与 group 参数无关)。(如果 srcgroup_src 都为 None,则默认为全局 rank 0)。

  • group (ProcessGroup | None) – (ProcessGroup, optional): 要操作的进程组。如果为 None,则使用默认进程组。默认为 None

  • group_src (int, optional) – group 上的源 rank。指定 srcgroup_src 是无效的。

返回:

None。如果 rank 是组的一部分,scatter_object_output_list 的第一个元素将被设置为该 rank 的散布对象。

注意

请注意,此 API 与 scatter 集合通信略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。

警告

对象集合在性能和可扩展性方面存在一些严重限制。有关详细信息,请参阅 对象集合

警告

scatter_object_list() 隐式使用 pickle 模块,该模块已知是不安全的。可以构造恶意 pickle 数据,该数据在反序列化时会执行任意代码。仅将此函数与您信任的数据一起调用。

警告

使用 GPU 张量调用 scatter_object_list() 的支持不佳且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被序列化。请考虑使用 scatter() 代替。

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     # Can be any list on non-src ranks, elements are not used.
>>>     objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # Rank i gets objects[i]. For example, on rank 2:
>>> output_list
[{1: 2}]
torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#

减少,然后将张量列表散布到组中的所有进程。

参数:
  • output (Tensor) – 输出张量。

  • input_list (list[Tensor]) – 要进行归约和散布的张量列表。

  • op (optional) – torch.distributed.ReduceOp 枚举中的一个值。指定用于逐元素规约的操作。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回:

如果 async_op 设置为 True,则为异步操作句柄。如果不是异步操作或不属于该组,则为 None。

torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#

归约,然后将张量散布到组中的所有 rank。

参数:
  • output (Tensor) – 输出张量。它应该在所有 rank 上具有相同的大小。

  • input (Tensor) – 要归约和散布的输入张量。其大小应为输出张量大小乘以世界大小。输入张量可以具有以下形状之一:(i)沿主维度连接的输出张量,或(ii)沿主维度堆叠的输出张量。有关“连接”的定义,请参阅 torch.cat()。有关“堆叠”的定义,请参阅 torch.stack()

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回:

如果 async_op 设置为 True,则为异步操作句柄。如果不是异步操作或不属于该组,则为 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # Input in concatenation form
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # Rank 0
tensor([0, 1, 2, 3], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # Input in stack form
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
        [2, 3]], device='cuda:0') # Rank 0
tensor([[0, 1],
        [2, 3]], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]#

拆分输入张量,然后将拆分后的列表散布到组中的所有进程。

之后,从组中的所有进程接收的张量将被连接成一个单一的输出张量返回。

支持复杂张量。

参数:
  • output (Tensor) – 收集到的连接的输出张量。

  • input (Tensor) – 要散布的输入张量。

  • output_split_sizes – (list[Int], optional): dim 0 的输出拆分大小。如果指定为 None 或空,则 output 张量的 dim 0 必须能被 world_size 整除。

  • input_split_sizes – (list[Int], optional): dim 0 的输入拆分大小。如果指定为 None 或空,则 input 张量的 dim 0 必须能被 world_size 整除。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回:

如果 async_op 设置为 True,则为异步操作句柄。如果不是异步操作或不属于该组,则为 None。

警告

all_to_all_single 是实验性的,可能会发生更改。

示例

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
tensor([4, 5, 6, 7])     # Rank 1
tensor([8, 9, 10, 11])   # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12])    # Rank 0
tensor([1, 5, 9, 13])    # Rank 1
tensor([2, 6, 10, 14])   # Rank 2
tensor([3, 7, 11, 15])   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list = list(output.chunk(world_size))
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0,  1, 10, 11, 12, 20, 21, 30, 31])                     # Rank 0
tensor([ 2,  3, 13, 14, 22, 32, 33])                             # Rank 1
tensor([ 4, 15, 16, 23, 34, 35])                                 # Rank 2
tensor([ 5, 17, 18, 24, 36])                                     # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor(
...     [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat
... ) + 4 * rank * (1 + 1j)
>>> input
tensor([1+1j, 2+2j, 3+3j, 4+4j])                                # Rank 0
tensor([5+5j, 6+6j, 7+7j, 8+8j])                                # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j])                          # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j])                        # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j])                              # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j])                            # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j])                            # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j])                            # Rank 3
torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]#

将输入张量列表散布到组中的所有进程,并将收集到的张量列表返回到输出列表中。

支持复杂张量。

参数:
  • output_tensor_list (list[Tensor]) – 要收集的张量列表,每个 rank 一个。

  • input_tensor_list (list[Tensor]) – 要散布的张量列表,每个 rank 一个。

  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回:

如果 async_op 设置为 True,则为异步操作句柄。如果不是异步操作或不属于该组,则为 None。

警告

all_to_all 是实验性的,可能会发生更改。

示例

>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])]     # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])]     # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])]   # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])]    # Rank 0
[tensor([1]), tensor([5]), tensor([9]), tensor([13])]    # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])]   # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])]   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = input
>>> gather_list = output
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> input = list(input.split(input_splits))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])]                   # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])]                 # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])]         # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])]   # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])]           # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])]              # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])]                  # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor(
...     [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat
... ) + 4 * rank * (1 + 1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])]            # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])]            # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])]      # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])]    # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])]          # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])]        # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])]        # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])]        # Rank 3
torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]#

同步所有进程。

此集合通信会阻塞进程,直到整个组进入此函数,如果 async_op 为 False,或者如果异步操作句柄被调用 wait()。

参数:
  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作

  • device_ids ([int], optional) – 设备/GPU ID 列表。预期只有一个 ID。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于 group,则为 None。

注意

ProcessGroupNCCL 现在会阻塞 CPU 线程直到 barrier 集合通信完成。

注意

ProcessGroupNCCL 将 barrier 实现为一个 1 元素的张量的 all_reduce。必须选择一个设备来分配此张量。设备选择的顺序为:(1)barrier 的 device_ids 参数(如果不是 None)中的第一个设备,(2)init_process_group 使用的设备(如果不是 None),(3)首次与此进程组使用的设备(如果已执行另一个带有张量输入的集合通信),(4)全局 rank 对局部设备计数取模指示的设备索引。

torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]#

同步进程,类似于 torch.distributed.barrier,但考虑了可配置的超时。

它能够报告在指定超时时间内未通过此 barrier 的 rank。具体来说,对于非零 rank,将阻塞直到从 rank 0 处理完 send/recv。Rank 0 将阻塞直到处理完所有来自其他 rank 的 send/recv,并将报告未能及时响应的 rank 的失败。请注意,如果一个 rank 未达到 monitored_barrier(例如由于挂起),所有其他 rank 将在 monitored_barrier 中失败。

此集合通信将阻塞组中的所有进程/rank,直到整个组成功退出函数,这使其对于调试和同步非常有用。但是,它可能会影响性能,应仅用于调试或需要主机端完全同步点的场景。对于调试目的,可以在应用程序的集合通信调用之前插入此 barrier,以检查是否有 rank 不同步。

注意

请注意,此集合通信仅支持 GLOO 后端。

参数:
  • group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。

  • timeout (datetime.timedelta, optional) – monitored_barrier 的超时时间。如果为 None,则使用默认进程组超时。

  • wait_all_ranks (bool, optional) – 是否收集所有失败的 rank。默认情况下,此值为 False,并且 rank 0 上的 monitored_barrier 将在遇到的第一个失败 rank 上抛出,以便快速失败。通过设置 wait_all_ranks=Truemonitored_barrier 将收集所有失败的 rank 并抛出包含所有失败 rank 信息的错误。

返回:

.

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>>     dist.monitored_barrier() # Raises exception indicating that
>>> # rank 1 did not call into monitored_barrier.
>>> # Example with wait_all_ranks=True
>>> if dist.get_rank() == 0:
>>>     dist.monitored_barrier(wait_all_ranks=True) # Raises exception
>>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
>>> # monitored_barrier.
class torch.distributed.Work#

一个 Work 对象表示 PyTorch 分布式包中待处理的异步操作的句柄。它由非阻塞集合通信操作返回,例如 dist.all_reduce(tensor, async_op=True)

block_current_stream(self: torch._C._distributed_c10d.Work) None#

阻塞当前活动的 GPU 流以完成操作。对于基于 GPU 的集合通信,这等同于同步。对于 CPU 发起的集合通信(例如使用 Gloo),这将在操作完成之前阻塞 CUDA 流。

这在所有情况下都会立即返回。

要检查操作是否成功,您应该异步检查 Work 对象的结果。

boxed(self: torch._C._distributed_c10d.Work) object#
exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr#
get_future(self: torch._C._distributed_c10d.Work) torch.Future#
返回:

一个与 Work 完成相关的 torch.futures.Future 对象。例如,可以通过 fut = process_group.allreduce(tensors).get_future() 检索 Future 对象。

示例:

下面是一个简单的 allreduce DDP 通信钩子示例,它使用 get_future API 来检索与 allreduce 完成相关的 Future。

>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future
>>>     group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD
>>>     tensor = bucket.buffer().div_(group_to_use.size())
>>>     return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future()
>>> ddp_model.register_comm_hook(state=None, hook=allreduce)

警告

get_future API 支持 NCCL,部分支持 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回一个 torch.futures.Future

在上面的示例中,allreduce 操作将在 GPU 上使用 NCCL 后端完成,fut.wait() 将在同步适当的 NCCL 流与 PyTorch 的当前设备流后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待 GPU 上的整个操作完成。请注意,CUDAFuture 不支持 TORCH_NCCL_BLOCKING_WAIT 标志或 NCCL 的 barrier()。此外,如果 fut.then() 添加了一个回调函数,它将等待直到 WorkNCCL 的 NCCL 流与 ProcessGroupNCCL 的专用回调流同步,并在回调流上执行回调后内联调用该回调。 fut.then() 将返回另一个 CUDAFuture,该 Future 包含回调的返回值,以及一个记录回调流的 CUDAEvent

  1. 对于 CPU 操作,fut.done() 在操作完成且 value() 张量准备就绪时返回 True。

  2. 对于 GPU 操作,fut.done() 仅在操作已入队时返回 True。

  3. 对于混合 CPU-GPU 操作(例如使用 GLOO 发送 GPU 张量),fut.done() 在张量到达相应节点时返回 True,但可能尚未在相应 GPU 上同步(与 GPU 操作类似)。

get_future_result(self: torch._C._distributed_c10d.Work) torch.Future#
返回:

一个类型为 int 的 torch.futures.Future 对象,它映射到 WorkResult 的枚举类型。例如,可以通过 fut = process_group.allreduce(tensor).get_future_result() 来获取一个 future 对象。

示例:

用户可以使用 fut.wait() 来阻塞等待工作完成,并通过 fut.value() 获取 WorkResult。此外,用户还可以使用 fut.then(call_back_func) 注册一个回调函数,在工作完成后调用,而不会阻塞当前线程。

警告

get_future_result API 支持 NCCL

is_completed(self: torch._C._distributed_c10d.Work) bool#
is_success(self: torch._C._distributed_c10d.Work) bool#
result(self: torch._C._distributed_c10d.Work) list[torch.Tensor]#
source_rank(self: torch._C._distributed_c10d.Work) int#
synchronize(self: torch._C._distributed_c10d.Work) None#
static unbox(arg0: object) torch._C._distributed_c10d.Work#
wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool#
返回:

真/假。

示例:
尝试

work.wait(timeout)

except

# 处理

警告

正常情况下,用户不需要设置 timeout。调用 wait() 等同于调用 synchronize():让当前流阻塞直到 NCCL 工作完成。但是,如果设置了 timeout,它将阻塞 CPU 线程直到 NCCL 工作完成或超时。如果超时,将抛出异常。

class torch.distributed.ReduceOp#

可用的归约操作的类,类似于枚举:SUMPRODUCTMINMAXBANDBORBXORPREMUL_SUM

在使用 NCCL 后端时,BANDBORBXOR 归约不可用。

AVG 在跨进程求和之前将值除以 world size。AVG 仅在 NCCL 后端可用,并且仅适用于 NCCL 版本 2.10 或更高版本。

PREMUL_SUM 在归约之前将输入在本地乘以给定的标量。PREMUL_SUM 仅在 NCCL 后端可用,并且仅适用于 NCCL 版本 2.11 或更高版本。用户应使用 torch.distributed._make_nccl_premul_sum

此外,MAXMINPRODUCT 不支持复数张量。

此类的值可以通过属性访问,例如 ReduceOp.SUM。它们用于指定归约集合通信策略,例如 reduce()

此类不支持 __members__ 属性。

class torch.distributed.reduce_op#

已弃用的归约操作的类,类似于枚举:SUMPRODUCTMINMAX

建议使用 ReduceOp 代替。

分布式键值存储#

distributed 包自带了一个分布式键值存储,可以用于在组内进程之间共享信息,以及在 torch.distributed.init_process_group() 中初始化 distributed 包(通过显式创建 store 作为指定 init_method 的替代方法)。键值存储有 3 种选择:TCPStoreFileStoreHashStore

class torch.distributed.Store#

所有 store 实现的基类,例如 PyTorch distributed 提供的 3 种:(TCPStoreFileStoreHashStore)。

__init__(self: torch._C._distributed_c10d.Store) None#
add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: SupportsInt) int#

对给定 key 的第一次 add 调用会在 store 中创建一个与 key 关联的计数器,并初始化为 amount。使用相同 key 进行后续的 add 调用会将计数器增加指定的 amount。使用已被 set() 方法在 store 中设置的 key 调用 add() 会导致异常。

参数:
  • key (str) – store 中将要递增计数器的键。

  • amount (int) – 计数器将要递增的数量。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#

根据提供的 keyvalue 将键值对追加到 store 中。如果 key 不存在于 store 中,它将被创建。

参数:
  • key (str) – 要追加到 store 中的键。

  • value (str) – 要添加到 store 中的与 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.append("first_key", "po")
>>> store.append("first_key", "tato")
>>> # Should return "potato"
>>> store.get("first_key")
check(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) bool#

检查给定 keys 列表中的键是否在 store 中具有值。此调用在正常情况下会立即返回,但在某些极端死锁情况下仍会失败,例如,在 TCPStore 已销毁后调用 check。调用 check() 并传入一个键列表,用于检查这些键是否存储在 store 中。

参数:

keys (list[str]) – 要查询是否存储在 store 中的键。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> # Should return 7
>>> store.check(["first_key"])
clone(self: torch._C._distributed_c10d.Store) torch._C._distributed_c10d.Store#

克隆 store 并返回一个指向相同底层 store 的新对象。返回的 store 可以与原始对象并发使用。这旨在提供一种安全的方式,通过为每个线程克隆一个 store 来从多个线程使用 store。

compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes#

根据提供的 keyexpected_valuedesired_value 执行比较,然后插入键值对。只有当 keyexpected_value 已存在于 store 中,或者 expected_value 为空字符串时,desired_value 才会被设置。

参数:
  • key (str) – 要在 store 中检查的键。

  • expected_value (str) – 在插入前要检查的与 key 关联的值。

  • desired_value (str) – 要添加到 store 中的与 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("key", "first_value")
>>> store.compare_set("key", "first_value", "second_value")
>>> # Should return "second_value"
>>> store.get("key")
delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool#

从 store 中删除与 key 关联的键值对。如果键成功删除,则返回 true,如果未删除,则返回 false

警告

delete_key API 仅被 TCPStoreHashStore 支持。使用此 API 与 FileStore 会导致异常。

参数:

key (str) – 要从 store 中删除的键。

返回:

如果 key 已被删除,则为 True,否则为 False

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, HashStore can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key")
>>> # This should return true
>>> store.delete_key("first_key")
>>> # This should return false
>>> store.delete_key("bad_key")
get(self: torch._C._distributed_c10d.Store, arg0: str) bytes#

检索 store 中与给定 key 关联的值。如果 key 不存在于 store 中,该函数将在抛出异常之前等待 timeout(在初始化 store 时定义)。

参数:

key (str) – 函数将返回与此键关联的值。

返回:

如果 key 存在于 store 中,则为与 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
has_extended_api(self: torch._C._distributed_c10d.Store) bool#

返回 store 是否支持扩展操作。

list_keys(self: torch._C._distributed_c10d.Store) list[str]#

返回 store 中的所有键的列表。

multi_get(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) list[bytes]#

检索 keys 中的所有值。如果 keys 中的任何键不存在于 store 中,函数将等待 timeout

参数:

keys (List[str]) – 要从 store 中检索的键。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "po")
>>> store.set("second_key", "tato")
>>> # Should return [b"po", b"tato"]
>>> store.multi_get(["first_key", "second_key"])
multi_set(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: collections.abc.Sequence[str]) None#

根据提供的 keysvalues 将一系列键值对插入 store。

参数:
  • keys (List[str]) – 要插入的键。

  • values (List[str]) – 要插入的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.multi_set(["first_key", "second_key"], ["po", "tato"])
>>> # Should return b"po"
>>> store.get("first_key")
num_keys(self: torch._C._distributed_c10d.Store) int#

返回 store 中设置的键的数量。请注意,这个数字通常会比通过 set()add() 添加的键的数量多一个,因为有一个键用于协调使用 store 的所有 worker。

警告

TCPStore 一起使用时,num_keys 返回写入底层文件的键的数量。如果 store 被销毁,并且使用相同文件创建了另一个 store,则原始键将被保留。

返回:

store 中存在的键的数量。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # This should return 2
>>> store.num_keys()
queue_len(self: torch._C._distributed_c10d.Store, arg0: str) int#

返回指定队列的长度。

如果队列不存在,则返回 0。

有关详细信息,请参阅 queue_push。

参数:

key (str) – 要获取长度的队列的键。

queue_pop(self: torch._C._distributed_c10d.Store, key: str, block: bool = True) bytes#

从指定队列中弹出一个值,如果队列为空,则等待直到超时。

有关详细信息,请参阅 queue_push。

如果 block 为 False,则队列为空时会引发 dist.QueueEmptyError。

参数:
  • key (str) – 要从中弹出的队列的键。

  • block (bool) – 是否阻塞等待键,或立即返回。

queue_push(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#

将一个值推送到指定的队列。

对队列和 set/get 操作使用相同的键可能会导致意外行为。

wait/check 操作支持队列。

使用队列的 wait 将只唤醒一个等待的工作进程,而不是全部。

参数:
  • key (str) – 要推送到队列的键。

  • value (str) – 要推送到队列的值。

set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#

根据提供的 keyvalue 将键值对插入存储。如果 key 已经存在于存储中,它将用新的 value 覆盖旧值。

参数:
  • key (str) – 要添加到存储中的键。

  • value (str) – 要添加到 store 中的与 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None#

设置存储的默认超时时间。此超时时间用于初始化以及 wait()get()

参数:

timeout (timedelta) – 要在存储中设置的超时时间。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set_timeout(timedelta(seconds=10))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"])
property timeout#

获取存储的超时时间。

wait(*args, **kwargs)#

重载函数。

  1. wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) -> None

等待 keys 中的每个键被添加到存储中。如果在 timeout (在存储初始化时设置) 之前所有键都未设置,则 wait 将引发异常。

参数:

keys (list) – 要在存储中等待设置的键列表。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 30 seconds
>>> store.wait(["bad_key"])
  1. wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: datetime.timedelta) -> None

等待 keys 中的每个键被添加到存储中,并在指定的 timeout 时间内未设置键时引发异常。

参数:
  • keys (list) – 要在存储中等待设置的键列表。

  • timeout (timedelta) – 等待键被添加之前抛出异常的时间。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"], timedelta(seconds=10))
class torch.distributed.TCPStore#

基于 TCP 的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过 TCP 连接到服务器存储并执行诸如 set() 插入键值对、get() 检索键值对等操作。应该始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。

参数:
  • host_name (str) – 服务器存储应运行的主机名或 IP 地址。

  • port (int) – 服务器存储应监听传入请求的端口。

  • world_size (int, optional) – 存储用户的总数(客户端数量 + 1 个服务器)。默认为 None(None 表示存储用户数量不固定)。

  • is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储时为 False。默认为 False。

  • timeout (timedelta, optional) – 存储在初始化期间以及 get()wait() 等方法中使用的超时时间。默认为 timedelta(seconds=300)

  • wait_for_workers (bool, optional) – 是否等待所有工作进程连接到服务器存储。这仅在 world_size 是固定值时适用。默认为 True。

  • multi_tenant (bool, optional) – 如果为 True,则当前进程中具有相同主机/端口的所有 TCPStore 实例将使用相同的底层 TCPServer。默认为 False。

  • master_listen_fd (int, optional) – 如果指定,底层 TCPServer 将在此文件描述符上监听,该文件描述符必须是已绑定到 port 的套接字。要绑定一个临时端口,我们建议将 port 设置为 0 并读取 .port。默认为 None(表示服务器创建一个新的套接字并尝试将其绑定到 port)。

  • use_libuv (bool, optional) – 如果为 True,则为 TCPServer 后端使用 libuv。默认为 True。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Run on process 1 (server)
>>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
>>> # Run on process 2 (client)
>>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
>>> # Use any of the store methods from either the client or server after initialization
>>> server_store.set("first_key", "first_value")
>>> client_store.get("first_key")
__init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: SupportsInt, world_size: SupportsInt | None = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: SupportsInt | None = None, use_libuv: bool = True) None#

创建一个新的 TCPStore。

property host#

获取存储监听请求的主机名。

property libuvBackend#

如果正在使用 libuv 后端,则返回 True。

property port#

获取存储监听请求的端口号。

class torch.distributed.HashStore#

基于底层哈希表的线程安全存储实现。此存储可以在同一进程中使用(例如,通过其他线程),但不能在进程之间使用。

示例:
>>> import torch.distributed as dist
>>> store = dist.HashStore()
>>> # store can be used from other threads
>>> # Use any of the store methods after initialization
>>> store.set("first_key", "first_value")
__init__(self: torch._C._distributed_c10d.HashStore) None#

创建一个新的 HashStore。

class torch.distributed.FileStore#

使用文件存储底层键值对的存储实现。

参数:
  • file_name (str) – 用于存储键值对的文件的路径

  • world_size (int, optional) – 使用存储的进程总数。默认为 -1(负值表示存储用户数量不固定)。

示例:
>>> import torch.distributed as dist
>>> store1 = dist.FileStore("/tmp/filestore", 2)
>>> store2 = dist.FileStore("/tmp/filestore", 2)
>>> # Use any of the store methods from either the client or server after initialization
>>> store1.set("first_key", "first_value")
>>> store2.get("first_key")
__init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: SupportsInt = -1) None#

创建一个新的 FileStore。

property path#

获取 FileStore 用于存储键值对的文件的路径。

class torch.distributed.PrefixStore#

包装任何一个键值存储(TCPStoreFileStoreHashStore),在每个插入存储的键前加上前缀。

参数:
  • prefix (str) – 在插入存储之前添加到每个键前面的前缀字符串。

  • store (torch.distributed.store) – 构成底层键值存储的存储对象。

__init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None#

创建一个新的 PrefixStore。

property underlying_store#

获取 PrefixStore 包装的底层存储对象。

分析通信聚合#

请注意,您可以使用 torch.profiler(推荐,仅在 1.8.1 后可用)或 torch.autograd.profiler 来分析此处提到的通信聚合和点对点通信 API。所有开箱即用的后端(glooncclmpi)都受支持,并且通信聚合的使用将在分析输出/跟踪中按预期呈现。分析代码与任何常规 torch 操作一样。

import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)

有关分析器的完整功能概述,请参阅 分析器文档

多 GPU 通信函数#

警告

多 GPU 函数(代表每个 CPU 线程的多个 GPU)已弃用。截至目前,PyTorch Distributed 首选的编程模型是每个线程一个设备,如本文档中的 API 所示。如果您是后端开发人员,并希望支持每线程多个设备,请联系 PyTorch Distributed 的维护者。

对象通信#

警告

对象通信有许多严重的限制。请仔细阅读以确定它们是否适用于您的用例。

对象通信是一组类似通信的操作,它们作用于任意 Python 对象,只要它们可以被 pickling。实现了各种通信模式(例如,broadcast、all_gather 等),但它们大致遵循以下模式

  1. 将输入对象转换为 pickle(原始字节),然后将其放入字节张量中

  2. 将此字节张量的大小通信给对等方(第一次通信操作)

  3. 分配大小适当的张量以执行实际通信

  4. 通信对象数据(第二次通信操作)

  5. 将原始数据转换回 Python(unpickle)

对象通信有时具有令人惊讶的性能或内存特性,这会导致运行时间长或内存不足,因此应谨慎使用。以下是一些常见问题。

不匹配的 pickle/unpickle 时间 - Pickling 对象可能很慢,具体取决于对象的数量、类型和大小。当通信具有扇入(例如,gather_object)时,接收方 rank 必须 unpickle 的对象数量是发送方 rank 需要 pickle 的对象的 N 倍,这可能导致其他 rank 在其下一个通信操作上超时。

低效的张量通信 - 应通过常规通信 API 发送张量,而不是对象通信 API。可以通过对象通信 API 发送张量,但它们将被序列化和反序列化(对于非 CPU 张量,还包括 CPU 同步和设备到主机的复制),在几乎所有情况下(调试或故障排除代码除外),都值得努力重构代码以使用非对象通信。

意外的张量设备 - 如果您仍然想通过对象通信发送张量,那么对于 CUDA(以及可能的其他加速器)张量还有一个特定的方面。如果您 pickling 一个当前位于 cuda:3 的张量,然后 unpickle 它,您将获得另一个位于 cuda:3 的张量,*无论您在哪个进程中,或者哪个 CUDA 设备是该进程的“默认”设备*。对于常规的张量通信 API,“输出张量”始终位于同一本地设备上,这通常是您期望的。

Unpickling 张量将隐式激活 CUDA 上下文,如果这是进程首次使用 GPU,则可能会浪费大量 GPU 内存。通过在将张量传递给对象通信的输入之前将它们移到 CPU,可以避免此问题。

第三方后端#

除了内置的 GLOO/MPI/NCCL 后端之外,PyTorch Distributed 还通过运行时注册机制支持第三方后端。关于如何通过 C++ 扩展开发第三方后端的参考,请参阅 教程 - 自定义 C++ 和 CUDA 扩展test/cpp_extensions/cpp_c10d_extension.cpp。第三方后端的性能取决于其自身的实现。

新的后端继承自 c10d::ProcessGroup,并在导入时通过 torch.distributed.Backend.register_backend() 注册后端名称和实例化接口。

当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group() 时,torch.distributed 包将在新后端上运行。

警告

第三方后端支持是实验性的,可能会发生变化。

启动实用程序#

torch.distributed 包还在 torch.distributed.launch 中提供了一个启动实用程序。此辅助实用程序可用于为分布式训练启动每个节点上的多个进程。

模块 torch.distributed.launch

torch.distributed.launch 是一个模块,用于在每个训练节点上启动多个分布式训练进程。

警告

该模块将弃用,转而使用 torchrun

该实用程序可用于单节点分布式训练,其中将启动每个节点上的一个或多个进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练,每个分布式进程将操作于单个 GPU。这可以实现性能显著提升的单节点训练。它还可以用于多节点分布式训练,通过在每个节点上启动多个进程来显著提升多节点分布式训练性能。这对于支持直接 GPU 的多个 Infiniband 接口的系统尤其有用,因为所有这些接口都可以用于聚合通信带宽。

在单节点分布式训练或多节点分布式训练的这两种情况下,该实用程序将启动每个节点上的指定数量的进程(--nproc-per-node)。如果用于 GPU 训练,此数量必须小于或等于当前系统的 GPU 数量(nproc_per_node),并且每个进程将操作于从 *GPU 0 到 GPU (nproc_per_node - 1)* 的单个 GPU。

如何使用此模块

  1. 单节点多进程分布式训练

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
  1. 多节点多进程分布式训练:(例如,两个节点)

节点 1: *(IP: 192.168.1.1,并且有一个空闲端口:1234)*

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

节点 2

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  1. 要查找此模块提供的可选参数

python -m torch.distributed.launch --help

重要提示

1. 该实用程序和多进程分布式(单节点或多节点)GPU 训练目前仅通过 NCCL 分布式后端获得最佳性能。因此,NCCL 后端是 GPU 训练推荐使用的后端。

2. 在您的训练程序中,您必须解析命令行参数:--local-rank=LOCAL_PROCESS_RANK,该参数将由该模块提供。如果您的训练程序使用 GPU,您应该确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。这可以通过以下方式实现:

解析 local_rank 参数

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

使用以下任一方式将设备设置为 local rank:

>>> torch.cuda.set_device(args.local_rank)  # before your code runs

>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...

版本 2.0.0 中已更改: 启动器将 --local-rank=<rank> 参数传递给您的脚本。从 PyTorch 2.0.0 起,使用带连字符的 --local-rank 优先于之前使用的带下划线的 --local_rank

为了向后兼容,用户可能需要处理这两种情况的参数解析代码。这意味着在参数解析器中同时包含 "--local-rank""--local_rank"。如果只提供了 "--local_rank",启动器将触发一个错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank" 应该足够了。

3. 在您的训练程序中,您应该在开头调用以下函数来启动分布式后端。强烈建议 init_method=env://。其他初始化方法(例如 tcp://)可能有效,但 env:// 是该模块官方支持的方法。

>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')

4. 在您的训练程序中,您可以使用常规的分布式函数,也可以使用 torch.nn.parallel.DistributedDataParallel() 模块。如果您的训练程序使用 GPU 进行训练,并且您希望使用 torch.nn.parallel.DistributedDataParallel() 模块,以下是配置方法。

>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)

请确保将 device_ids 参数设置为您的代码将操作的唯一 GPU 设备 ID。这通常是进程的本地排名。换句话说,要使用此实用程序,device_ids 需要是 [args.local_rank],并且 output_device 需要是 args.local_rank

5. 另一种通过环境变量 LOCAL_RANKlocal_rank 传递给子进程的方法。当您使用 --use-env=True 启动脚本时,将启用此行为。您必须调整上面的子进程示例,将 args.local_rank 替换为 os.environ['LOCAL_RANK'];当您指定此标志时,启动器将不会传递 --local-rank

警告

local_rank 不是全局唯一的:它仅在机器上的每个进程中是唯一的。因此,不要使用它来决定您是否应该,例如,写入网络文件系统。有关不正确执行此操作可能出错的示例,请参阅 pytorch/pytorch#12042

Spawn 实用程序#

Python 的 Multiprocessing 包 - torch.multiprocessing 包还在 torch.multiprocessing.spawn() 中提供了一个 spawn 函数。此辅助函数可用于创建多个进程。它通过传入您要运行的函数来工作,并创建 N 个进程来运行它。这也可以用于多进程分布式训练。

有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现

请注意,此函数需要 Python 3.4 或更高版本。

调试 torch.distributed 应用程序#

由于难以理解的挂起、崩溃或跨 rank 的不一致行为,调试分布式应用程序可能具有挑战性。torch.distributed 提供了一套工具来帮助以自助方式调试训练应用程序。

Python 断点#

在分布式环境中非常方便地使用 Python 的调试器,但由于它不能开箱即用,许多人根本不使用它。PyTorch 提供了一个围绕 pdb 的自定义包装器,可以简化此过程。

torch.distributed.breakpoint 使此过程变得容易。在内部,它通过两种方式自定义了 pdb 的断点行为,但除此之外,它的行为与普通 pdb 一样。

  1. 仅在一个 rank 上附加调试器(由用户指定)。

  2. 确保所有其他 rank 停止,方法是使用一个 torch.distributed.barrier(),一旦被调试的 rank 发出 continue 就会释放。

  3. 将 stdin 从子进程重定向,以便它连接到您的终端。

要使用它,只需在所有 rank 上调用 torch.distributed.breakpoint(rank),并在每次调用时使用相同的值作为 rank

监控屏障#

从 v1.10 开始,torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代方案存在,它会在崩溃时提供有关哪个 rank 可能存在故障的有用信息,即不是所有 rank 都在提供的超时时间内调用 torch.distributed.monitored_barrier()torch.distributed.monitored_barrier() 使用 send/recv 通信原语在类似确认的过程中实现了一个主机端屏障,允许 rank 0 报告哪些 rank 未能在规定时间内确认屏障。例如,考虑以下函数,其中 rank 1 未调用 torch.distributed.monitored_barrier()(实际上这可能是由于之前的集体通信中的应用程序错误或挂起)。

import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())

以下错误消息将在 rank 0 上生成,允许用户确定哪些 rank 可能存在故障并进一步调查。

RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG#

通过 TORCH_CPP_LOG_LEVEL=INFO,可以使用环境变量 TORCH_DISTRIBUTED_DEBUG 来触发额外的有用日志记录和集体同步检查,以确保所有 rank 都得到适当的同步。TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF(默认)、INFODETAIL,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL 可能会影响应用程序性能,因此仅在调试问题时使用。

设置 TORCH_DISTRIBUTED_DEBUG=INFO 将导致在初始化使用 torch.nn.parallel.DistributedDataParallel() 训练的模型时产生额外的调试日志,而 TORCH_DISTRIBUTED_DEBUG=DETAIL 将额外记录选定迭代次数的运行时性能统计信息。这些运行时统计信息包括前向传播时间、反向传播时间、梯度通信时间等数据。例如,给定以下应用程序。

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)

    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    inp = torch.randn(10, 10).cuda()
    print("train")

    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())

在初始化时会生成以下日志。

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

在运行时会生成以下日志(当设置了 TORCH_DISTRIBUTED_DEBUG=DETAIL 时)。

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674

此外,TORCH_DISTRIBUTED_DEBUG=INFO 由于模型中存在未使用的参数,增强了 torch.nn.parallel.DistributedDataParallel() 中的崩溃日志记录。目前,如果前向传播中存在可能未使用的参数,则必须将 find_unused_parameters=True 传递给 torch.nn.parallel.DistributedDataParallel() 的初始化,并且从 v1.10 开始,所有模型输出都需要在损失计算中使用,因为 torch.nn.parallel.DistributedDataParallel() 不支持反向传播中的未使用的参数。这些约束对于大型模型尤其具有挑战性,因此在发生崩溃并出现错误时,torch.nn.parallel.DistributedDataParallel() 将记录所有未使用的参数的完全限定名称。例如,在上述应用程序中,如果我们修改 loss 的计算方式为 loss = output[1],那么 TwoLinLayerNet.a 在反向传播中将不会收到梯度,从而导致 DDP 失败。崩溃时,用户将获得有关未使用的参数的信息,这对于大型模型来说可能难以手动查找。

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0

设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将触发对用户直接或间接发出的每个集体通信的附加一致性和同步检查(例如 DDP 的 allreduce)。这是通过创建一个包装器进程组来实现的,该包装器包装了由 torch.distributed.init_process_group()torch.distributed.new_group() API 返回的所有进程组。因此,这些 API 将返回一个包装器进程组,该进程组可以像常规进程组一样使用,但在将集体通信分派给底层进程组之前执行一致性检查。目前,这些检查包括一个 torch.distributed.monitored_barrier(),它确保所有 rank 完成其未完成的集体通信调用,并报告卡住的 rank。接下来,通过确保所有集体通信函数匹配并使用一致的张量形状进行调用来检查集体通信的一致性。如果不匹配,当应用程序崩溃时会包含详细的错误报告,而不是挂起或无信息的错误消息。例如,考虑以下函数,它在 torch.distributed.all_reduce() 中具有不匹配的输入形状。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())

使用 NCCL 后端,这样的应用程序可能会导致挂起,在非平凡的情况下很难找到根本原因。如果用户启用了 TORCH_DISTRIBUTED_DEBUG=DETAIL 并重新运行应用程序,以下错误消息将揭示根本原因。

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]

注意

为了在运行时对调试级别进行细粒度控制,还可以使用函数 torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()

此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,在检测到集体通信失步时记录整个调用堆栈。这些集体通信失步检查将适用于所有使用 c10d 集体通信的应用程序,这些通信由使用 torch.distributed.init_process_group()torch.distributed.new_group() API 创建的进程组支持。

torch.distributed.debug HTTP 服务器#

torch.distributed.debug 模块提供了一个 HTTP 服务器,可用于调试分布式应用程序。可以通过调用 torch.distributed.debug.start_debug_server() 来启动服务器。这允许用户在运行时收集所有 worker 的数据。

torch.distributed.debug.start_debug_server(port=25999, worker_port=0)[source]#

在所有 worker 上启动调试服务器堆栈。前端调试服务器仅在 rank0 上启动,而每个 rank 的 worker 服务器在所有 rank 上启动。

此服务器提供了一个 HTTP 前端,允许同时调试所有 rank 的慢速和死锁的分布式作业。它收集堆栈跟踪、FlightRecorder 事件和性能配置文件等数据。

这依赖于默认情况下未安装的依赖项。

依赖项:- Jinja2 - aiohttp

警告:这仅适用于受信任的网络环境。调试服务器不设计为安全的,不应暴露给公共互联网。有关更多详细信息,请参阅 SECURITY.md。

警告:这是一个实验性功能,可能会随时更改。

参数:
  • port (int) – 启动前端调试服务器的端口。

  • worker_port (int) – 启动 worker 服务器的端口。默认为 0,这将导致 worker 服务器绑定到一个临时端口。

torch.distributed.debug.stop_debug_server()[source]#

关闭调试服务器并停止前端调试服务器进程。

日志记录#

除了通过 torch.distributed.monitored_barrier()TORCH_DISTRIBUTED_DEBUG 进行显式调试支持外,torch.distributed 的底层 C++ 库还输出各种级别的日志消息。这些消息有助于了解分布式训练作业的执行状态并排除网络连接失败等问题。下表显示了如何通过 TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG 环境变量的组合来调整日志级别。

TORCH_CPP_LOG_LEVEL

TORCH_DISTRIBUTED_DEBUG

生效日志级别

ERROR

ignored

错误

WARNING

ignored

警告

INFO

ignored

Info

INFO

INFO

Debug

INFO

DETAIL

Trace (a.k.a. All)

分布式组件引发派生自 RuntimeError 的自定义异常类型。

  • torch.distributed.DistError:这是所有分布式异常的基类。

  • torch.distributed.DistBackendError:当发生后端特定的错误时,将抛出此异常。例如,如果使用 NCCL 后端,并且用户尝试使用 NCCL 库不可用的 GPU。

  • torch.distributed.DistNetworkError:当网络库遇到错误时(例如:连接被重置),将抛出此异常。

  • torch.distributed.DistStoreError:当 Store 遇到错误时(例如:TCPStore 超时),将抛出此异常。

class torch.distributed.DistError#

在分布式库中发生错误时引发的异常。

class torch.distributed.DistBackendError#

在分布式后端发生错误时引发的异常。

class torch.distributed.DistNetworkError#

在分布式网络中发生错误时引发的异常。

class torch.distributed.DistStoreError#

在分布式存储中发生错误时引发的异常。

如果您正在进行单节点训练,那么交互式地设置脚本的断点可能会很方便。我们提供了一种方便的方法来设置单个 rank 的断点。

torch.distributed.breakpoint(rank=0, skip=0, timeout_s=3600)[source]#

设置断点,但仅在单个 rank 上。所有其他 rank 将等待您完成断点处理后再继续。

参数:
  • rank (int) – 要中断的 rank。默认值:0

  • skip (int) – 跳过对该断点的首次 skip 次调用。默认值:0