分布式通信包 - torch.distributed#
创建日期:2017年7月12日 | 最后更新日期:2025年7月14日
注意
请参阅 PyTorch 分布式概览 以获取分布式训练所有相关功能的简要介绍。
后端#
torch.distributed
支持三种内置后端,每种后端具有不同的功能。下表显示了哪些函数可用于 CPU / CUDA 张量。MPI 仅在用于构建 PyTorch 的实现支持 CUDA 时才支持 CUDA。
后端 |
|
|
|
|||
---|---|---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
发送 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
接收 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
广播 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
所有规约 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
规约 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
所有收集 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
收集 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
分散 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
规约分散 |
✓ |
✓ |
✘ |
✘ |
✘ |
✓ |
点对点 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
屏障 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
PyTorch 自带的后端#
PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型版)。默认情况下,对于 Linux,Gloo 和 NCCL 后端已构建并包含在 PyTorch 分布式中(NCCL 仅在使用 CUDA 构建时)。MPI 是一个可选后端,只有在您从源代码构建 PyTorch 时才能包含它。(例如,在安装了 MPI 的主机上构建 PyTorch。)
注意
截至 PyTorch v1.8,Windows 支持除 NCCL 之外的所有集体通信后端。如果 init_process_group()
的 init_method
参数指向文件,则该文件必须符合以下架构
本地文件系统,
init_method="file:///d:/tmp/some_file"
共享文件系统,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。
使用哪个后端?#
过去,我们经常被问到:“我应该使用哪个后端?”
经验法则
对于分布式 GPU 训练使用 NCCL 后端
对于分布式 CPU 训练使用 Gloo 后端。
带 InfiniBand 互连的 GPU 主机
使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。
带以太网互连的 GPU 主机
使用 NCCL,因为它目前提供最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果您遇到 NCCL 问题,请使用 Gloo 作为备用选项。(请注意,Gloo 目前在 GPU 上的运行速度比 NCCL 慢。)
带 InfiniBand 互连的 CPU 主机
如果您的 InfiniBand 启用了 IP over IB,则使用 Gloo,否则使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。
带以太网互连的 CPU 主机
使用 Gloo,除非您有特殊原因使用 MPI。
常用环境变量#
选择要使用的网络接口#
默认情况下,NCCL 和 Gloo 后端都会尝试找到正确的网络接口来使用。如果自动检测到的接口不正确,您可以使用以下环境变量覆盖它(适用于相应的后端)
NCCL_SOCKET_IFNAME,例如
export NCCL_SOCKET_IFNAME=eth0
GLOO_SOCKET_IFNAME,例如
export GLOO_SOCKET_IFNAME=eth0
如果您使用 Gloo 后端,可以通过逗号分隔来指定多个接口,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。后端将以轮询方式在这些接口之间分派操作。所有进程在此变量中指定相同数量的接口至关重要。
其他 NCCL 环境变量#
调试 - 如果 NCCL 失败,您可以设置 NCCL_DEBUG=INFO
以打印明确的警告消息以及基本的 NCCL 初始化信息。
您还可以使用 NCCL_DEBUG_SUBSYS
来获取特定 NCCL 方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL
将打印集体调用日志,这在调试挂起时可能很有帮助,尤其是由集体类型或消息大小不匹配引起的挂起。如果拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH
将有助于检查详细的检测结果,并在需要 NCCL 团队进一步帮助时保存以供参考。
性能调优 - NCCL 根据其拓扑检测执行自动调优,以节省用户的调优工作。在某些基于套接字的系统中,用户仍然可以尝试调优 NCCL_SOCKET_NTHREADS
和 NCCL_NSOCKS_PERTHREAD
以增加套接字网络带宽。这两个环境变量已由 NCCL 为一些云提供商(例如 AWS 或 GCP)预先调优。
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档
您可以使用 torch.distributed.ProcessGroupNCCL.NCCLConfig
和 torch.distributed.ProcessGroupNCCL.Options
进一步调整 NCCL 通信器。在解释器中使用 help
(例如 help(torch.distributed.ProcessGroupNCCL.NCCLConfig)
)了解更多信息。
基础#
torch.distributed
包为跨多个计算节点(在一台或多台机器上运行)的多进程并行提供 PyTorch 支持和通信原语。类 torch.nn.parallel.DistributedDataParallel()
基于此功能提供同步分布式训练,作为任何 PyTorch 模型的包装器。这与 多进程包 - torch.multiprocessing 和 torch.nn.DataParallel()
提供的并行性不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。
在单机同步情况下,torch.distributed
或 torch.nn.parallel.DistributedDataParallel()
包装器可能仍优于其他数据并行方法,包括 torch.nn.DataParallel()
每个进程都维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经聚合并在进程之间平均,因此每个进程都相同,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。
每个进程都包含一个独立的 Python 解释器,消除了由于从单个 Python 进程驱动多个执行线程、模型副本或 GPU 而带来的额外解释器开销和“GIL 抖动”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。
初始化#
在使用任何其他方法之前,需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh()
函数初始化包。两者都会阻塞直到所有进程都加入。
警告
初始化不是线程安全的。进程组创建应从单个线程执行,以防止跨排名出现不一致的“UUID”分配,并防止初始化期间的竞争导致挂起。
- torch.distributed.is_available()[source]#
如果分布式包可用,则返回
True
。否则,
torch.distributed
不公开任何其他 API。目前,torch.distributed
可在 Linux、MacOS 和 Windows 上使用。从源代码构建 PyTorch 时,设置USE_DISTRIBUTED=1
以启用它。目前,Linux 和 Windows 的默认值为USE_DISTRIBUTED=1
,MacOS 的默认值为USE_DISTRIBUTED=0
。- 返回类型
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]#
初始化默认分布式进程组。
这也会初始化分布式包。
- 有两种主要方式初始化进程组
明确指定
store
、rank
和world_size
。指定
init_method
(一个 URL 字符串),它指示在哪里/如何发现对等节点。可选地指定rank
和world_size
,或者将所有必需的参数编码到 URL 中并省略它们。
如果两者都没有指定,则
init_method
假定为“env://”。- 参数
backend (str 或 Backend, 可选) – 要使用的后端。根据构建时配置,有效值包括
mpi
、gloo
、nccl
、ucc
,或由第三方插件注册的后端。自 2.6 版以来,如果未提供backend
,c10d 将使用为 device_id kwarg(如果提供)指示的设备类型注册的后端。目前已知的默认注册是:cuda
为nccl
,cpu
为gloo
。如果既未提供backend
也未提供device_id
,c10d 将检测运行时机器上的加速器并使用为该检测到的加速器(或cpu
)注册的后端。此字段可以作为小写字符串(例如,"gloo"
)提供,也可以通过Backend
属性(例如,Backend.GLOO
)访问。如果使用nccl
后端每台机器有多个进程,则每个进程必须对其使用的每个 GPU 拥有独占访问权限,因为在进程之间共享 GPU 可能会导致死锁或 NCCL 无效使用。ucc
后端是实验性的。设备的默认后端可以通过get_default_backend_for_device()
查询。init_method (str, 可选) – 指定如何初始化进程组的 URL。如果未指定
init_method
或store
,则默认为“env://”。与store
互斥。world_size (int, 可选) – 参与作业的进程数。如果指定了
store
,则必需。rank (int, 可选) – 当前进程的排名(它应该是一个介于 0 和
world_size
-1 之间的数字)。如果指定了store
,则必需。store (Store, 可选) – 所有工作程序都可访问的键/值存储,用于交换连接/地址信息。与
init_method
互斥。timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端为 30 分钟。这是集合通信将异步中止并且进程将崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能导致后续的 CUDA 操作在损坏的数据上运行。当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。
group_name (str, 可选, 已弃用) – 组名。此参数被忽略
pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传入的附加选项。目前,我们支持的唯一选项是
nccl
后端的ProcessGroupNCCL.Options
,可以指定is_high_priority_stream
,以便 nccl 后端在有计算内核等待时可以选择高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tdevice_id (torch.device | int, 可选) – 此进程将处理的单个特定设备,允许后端特定优化。目前这有两个效果,仅在 NCCL 下:通信器立即形成(立即调用
ncclCommInit*
而不是正常的延迟调用),并且子组将在可能的情况下使用ncclCommSplit
以避免创建组的不必要开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。如果提供 int,API 假定将使用编译时的加速器类型。
注意
要启用
backend == Backend.MPI
,PyTorch 需要在支持 MPI 的系统上从源代码构建。注意
对多个后端的支持是实验性的。目前,当未指定后端时,将同时创建
gloo
和nccl
后端。gloo
后端将用于 CPU 张量的集合通信,而nccl
后端将用于 CUDA 张量的集合通信。可以通过传入格式为“: , : ”的字符串来指定自定义后端,例如“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source]#
根据 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。
这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度都标记为 mesh_dim_names[i]。
注意
init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有排名上相同。不一致的 mesh_shape 可能导致挂起。
注意
如果没有找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组。
- 参数
- 返回
一个
DeviceMesh
对象,表示设备布局。- 返回类型
示例
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[source]#
检查此进程是否使用
torch.distributed.elastic
(也称为 torchelastic)启动。使用
TORCHELASTIC_RUN_ID
环境变量的存在作为代理,以确定当前进程是否使用 torchelastic 启动。这是一个合理的代理,因为TORCHELASTIC_RUN_ID
映射到 rendezvous ID,该 ID 始终是一个非空值,表示用于对等发现的作业 ID。- 返回类型
- torch.distributed.get_default_backend_for_device(device)[source]#
返回给定设备的默认后端。
- 参数
device (Union[str, torch.device]) – 获取默认后端的设备。
- 返回
给定设备的默认后端(小写字符串)。
- 返回类型
目前支持三种初始化方法
TCP 初始化#
有两种使用 TCP 进行初始化的方法,都要求所有进程都可以访问的网络地址和期望的 world_size
。第一种方法要求指定一个属于排名 0 进程的地址。这种初始化方法要求所有进程都手动指定排名。
请注意,最新分布式包中不再支持多播地址。group_name
也已弃用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化#
此方法将从环境变量中读取配置,允许完全自定义信息的获取方式。要设置的变量是
MASTER_PORT
- 必需;必须是排名 0 机器上的空闲端口MASTER_ADDR
- 必需(排名 0 除外);排名 0 节点的地址WORLD_SIZE
- 必需;可以在此处设置,也可以在调用 init 函数时设置RANK
- 必需;可以在此处设置,也可以在调用 init 函数时设置
排名为 0 的机器将用于设置所有连接。
这是默认方法,这意味着不需要指定 init_method
(或者可以是 env://
)。
改进初始化时间#
TORCH_GLOO_LAZY_INIT
- 按需建立连接,而不是使用完整网格,这可以大大缩短非 all2all 操作的初始化时间。
后初始化#
一旦运行 torch.distributed.init_process_group()
,就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()
。
- class torch.distributed.Backend(name)[source]#
一个类似枚举的后端类。
可用后端:GLOO、NCCL、UCC、MPI、XCCL 和其他注册后端。
此类的值为小写字符串,例如
"gloo"
。它们可以作为属性访问,例如Backend.NCCL
。可以直接调用此函数来解析字符串,例如,
Backend(backend_str)
将检查backend_str
是否有效,如果有效则返回解析后的小写字符串。它也接受大写字符串,例如,Backend("GLOO")
返回"gloo"
。注意
条目
Backend.UNDEFINED
存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假定其存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source]#
使用给定名称和实例化函数注册新后端。
此类方法由第三方
ProcessGroup
扩展用于注册新后端。- 参数
name (str) –
ProcessGroup
扩展的后端名称。它应该与init_process_group()
中的名称匹配。func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括
store
、rank
、world_size
和timeout
。extended_api (bool, 可选) – 后端是否支持扩展参数结构。默认值:
False
。如果设置为True
,后端将获得一个c10d::DistributedBackendOptions
实例和一个由后端实现定义的进程组选项对象。device (str 或 str 列表, 可选) – 此后端支持的设备类型,例如“cpu”、“cuda”等。如果为 None,则假定为“cpu”和“cuda”。
注意
此对第三方后端的支持是实验性的,可能会更改。
- torch.distributed.get_backend(group=None)[source]#
返回给定进程组的后端。
- 参数
group (ProcessGroup, 可选) – 要操作的进程组。默认是通用主进程组。如果指定了另一个特定组,则调用进程必须是
group
的一部分。- 返回
给定进程组的后端(小写字符串)。
- 返回类型
关机#
通过调用 destroy_process_group()
在退出时清理资源很重要。
要遵循的最简单模式是在训练脚本中不再需要通信时(通常在 main() 的末尾)通过调用 destroy_process_group()
并将 group
参数的默认值设置为 None 来销毁每个进程组和后端。调用应该每个训练器进程进行一次,而不是在外部进程启动器级别。
如果在超时时间内所有 pg 中的所有排名都没有调用 destroy_process_group()
,尤其是在应用程序中有多个进程组时(例如 N 维并行),则可能会在退出时挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,而 ncclCommAbort 必须集体调用,但如果由 python 的 GC 调用 ProcessGroupNCCL 的析构函数,则调用顺序是不确定的。调用 destroy_process_group()
有助于确保 ncclCommAbort 在所有排名中以一致的顺序调用,并避免在 ProcessGroupNCCL 的析构函数中调用 ncclCommAbort。
重新初始化#
destroy_process_group
也可以用于销毁单个进程组。一个用例可能是容错训练,其中进程组可能在运行时被销毁然后重新初始化。在这种情况下,在调用 destroy 之后和随后的初始化之前,通过除 torch.distributed 原语之外的某种方式同步训练器进程至关重要。此行为目前不受支持/未经测试,因为实现此同步很困难,并且被认为是一个已知问题。如果这是一个阻碍您的用例,请提交 github 问题或 RFC。
组#
默认情况下,集合操作在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。new_group()
函数可用于创建新组,其中包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group
参数传递给所有集合操作(集合操作是用于以某些众所周知的编程模式交换信息的分布式函数)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]#
创建一个新的分布式组。
此函数要求主组中的所有进程(即作为分布式作业一部分的所有进程)都进入此函数,即使它们不打算成为该组的成员。此外,组应在所有进程中以相同的顺序创建。
警告
安全并发使用:当使用
NCCL
后端处理多个进程组时,用户必须确保全局一致地执行所有排名上的集合操作。如果进程中的多个线程发出集合操作,则需要显式同步以确保一致的顺序。
当使用 torch.distributed 通信 API 的异步变体时,会返回一个工作对象,并且通信内核在单独的 CUDA 流上排队,从而允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,它们必须通过调用 work.wait() 与其他 cuda 流同步,然后才能使用另一个进程组。
有关更多详细信息,请参阅 并发使用多个 NCCL 通信器 <https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently>。
- 参数
ranks (int 列表) – 组成员的排名列表。如果为
None
,则将设置为所有排名。默认值为None
。timeout (timedelta, 可选) – 有关详细信息和默认值,请参阅 init_process_group。
backend (str 或 Backend, 可选) – 要使用的后端。根据构建时配置,有效值为
gloo
和nccl
。默认情况下使用与全局组相同的后端。此字段应作为小写字符串(例如,"gloo"
)提供,也可以通过Backend
属性(例如,Backend.GLOO
)访问。如果传入None
,则将使用与默认进程组对应的后端。默认值为None
。pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传入的附加选项。例如,对于
nccl
后端,可以指定is_high_priority_stream
,以便进程组可以获取高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional): 在进程组创建结束时执行组局部屏障。这与非成员排名不需要调用 API 且不加入屏障不同。group_desc (str, 可选) – 描述进程组的字符串。
device_id (torch.device, 可选) – 一个单一的、特定的设备,用于“绑定”此进程,如果提供了此字段,new_group 调用将尝试立即初始化该设备的通信后端。
- 返回
分布式组的句柄,可以传递给集体调用或 GroupMember.NON_GROUP_MEMBER,如果该排名不属于
ranks
。
N.B. use_local_synchronization 不适用于 MPI。
N.B. 尽管 use_local_synchronization=True 在大型集群和小型进程组中可以显著提高速度,但必须谨慎,因为它会改变集群行为,因为非成员排名不加入组屏障()。
N.B. use_local_synchronization=True 可能导致每个排名创建多个重叠进程组时发生死锁。为避免这种情况,请确保所有排名遵循相同的全局创建顺序。
- torch.distributed.get_group_rank(group, global_rank)[source]#
将全局排名转换为组排名。
global_rank
必须是group
的一部分,否则将引发 RuntimeError。- 参数
group (ProcessGroup) – 要查找相对排名的进程组。
global_rank (int) – 要查询的全局排名。
- 返回
global_rank
相对于group
的组排名- 返回类型
N.B. 对默认进程组调用此函数将返回同一性
DeviceMesh#
DeviceMesh 是一种更高级别的抽象,它管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置排名,并且有助于轻松管理这些分布式进程组。init_device_mesh()
函数可用于创建新的 DeviceMesh,其网格形状描述了设备拓扑。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source]#
DeviceMesh 表示设备的网格,其中设备的布局可以表示为 n 维数组,n 维数组的每个值都是默认进程组排名的全局 ID。
DeviceMesh 可用于设置跨集群的 N 维设备连接,并管理 N 维并行性的进程组。通信可以在 DeviceMesh 的每个维度上单独发生。DeviceMesh 尊重用户已选择的设备(即,如果用户在 DeviceMesh 初始化之前调用 torch.cuda.set_device),并且如果用户没有事先设置设备,则会为当前进程选择/设置设备。请注意,手动设备选择必须在 DeviceMesh 初始化之前发生。
DeviceMesh 也可以用作上下文管理器,与 DTensor API 一起使用时。
注意
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有排名上相同。不一致的 mesh 将导致静默挂起。
- 参数
device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。
mesh (ndarray) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。
- 返回
一个
DeviceMesh
对象,表示设备布局。- 返回类型
以下程序以 SPMD 方式在每个进程/排名上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。对网格的第一个维度进行归约将归约跨列(0, 4)...和(3, 7),对网格的第二个维度进行归约将归约跨行(0, 1, 2, 3)和(4, 5, 6, 7)。
示例
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
- static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]#
使用
device_type
从现有ProcessGroup
或现有ProcessGroup
列表构造DeviceMesh
。构造的设备网格的维度数等于传入的组数。例如,如果传入单个进程组,则生成的 DeviceMesh 为一维网格。如果传入 2 个进程组列表,则生成的 DeviceMesh 为二维网格。
如果传入多个组,则
mesh
和mesh_dim_names
参数是必需的。传入的进程组的顺序决定了网格的拓扑结构。例如,第一个进程组将是 DeviceMesh 的第 0 维。传入的 mesh 张量必须与传入的进程组具有相同的维度数,并且 mesh 张量中的维度顺序必须与传入的进程组中的顺序匹配。- 参数
group (ProcessGroup 或 ProcessGroup 列表) – 现有 ProcessGroup 或现有 ProcessGroup 列表。
device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。
mesh (torch.Tensor 或 ArrayLike, 可选) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。默认值为 None。
mesh_dim_names (tuple[str], 可选) – 一个元组,包含要分配给描述设备布局的多维数组的每个维度的网格维度名称。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。默认值为 None。
- 返回
一个
DeviceMesh
对象,表示设备布局。- 返回类型
- get_all_groups()[source]#
返回所有网格维度的进程组列表。
- 返回
一个
ProcessGroup
对象列表。- 返回类型
list[torch.distributed.distributed_c10d.ProcessGroup]
- get_group(mesh_dim=None)[source]#
返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是一维的,则返回网格中唯一的 ProcessGroup。
- 参数
mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或网格维度的索引。
None. (默认值为) –
- 返回
一个
ProcessGroup
对象。- 返回类型
ProcessGroup
- get_local_rank(mesh_dim=None)[source]#
返回 DeviceMesh 给定 mesh_dim 的本地排名。
- 参数
mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或网格维度的索引。
None. (默认值为) –
- 返回
一个表示本地排名的整数。
- 返回类型
以下程序以 SPMD 方式在每个进程/排名上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在排名 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 0。在排名 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 1。在排名 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 0。在排名 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 1。在排名 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 2。在排名 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 3。
示例
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
点对点通信#
- torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]#
同步发送张量。
警告
NCCL 后端不支持
tag
。
- torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source]#
同步接收张量。
警告
NCCL 后端不支持
tag
。
isend()
和 irecv()
在使用时返回分布式请求对象。通常,此对象的类型未指定,因为它们不应手动创建,但它们保证支持两种方法
is_completed()
- 如果操作已完成,则返回 Truewait()
- 将阻塞进程直到操作完成。is_completed()
保证在返回后返回 True。
- torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]#
异步发送张量。
警告
在请求完成之前修改
tensor
会导致未定义行为。警告
NCCL 后端不支持
tag
。与阻塞的 send 不同,isend 允许 src == dst 排名,即发送给自己。
- torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]#
异步接收张量。
警告
NCCL 后端不支持
tag
。与阻塞的 recv 不同,irecv 允许 src == dst 排名,即从自身接收。
- torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None)[source]#
同步发送
object_list
中可 pickle 的对象。类似于
send()
,但可以传入 Python 对象。请注意,object_list
中的所有对象都必须可 pickle 才能发送。接收方必须提供大小相等的列表。- 参数
object_list (Any 列表) – 要发送的输入对象列表。每个对象都必须是可 pickle 的。接收方必须提供大小相等的列表。
dst (int) – 要将
object_list
发送到的目标排名。目标排名基于全局进程组(无论group
参数如何)group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要操作的进程组。如果为 None,则将使用默认进程组。默认值为
None
。device (
torch.device
, 可选) – 如果不为 None,对象将被序列化并转换为张量,然后移动到device
再发送。默认值为None
。group_dst (int, 可选) –
group
上的目标排名。必须指定dst
和group_dst
中的一个,但不能同时指定。
- 返回
无
.
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此项,以便每个排名都有一个单独的 GPU。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
send_object_list()
隐式使用pickle
模块,已知其不安全。可以构造恶意 pickle 数据,在反 pickle 过程中执行任意代码。仅调用此函数处理您信任的数据。警告
使用 GPU 张量调用
send_object_list()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用send()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]#
同步接收
object_list
中的可序列化对象。类似于
recv()
,但可以接收 Python 对象。- 参数
object_list (List[Any]) – 要接收到的对象列表。必须提供与发送列表大小相等的尺寸列表。
src (int, optional) – 接收
object_list
的源等级。源等级基于全局进程组(无论group
参数如何)。如果设置为 None,则从任何等级接收。默认值为None
。group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要操作的进程组。如果为 None,则将使用默认进程组。默认值为
None
。device (
torch.device
, optional) – 如果不为 None,则在此设备上接收。默认值为None
。group_src (int, 可选) –
group
上的目标排名。同时指定src
和group_src
无效。
- 返回
发送者等级。如果等级不是组的一部分,则为 -1。如果等级是组的一部分,
object_list
将包含来自src
等级的已发送对象。
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此项,以便每个排名都有一个单独的 GPU。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
recv_object_list()
隐式使用pickle
模块,该模块已知不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
使用 GPU 张量调用
recv_object_list()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用recv()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[source]#
异步发送或接收一批张量并返回请求列表。
处理
p2p_op_list
中的每个操作并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。- 参数
p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 点对点操作列表(每个操作符的类型为
torch.distributed.P2POp
)。列表中 isend/irecv 的顺序很重要,它需要与远程端对应的 isend/irecv 匹配。- 返回
调用 op_list 中相应操作返回的分布式请求对象列表。
- 返回类型
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前的 GPU 设备,否则将导致意外的挂起问题。
此外,如果此 API 是传递给
dist.P2POp
的group
中的第一个集体调用,则group
的所有等级都必须参与此 API 调用;否则,行为是未定义的。如果此 API 调用不是group
中的第一个集体调用,则允许仅涉及group
子集等级的批处理 P2P 操作。
同步和异步集体操作#
每个集体操作函数都支持以下两种操作,具体取决于传递给集体操作的 async_op
标志的设置
同步操作 - 默认模式,当 async_op
设置为 False
时。当函数返回时,保证集体操作已执行。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何进一步利用集体调用输出的函数调用将按预期运行。对于 CUDA 集体操作,利用同一 CUDA 流上输出的函数调用将按预期运行。用户必须注意在不同流下运行时的同步。有关 CUDA 语义(如流同步)的详细信息,请参阅 CUDA 语义。请参阅以下脚本,了解 CPU 和 CUDA 操作的这些语义差异的示例。
异步操作 - 当 async_op
设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您无需手动创建它,并且保证支持两种方法
is_completed()
- 对于 CPU 集体操作,如果完成则返回True
。对于 CUDA 操作,如果操作已成功排队到 CUDA 流并且可以在默认流上使用输出而无需进一步同步,则返回True
。wait()
- 对于 CPU 集体操作,将阻塞进程直到操作完成。对于 CUDA 集体操作,将阻塞当前活动的 CUDA 流直到操作完成(但不会阻塞 CPU)。get_future()
- 返回torch._C.Future
对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但点对点操作除外。注意:随着我们继续采用 Futures 并合并 API,get_future()
调用可能会变得冗余。
示例
以下代码可以作为使用分布式集体操作时 CUDA 操作语义的参考。它显示了在不同 CUDA 流上使用集体输出时显式同步的必要性
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集体函数#
- torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]#
将张量广播到整个组。
tensor
在参与集体操作的所有进程中必须具有相同数量的元素。- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
- torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]#
将
object_list
中的可序列化对象广播到整个组。类似于
broadcast()
,但可以传入 Python 对象。请注意,object_list
中的所有对象都必须是可序列化的才能被广播。- 参数
object_list (List[Any]) – 要广播的输入对象列表。每个对象都必须是可序列化的。只有
src
等级上的对象才会被广播,但每个等级都必须提供相等大小的列表。src (int) – 广播
object_list
的源等级。源等级基于全局进程组(无论group
参数如何)group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要操作的进程组。如果为 None,则将使用默认进程组。默认值为
None
。device (
torch.device
, optional) – 如果不为 None,则对象会被序列化并转换为张量,然后移动到device
再进行广播。默认值为None
。group_src (int) –
group
上的源等级。不能同时指定group_src
和src
。
- 返回
None
。如果等级是组的一部分,object_list
将包含来自src
等级的广播对象。
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此项,以便每个排名都有一个单独的 GPU。注意
请注意,此 API 与
broadcast()
集体操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
broadcast_object_list()
隐式使用pickle
模块,该模块已知不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
使用 GPU 张量调用
broadcast_object_list()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用broadcast()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#
以所有机器都获得最终结果的方式,减少所有机器上的张量数据。
调用后,
tensor
在所有进程中将是逐位相同的。支持复数张量。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
示例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]#
减少所有机器上的张量数据。
只有等级为
dst
的进程才会接收最终结果。- 参数
tensor (Tensor) – 集体操作的输入和输出。该函数就地操作。
dst (int) – 全局进程组上的目标排名(无论
group
参数如何)op (optional) –
torch.distributed.ReduceOp
枚举中的一个值。指定用于逐元素约简的操作。group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作
group_dst (int) –
group
上的目标等级。必须指定group_dst
和dst
中的一个,但不能同时指定。
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]#
将整个组的张量收集到一个列表中。
支持复数和大小不等的张量。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_list = [ ... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]#
从所有等级收集张量并将它们放入一个输出张量中。
此函数要求所有张量在每个进程上大小相同。
- 参数
output_tensor (Tensor) – 用于容纳所有等级张量元素的输出张量。它必须具有正确的大小,具有以下形式之一:(i) 沿主维度连接所有输入张量;对于“连接”的定义,请参阅
torch.cat()
;(ii) 沿主维度堆叠所有输入张量;对于“堆叠”的定义,请参阅torch.stack()
。下面的示例可以更好地解释支持的输出形式。input_tensor (Tensor) – 要从当前等级收集的张量。与
all_gather
API 不同,此 API 中的输入张量在所有等级中必须具有相同的大小。group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
- torch.distributed.all_gather_object(object_list, obj, group=None)[source]#
将整个组中的可序列化对象收集到一个列表中。
类似于
all_gather()
,但可以传入 Python 对象。请注意,对象必须是可序列化的才能被收集。- 参数
object_list (list[Any]) – 输出列表。它的大小应与此集体操作的组大小相同,并将包含输出。
obj (Any) – 要从当前进程广播的可序列化 Python 对象。
group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。默认值为
None
。
- 返回
None。如果调用等级是此组的一部分,则集体操作的输出将填充到输入的
object_list
中。如果调用等级不是组的一部分,则传入的object_list
将保持不变。
注意
请注意,此 API 与
all_gather()
集体操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。注意
对于基于 NCCL 的进程组,对象内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,并且用户有责任确保已设置此设备,以便每个等级通过torch.cuda.set_device()
拥有单独的 GPU。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
all_gather_object()
隐式使用pickle
模块,该模块已知不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
使用 GPU 张量调用
all_gather_object()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用all_gather()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source]#
将张量列表收集到单个进程中。
此函数要求所有张量在每个进程上大小相同。
- 参数
tensor (Tensor) – 输入张量。
gather_list (list[Tensor], optional) – 用于收集数据的适当、大小相同的张量列表(默认为 None,必须在目标等级上指定)
dst (int, optional) – 全局进程组上的目标等级(无论
group
参数如何)。(如果dst
和group_dst
都为 None,则默认为全局等级 0)group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作
group_dst (int, 可选) –
group
上的目标排名。同时指定dst
和group_dst
无效
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
注意
请注意,
gather_list
中的所有张量必须大小相同。- 示例:
>>> # We have 2 process groups, 2 ranks. >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.ones(tensor_size, device=device) + rank >>> if dist.get_rank() == 0: >>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)] >>> else: >>> gather_list = None >>> dist.gather(tensor, gather_list, dst=0) >>> # Rank 0 gets gathered data. >>> gather_list [tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0 None # Rank 1
- torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source]#
将整个组中的可序列化对象收集到单个进程中。
类似于
gather()
,但可以传入 Python 对象。请注意,对象必须是可序列化的才能被收集。- 参数
obj (Any) – 输入对象。必须是可序列化的。
object_gather_list (list[Any]) – 输出列表。在
dst
等级上,它的大小应与此集体操作的组大小相同,并将包含输出。在非 dst 等级上必须为None
。(默认为None
)dst (int, optional) – 全局进程组上的目标等级(无论
group
参数如何)。(如果dst
和group_dst
都为 None,则默认为全局等级 0)group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要操作的进程组。如果为 None,则将使用默认进程组。默认值为
None
。group_dst (int, 可选) –
group
上的目标排名。同时指定dst
和group_dst
无效
- 返回
无。在
dst
等级上,object_gather_list
将包含集体操作的输出。
注意
请注意,此 API 与 gather 集体操作略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。
注意
对于基于 NCCL 的进程组,对象内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,并且用户有责任确保已设置此设备,以便每个等级通过torch.cuda.set_device()
拥有单独的 GPU。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
gather_object()
隐式使用pickle
模块,该模块已知不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
使用 GPU 张量调用
gather_object()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用gather()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source]#
将张量列表分散到组中的所有进程。
每个进程将接收到一个张量并将其数据存储在
tensor
参数中。支持复数张量。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
注意
请注意,
scatter_list
中的所有张量必须大小相同。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> output_tensor = torch.zeros(tensor_size, device=device) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> t_ones = torch.ones(tensor_size, device=device) >>> t_fives = torch.ones(tensor_size, device=device) * 5 >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. >>> output_tensor tensor([1., 1.], device='cuda:0') # Rank 0 tensor([5., 5.], device='cuda:1') # Rank 1
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[source]#
将
scatter_object_input_list
中的可序列化对象分散到整个组。类似于
scatter()
,但可以传入 Python 对象。在每个等级上,分散的对象将作为scatter_object_output_list
的第一个元素存储。请注意,scatter_object_input_list
中的所有对象都必须是可序列化的才能被分散。- 参数
scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储分散到此等级的对象。
scatter_object_input_list (List[Any], optional) – 要分散的输入对象列表。每个对象都必须是可序列化的。只有
src
等级上的对象才会被分散,对于非 src 等级,参数可以为None
。src (int) – 分散
scatter_object_input_list
的源等级。源等级基于全局进程组(无论group
参数如何)。(如果src
和group_src
都为 None,则默认为全局等级 0)group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要操作的进程组。如果为 None,则将使用默认进程组。默认值为
None
。group_src (int, optional) –
group
上的源等级。不能同时指定src
和group_src
- 返回
None
。如果等级是组的一部分,scatter_object_output_list
的第一个元素将被设置为此等级的分散对象。
注意
请注意,此 API 与 scatter 集体操作略有不同,因为它不提供
async_op
句柄,因此将是一个阻塞调用。警告
对象集合操作存在一些严重的性能和可伸缩性限制。有关详细信息,请参阅 对象集合操作。
警告
scatter_object_list()
隐式使用pickle
模块,该模块已知不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
使用 GPU 张量调用
scatter_object_list()
不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用scatter()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#
先约简,然后将张量列表分散到组中的所有进程。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]#
先约简,然后将张量分散到组中的所有等级。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]#
拆分输入张量,然后将拆分列表分散到组中的所有进程。
之后,从组中所有进程接收到的张量将被连接并作为单个输出张量返回。
支持复数张量。
- 参数
output (Tensor) – 收集连接的输出张量。
input (Tensor) – 要分散的输入张量。
output_split_sizes – (list[Int], optional): 如果指定 None 或为空,则 dim 0 的输出拆分大小。输出张量的 dim 0 必须能被
world_size
整除。input_split_sizes – (list[Int], optional): 如果指定 None 或为空,则 dim 0 的输入拆分大小。输入张量的 dim 0 必须能被
world_size
整除。group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作。
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
警告
all_to_all_single 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]#
将输入张量列表分散到组中的所有进程,并返回输出列表中的收集张量列表。
支持复数张量。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
警告
all_to_all 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]#
同步所有进程。
如果 async_op 为 False,或者异步工作句柄在 wait() 上被调用,此集体操作将阻塞进程,直到整个组进入此函数。
- 参数
- 返回
如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不属于组,则为 None。
注意
ProcessGroupNCCL 现在会阻塞 CPU 线程直到屏障集体操作完成。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]#
类似于
torch.distributed.barrier
同步进程,但考虑可配置的超时。它能够报告在规定时间内未通过此屏障的等级。具体来说,对于非零等级,将阻塞直到从等级 0 处理一个发送/接收。等级 0 将阻塞直到处理所有来自其他等级的发送/接收,并将报告未及时响应的等级的失败。请注意,如果一个等级未达到 monitored_barrier(例如由于挂起),所有其他等级都将在 monitored_barrier 中失败。
此集体操作将阻塞组中的所有进程/等级,直到整个组成功退出函数,这对于调试和同步非常有用。但是,它可能会影响性能,因此应仅用于调试或需要主机端完全同步点的场景。出于调试目的,此屏障可以插入到应用程序的集体调用之前,以检查是否有任何等级不同步。
注意
请注意,此集体操作仅支持 GLOO 后端。
- 参数
group (ProcessGroup, optional) – 要操作的进程组。如果为
None
,则使用默认进程组。timeout (datetime.timedelta, optional) – monitored_barrier 的超时时间。如果为
None
,则使用默认进程组超时时间。wait_all_ranks (bool, optional) – 是否收集所有失败等级。默认情况下,此值为
False
,等级 0 上的monitored_barrier
将在遇到第一个失败等级时抛出异常以快速失败。通过设置wait_all_ranks=True
,monitored_barrier
将收集所有失败等级并抛出包含所有失败等级信息的错误。
- 返回
无
.
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work#
一个 Work 对象表示 PyTorch 分布式包中待处理异步操作的句柄。它由非阻塞集体操作返回,例如 dist.all_reduce(tensor, async_op=True)。
- exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr #
- get_future(self: torch._C._distributed_c10d.Work) torch.Future #
- 返回
一个
torch.futures.Future
对象,它与Work
的完成相关联。例如,可以通过fut = process_group.allreduce(tensors).get_future()
检索 future 对象。
- 示例:
下面是一个简单的 allreduce DDP 通信钩子的示例,它使用
get_future
API 检索与allreduce
完成相关联的 Future。>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future >>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD >>> tensor = bucket.buffer().div_(group_to_use.size()) >>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future() >>> ddp_model.register_comm_hook(state=None, hook=allreduce)
警告
get_future
API 支持 NCCL,并部分支持 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回一个torch.futures.Future
。在上面的示例中,
allreduce
工作将在 GPU 上使用 NCCL 后端完成,fut.wait()
将在将相应的 NCCL 流与 PyTorch 的当前设备流同步后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待整个操作在 GPU 上完成。请注意,CUDAFuture
不支持TORCH_NCCL_BLOCKING_WAIT
标志或 NCCL 的barrier()
。此外,如果通过fut.then()
添加了回调函数,它将等待直到WorkNCCL
的 NCCL 流与ProcessGroupNCCL
的专用回调流同步,并在回调流上运行回调后内联调用回调。fut.then()
将返回另一个CUDAFuture
,它保存回调的返回值和记录回调流的CUDAEvent
。对于 CPU 工作,当工作完成且值张量准备就绪时,
fut.done()
返回 true。对于 GPU 工作,
fut.done()
仅在操作已排队时返回 true。对于混合 CPU-GPU 工作(例如使用 GLOO 发送 GPU 张量),当张量已到达各自节点时,
fut.done()
返回 true,但不一定在各自 GPU 上同步(类似于 GPU 工作)。
- get_future_result(self: torch._C._distributed_c10d.Work) torch.Future #
- 返回
一个 int 类型的
torch.futures.Future
对象,它映射到 WorkResult 的枚举类型。例如,可以通过fut = process_group.allreduce(tensor).get_future_result()
检索 future 对象。
- 示例:
用户可以使用
fut.wait()
阻塞等待工作完成并通过fut.value()
获取 WorkResult。此外,用户可以使用fut.then(call_back_func)
注册回调函数,以便在工作完成时调用,而不会阻塞当前线程。
警告
get_future_result
API 支持 NCCL
- result(self: torch._C._distributed_c10d.Work) list[torch.Tensor] #
- wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool #
- 返回
真/假。
- 示例:
- try
work.wait(timeout)
- except
# some handling
警告
在正常情况下,用户无需设置超时。调用 wait() 等同于调用 synchronize():让当前流阻塞,直到 NCCL 工作完成。但是,如果设置了超时,它将阻塞 CPU 线程直到 NCCL 工作完成或超时。如果超时,将抛出异常。
- class torch.distributed.ReduceOp#
可用约简操作的枚举类:
SUM
、PRODUCT
、MIN
、MAX
、BAND
、BOR
、BXOR
和PREMUL_SUM
。使用
NCCL
后端时,BAND
、BOR
和BXOR
约简不可用。AVG
在跨等级求和之前将值除以世界大小。AVG
仅在使用NCCL
后端时可用,且仅适用于 NCCL 版本 2.10 或更高版本。PREMUL_SUM
在约简之前在本地将输入乘以给定的标量。PREMUL_SUM
仅在使用NCCL
后端时可用,且仅适用于 NCCL 版本 2.11 或更高版本。用户应使用torch.distributed._make_nccl_premul_sum
。此外,复杂张量不支持
MAX
、MIN
和PRODUCT
。此类的值可以通过属性访问,例如
ReduceOp.SUM
。它们用于指定约简集体操作的策略,例如reduce()
。此类不支持
__members__
属性。
分布式键值存储#
分布式包带有一个分布式键值存储,它可用于在组中的进程之间共享信息,以及在 torch.distributed.init_process_group()
中初始化分布式包(通过显式创建存储作为指定 init_method
的替代方案)。键值存储有 3 种选择:TCPStore
、FileStore
和 HashStore
。
- class torch.distributed.Store#
所有存储实现的基类,例如 PyTorch 分布式提供的 3 种存储实现:(
TCPStore
、FileStore
和HashStore
)。- add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int #
第一次对给定
key
调用 add 会在存储中创建一个与key
关联的计数器,并将其初始化为amount
。后续使用相同key
的 add 调用将计数器增加指定的amount
。如果对已通过set()
在存储中设置过的 key 调用add()
,将导致异常。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None #
根据提供的
key
和value
将键值对附加到存储中。如果存储中不存在key
,则会创建它。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # Should return "potato" >>> store.get("first_key")
- check(self: torch._C._distributed_c10d.Store, arg0: list[str]) bool #
调用此方法检查给定的
keys
列表中是否有值存储在 Store 中。在正常情况下,此调用会立即返回,但仍会遇到一些边缘死锁情况,例如在 TCPStore 被销毁后调用 check。使用一个键列表调用check()
,以检查这些键是否存储在 Store 中。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # Should return 7 >>> store.check(["first_key"])
- clone(self: torch._C._distributed_c10d.Store) torch._C._distributed_c10d.Store #
克隆 Store 并返回指向相同底层 Store 的新对象。返回的 Store 可以与原始对象并发使用。这旨在通过为每个线程克隆一个 Store 来提供一种安全的方式,以从多个线程使用 Store。
- compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes #
根据提供的
key
将键值对插入到 Store 中,并在插入前对expected_value
和desired_value
进行比较。desired_value
只会在key
的expected_value
已存在于 Store 中,或expected_value
为空字符串时才会被设置。- 参数
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool #
从 Store 中删除与
key
关联的键值对。如果键成功删除,则返回 true,否则返回 false。- 参数
key (str) – 要从 Store 中删除的键
- 返回
如果
key
已被删除,则为 True,否则为 False。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- get(self: torch._C._distributed_c10d.Store, arg0: str) bytes #
检索 Store 中与给定
key
关联的值。如果key
不存在于 Store 中,该函数将等待timeout
(在初始化 Store 时定义)后抛出异常。- 参数
key (str) – 函数将返回与此键关联的值。
- 返回
如果
key
在 Store 中,则返回与key
关联的值。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- multi_get(self: torch._C._distributed_c10d.Store, arg0: list[str]) list[bytes] #
检索
keys
中的所有值。如果keys
中的任何键不存在于 Store 中,函数将等待timeout
。- 参数
keys (List[str]) – 要从 Store 中检索的键。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # Should return [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"])
- multi_set(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: list[str]) None #
根据提供的
keys
和values
将键值对列表插入到 Store 中。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # Should return b"po" >>> store.get("first_key")
- num_keys(self: torch._C._distributed_c10d.Store) int #
返回 Store 中设置的键的数量。请注意,此数字通常会比通过
set()
和add()
添加的键的数量多一个,因为一个键用于协调所有使用 Store 的工作进程。警告
与
TCPStore
一起使用时,num_keys
返回写入底层文件的键的数量。如果 Store 被销毁并使用相同文件创建另一个 Store,则原始键将保留。- 返回
Store 中存在的键的数量。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- queue_len(self: torch._C._distributed_c10d.Store, arg0: str) int #
返回指定队列的长度。
如果队列不存在,则返回 0。
有关详细信息,请参阅 queue_push。
- 参数
key (str) – 要获取长度的队列的键。
- queue_pop(self: torch._C._distributed_c10d.Store, key: str, block: bool = True) bytes #
从指定队列中弹出一个值,如果队列为空则等待直到超时。
有关详细信息,请参阅 queue_push。
如果 block 为 False,则在队列为空时将引发 dist.QueueEmptyError。
- queue_push(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None #
将值推入指定队列。
对队列和 set/get 操作使用相同的键可能会导致意外行为。
队列支持 wait/check 操作。
队列的 wait 只会唤醒一个等待的工作进程,而不是全部。
- set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None #
根据提供的
key
和value
将键值对插入到 Store 中。如果key
已存在于 Store 中,它将用新的value
覆盖旧值。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None #
设置 Store 的默认超时时间。此超时时间用于初始化以及
wait()
和get()
等方法。- 参数
timeout (timedelta) – 要在 Store 中设置的超时时间。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
- property timeout#
获取 Store 的超时时间。
- wait(*args, **kwargs)#
重载函数。
wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None
等待
keys
中的每个键被添加到 Store。如果在timeout
(在 Store 初始化期间设置)之前未设置所有键,则wait
将抛出异常。- 参数
keys (list) – 要等待其在 Store 中设置的键列表。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None
等待
keys
中的每个键被添加到 Store,如果在提供的timeout
之前未设置键,则抛出异常。- 参数
keys (list) – 要等待其在 Store 中设置的键列表。
timeout (timedelta) – 在抛出异常之前等待键被添加的时间。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- class torch.distributed.TCPStore#
一个基于 TCP 的分布式键值存储实现。服务器存储持有数据,而客户端存储可以通过 TCP 连接到服务器存储并执行
set()
插入键值对、get()
检索键值对等操作。应始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。- 参数
host_name (str) – 服务器存储应运行的主机名或 IP 地址。
port (int) – 服务器存储应侦听传入请求的端口。
world_size (int, optional) – Store 用户的总数(客户端数量 + 服务器的 1 个)。默认值为 None(None 表示 Store 用户的数量不固定)。
is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储时为 False。默认值为 False。
timeout (timedelta, optional) – Store 在初始化以及
get()
和wait()
等方法中使用的超时时间。默认值为 timedelta(seconds=300)。wait_for_workers (bool, optional) – 是否等待所有工作进程连接到服务器存储。这仅适用于 world_size 为固定值的情况。默认值为 True。
multi_tenant (bool, optional) – 如果为 True,则当前进程中所有具有相同主机/端口的
TCPStore
实例将使用相同的底层TCPServer
。默认值为 False。master_listen_fd (int, optional) – 如果指定,底层
TCPServer
将在此文件描述符上侦听,该文件描述符必须是已绑定到port
的套接字。要绑定临时端口,建议将端口设置为 0 并读取.port
。默认值为 None(表示服务器创建一个新套接字并尝试将其绑定到port
)。use_libuv (bool, optional) – 如果为 True,则使用 libuv 作为
TCPServer
后端。默认值为 True。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- __init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: int, world_size: Optional[int] = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: Optional[int] = None, use_libuv: bool = True) None #
创建一个新的 TCPStore。
- property host#
获取 Store 侦听请求的主机名。
- property libuvBackend#
如果正在使用 libuv 后端,则返回 True。
- property port#
获取 Store 侦听请求的端口号。
- class torch.distributed.HashStore#
一个基于底层哈希表的线程安全存储实现。此存储可在同一进程内使用(例如,由其他线程使用),但不能跨进程使用。
- 示例:
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- __init__(self: torch._C._distributed_c10d.HashStore) None #
创建一个新的 HashStore。
- class torch.distributed.FileStore#
一个使用文件来存储底层键值对的存储实现。
- 参数
- 示例:
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- __init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: int = -1) None #
创建一个新的 FileStore。
- property path#
获取 FileStore 用于存储键值对的文件路径。
- class torch.distributed.PrefixStore#
一个包装器,用于将前缀添加到插入到 Store 中的每个键的 3 种键值存储(
TCPStore
、FileStore
和HashStore
)中的任意一个。- 参数
prefix (str) – 在插入到 Store 之前添加到每个键前面的前缀字符串。
store (torch.distributed.store) – 构成底层键值存储的 Store 对象。
- __init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None #
创建一个新的 PrefixStore。
- property underlying_store#
获取 PrefixStore 包装的底层 Store 对象。
集体通信分析#
请注意,您可以使用 torch.profiler
(推荐,仅在 1.8.1 后可用)或 torch.autograd.profiler
来分析此处提及的集体通信和点对点通信 API。所有开箱即用的后端(gloo
、nccl
、mpi
)都受支持,集体通信使用将在分析输出/跟踪中按预期呈现。分析代码与任何常规 torch 操作一样
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
有关分析器功能的完整概述,请参阅 分析器文档。
多 GPU 集体函数#
警告
多 GPU 函数(指每个 CPU 线程多个 GPU)已弃用。目前,PyTorch Distributed 首选的编程模型是每个线程一个设备,如本文档中的 API 所示。如果您是后端开发人员并希望支持每个线程多个设备,请联系 PyTorch Distributed 的维护者。
对象集体#
警告
对象集体有许多严重的限制。请进一步阅读以确定它们是否适合您的用例。
对象集体是一组类似于集体操作的操作,它们作用于任意 Python 对象,只要它们可以被 pickle。有各种集体模式已实现(例如广播、all_gather 等),但它们大致遵循以下模式:
将输入对象转换为 pickle(原始字节),然后将其放入字节张量中。
将此字节张量的大小通信给对等方(第一次集体操作)。
分配适当大小的张量以执行真正的集体操作。
通信对象数据(第二次集体操作)。
将原始数据转换回 Python(unpickle)。
对象集体有时具有令人惊讶的性能或内存特性,可能导致长时间运行或 OOM,因此应谨慎使用。以下是一些常见问题。
不对称的 pickle/unpickle 时间 - Pickling 对象可能很慢,具体取决于对象的数量、类型和大小。当集体操作具有扇入(例如 gather_object)时,接收端必须 unpickle 的对象数量是发送端 pickle 的对象数量的 N 倍,这可能导致其他端在其下一个集体操作中超时。
低效的张量通信 - 张量应通过常规集体 API 发送,而不是对象集体 API。可以通过对象集体 API 发送张量,但它们将被序列化和反序列化(对于非 CPU 张量,包括 CPU 同步和设备到主机复制),并且除了调试或故障排除代码之外,在几乎所有情况下,都值得重构代码以使用非对象集体。
意外的张量设备 - 如果您仍然希望通过对象集体发送张量,那么 CUDA(以及可能其他加速器)张量还有一个特殊方面。如果您 pickle 一个当前位于 cuda:3
上的张量,然后将其 unpickle,您将获得另一个位于 cuda:3
上的张量,无论您在哪个进程上,或哪个 CUDA 设备是该进程的“默认”设备。使用常规张量集体 API,“输出张量”将始终位于相同的本地设备上,这通常是您所期望的。
如果进程是第一次使用 GPU,解压张量将隐式激活 CUDA 上下文,这会浪费大量 GPU 内存。通过在将张量作为输入传递给对象集合之前将其移动到 CPU 可以避免此问题。
第三方后端#
除了内置的 GLOO/MPI/NCCL 后端外,PyTorch distributed 还通过运行时注册机制支持第三方后端。有关如何通过 C++ 扩展开发第三方后端的参考,请参阅 教程 - 自定义 C++ 和 CUDA 扩展 和 test/cpp_extensions/cpp_c10d_extension.cpp
。第三方后端的功能由其自身的实现决定。
新的后端派生自 c10d::ProcessGroup
,并在导入时通过 torch.distributed.Backend.register_backend()
注册后端名称和实例化接口。
当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group()
时,torch.distributed
包将在新后端上运行。
警告
第三方后端支持是实验性的,可能会发生变化。
启动工具#
torch.distributed
包还在 torch.distributed.launch
中提供了启动工具。此辅助工具可用于在每个节点上启动多个进程以进行分布式训练。
模块 torch.distributed.launch
。
torch.distributed.launch
是一个模块,用于在每个训练节点上启动多个分布式训练进程。
警告
此模块将弃用,转而使用 torchrun。
该实用程序可用于单节点分布式训练,其中将生成每个节点一个或多个进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练,则每个分布式进程将在单个 GPU 上运行。这可以显着提高单节点训练性能。它也可以用于多节点分布式训练,通过在每个节点上生成多个进程,从而显着提高多节点分布式训练性能。这对于具有多个支持直接 GPU 的 Infiniband 接口的系统特别有利,因为所有这些接口都可以用于聚合通信带宽。
在单节点分布式训练或多节点分布式训练两种情况下,此实用程序都将启动每个节点指定数量的进程(--nproc-per-node
)。如果用于 GPU 训练,此数字必须小于或等于当前系统上的 GPU 数量(nproc_per_node
),并且每个进程将从 GPU 0 到 GPU (nproc_per_node - 1) 操作单个 GPU。
如何使用此模块
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多节点多进程分布式训练:(例如两个节点)
节点 1:(IP:192.168.1.1,并且有一个空闲端口:1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
节点 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查找此模块提供的可选参数
python -m torch.distributed.launch --help
重要提示
1. 此实用程序和多进程分布式(单节点或多节点)GPU 训练目前仅在使用 NCCL 分布式后端时才能达到最佳性能。因此,NCCL 后端是 GPU 训练推荐使用的后端。
2. 在您的训练程序中,您必须解析命令行参数:--local-rank=LOCAL_PROCESS_RANK
,此参数将由本模块提供。如果您的训练程序使用 GPU,您应确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。这可以通过以下方式实现:
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
通过以下任一方式将设备设置为本地排名:
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
2.0.0 版中更改: 启动器会将 --local-rank=<rank>
参数传递给您的脚本。从 PyTorch 2.0.0 开始,破折号 --local-rank
优于先前使用的下划线 --local_rank
。
为了向后兼容,用户可能需要在其参数解析代码中同时处理这两种情况。这意味着在参数解析器中包含 "--local-rank"
和 "--local_rank"
。如果只提供了 "--local_rank"
,启动器将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank"
应该足够了。
3. 在您的训练程序中,您应该在开始时调用以下函数来启动分布式后端。强烈建议使用 init_method=env://
。其他初始化方法(例如 tcp://
)可能有效,但 env://
是本模块官方支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的训练程序中,您可以使用常规的分布式函数,也可以使用 torch.nn.parallel.DistributedDataParallel()
模块。如果您的训练程序使用 GPU 进行训练并且您希望使用 torch.nn.parallel.DistributedDataParallel()
模块,以下是配置方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids
参数设置为您的代码将运行的唯一 GPU 设备 ID。这通常是进程的本地排名。换句话说,device_ids
需要是 [args.local_rank]
,并且 output_device
需要是 args.local_rank
才能使用此实用程序。
5. 另一种通过环境变量 LOCAL_RANK
将 local_rank
传递给子进程的方法。当您使用 --use-env=True
启动脚本时,此行为被启用。您必须调整上面的子进程示例,将 args.local_rank
替换为 os.environ['LOCAL_RANK']
;当您指定此标志时,启动器将不传递 --local-rank
。
警告
local_rank
不是全局唯一的:它仅在机器上的每个进程中唯一。因此,不要使用它来决定是否应写入网络文件系统。请参阅 pytorch/pytorch#12042,了解如果不正确执行此操作可能出现问题的一个示例。
Spawn 工具#
多进程包 - torch.multiprocessing 包还提供了 torch.multiprocessing.spawn()
中的 spawn
函数。此辅助函数可用于生成多个进程。它的工作原理是传入您要运行的函数,并生成 N 个进程来运行它。这也可以用于多进程分布式训练。
有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现
请注意,此函数需要 Python 3.4 或更高版本。
调试 torch.distributed
应用程序#
调试分布式应用程序可能具有挑战性,因为难以理解的挂起、崩溃或跨排名的不一致行为。torch.distributed
提供了一套工具来帮助以自助方式调试训练应用程序
Python 断点#
在分布式环境中使用 Python 调试器非常方便,但因为它不能开箱即用,许多人根本不使用它。PyTorch 提供了围绕 pdb 的自定义包装器,可以简化此过程。
torch.distributed.breakpoint
使此过程变得容易。在内部,它以两种方式自定义 pdb
的断点行为,但除此之外,其行为与正常 pdb
相同。
仅在指定排名上附加调试器(由用户指定)。
通过使用
torch.distributed.barrier()
确保所有其他排名停止,该屏障将在调试排名发出continue
后释放。将子进程的标准输入重定向,使其连接到您的终端。
要使用它,只需在所有排名上发出 torch.distributed.breakpoint(rank)
,并在每种情况下使用相同的 rank
值。
受监控的屏障#
从 v1.10 开始,torch.distributed.monitored_barrier()
作为 torch.distributed.barrier()
的替代方案存在,当崩溃时,它会失败并提供有关哪个排名可能出现故障的有用信息,即并非所有排名在提供的超时时间内调用 torch.distributed.monitored_barrier()
。 torch.distributed.monitored_barrier()
使用 send
/recv
通信原语实现主机端屏障,其过程类似于确认,允许排名 0 报告哪些排名未及时确认屏障。例如,考虑以下函数,其中排名 1 未调用 torch.distributed.monitored_barrier()
(实际上这可能是由于应用程序错误或先前集体操作中的挂起):
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
在排名 0 上产生以下错误消息,允许用户确定哪个排名可能出现故障并进一步调查。
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
#
在 TORCH_CPP_LOG_LEVEL=INFO
的情况下,环境变量 TORCH_DISTRIBUTED_DEBUG
可用于触发额外的有用日志记录和集体同步检查,以确保所有排名适当同步。TORCH_DISTRIBUTED_DEBUG
可以设置为 OFF
(默认)、INFO
或 DETAIL
,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL
可能会影响应用程序性能,因此仅应在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO
将导致在使用 torch.nn.parallel.DistributedDataParallel()
训练的模型初始化时产生额外的调试日志,而 TORCH_DISTRIBUTED_DEBUG=DETAIL
将额外记录一些迭代的运行时性能统计信息。这些运行时统计信息包括前向时间、后向时间、梯度通信时间等数据。例如,给定以下应用程序:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
以下日志在初始化时呈现:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下日志在运行时呈现(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
时)
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO
增强了 torch.nn.parallel.DistributedDataParallel()
中由于模型中未使用的参数导致的崩溃日志记录。目前,如果模型的前向传递中可能存在未使用的参数,则必须在 torch.nn.parallel.DistributedDataParallel()
初始化时传入 find_unused_parameters=True
,并且从 v1.10 开始,所有模型输出都必须在损失计算中使用,因为 torch.nn.parallel.DistributedDataParallel()
不支持反向传递中未使用的参数。这些约束条件对大型模型来说尤其具有挑战性,因此当崩溃并出现错误时,torch.nn.parallel.DistributedDataParallel()
将记录所有未使用的参数的完全限定名称。例如,在上述应用程序中,如果我们修改 loss
以便计算为 loss = output[1]
,则 TwoLinLayerNet.a
在反向传递中未接收到梯度,因此导致 DDP
失败。崩溃时,用户会收到有关未使用的参数的信息,这对于大型模型来说可能难以手动查找。
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
将对用户直接或间接(如 DDP allreduce
)发出的每个集体调用触发额外的一致性和同步检查。这是通过创建一个包装进程组来完成的,该进程组包装了 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 返回的所有进程组。因此,这些 API 将返回一个包装进程组,其使用方式与常规进程组完全相同,但在将集体操作分派到底层进程组之前会执行一致性检查。目前,这些检查包括一个 torch.distributed.monitored_barrier()
,它确保所有排名完成其未完成的集体调用并报告卡住的排名。接下来,通过确保所有集体函数匹配并使用一致的张量形状来检查集体本身的一致性。如果情况并非如此,则在应用程序崩溃时会包含详细的错误报告,而不是挂起或无信息量的错误消息。例如,考虑以下函数,该函数在 torch.distributed.all_reduce()
中输入形状不匹配:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL
后端时,此类应用程序很可能导致挂起,这在非平凡的场景中可能难以定位根源。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL
并重新运行应用程序,以下错误消息将揭示根源:
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行细粒度控制,还可以使用函数 torch.distributed.set_debug_level()
、torch.distributed.set_debug_level_from_env()
和 torch.distributed.get_debug_level()
。
此外,可以将 TORCH_DISTRIBUTED_DEBUG=DETAIL
与 TORCH_SHOW_CPP_STACKTRACES=1
结合使用,以便在检测到集体不同步时记录整个调用堆栈。这些集体不同步检查将适用于所有使用 c10d
集体调用(由使用 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 创建的进程组支持)的应用程序。
日志记录#
除了通过 torch.distributed.monitored_barrier()
和 TORCH_DISTRIBUTED_DEBUG
提供显式调试支持外,torch.distributed
的底层 C++ 库还会在不同级别输出日志消息。这些消息有助于理解分布式训练作业的执行状态,并排除网络连接失败等问题。下表显示了如何通过 TORCH_CPP_LOG_LEVEL
和 TORCH_DISTRIBUTED_DEBUG
环境变量的组合来调整日志级别。
|
|
有效日志级别 |
---|---|---|
|
忽略 |
错误 |
|
忽略 |
警告 |
|
忽略 |
Info |
|
|
Debug |
|
|
Trace (即 All) |
分布式组件会引发从 RuntimeError
派生的自定义异常类型
torch.distributed.DistError
:这是所有分布式异常的基类型。torch.distributed.DistBackendError
:当发生后端特定错误时,会抛出此异常。例如,如果使用NCCL
后端,并且用户尝试使用NCCL
库不可用的 GPU。torch.distributed.DistNetworkError
:当网络库遇到错误时(例如:对等方重置连接)抛出此异常torch.distributed.DistStoreError
:当 Store 遇到错误时(例如:TCPStore 超时)抛出此异常
- class torch.distributed.DistError#
分布式库中发生错误时引发的异常
- class torch.distributed.DistBackendError#
分布式中发生后端错误时引发的异常
- class torch.distributed.DistNetworkError#
分布式中发生网络错误时引发的异常
- class torch.distributed.DistStoreError#
分布式存储中发生错误时引发的异常
如果您正在运行单节点训练,则以交互方式为脚本设置断点可能会很方便。我们提供了一种方便地为单个 rank 设置断点的方法