评价此页

torchrun (Elastic Launch)#

Created On: May 04, 2021 | Last Updated On: Aug 26, 2021

Module torch.distributed.run

torch.distributed.run 是一个在每个训练节点上启动多个分布式训练进程的模块。

torchrun 是一个 python 控制台脚本,指向在 setup.pyentry_points 配置中声明的主模块 torch.distributed.run。它等同于调用 python -m torch.distributed.run

torchrun 可用于单节点分布式训练,在该节点上将启动每个节点的一个或多个进程。它可以用于 CPU 训练或 GPU 训练。如果用于 GPU 训练,每个分布式进程将操作于单个 GPU。这可以实现单节点训练性能的良好提升。torchrun 也可用于多节点分布式训练,通过在每个节点上启动多个进程来提高多节点分布式训练的性能。这对于具有多个支持直接 GPU 的 Infiniband 接口的系统尤其有利,因为所有这些接口都可以用于聚合通信带宽。

在单节点分布式训练或多节点分布式训练这两种情况下,torchrun 都将启动每个节点上指定数量的进程(--nproc-per-node)。如果用于 GPU 训练,此数量需要小于或等于当前系统的 GPU 数量(nproc_per_node),并且每个进程将操作于单个 GPU,从 *GPU 0 到 GPU (nproc_per_node - 1)*。

版本 2.0.0 中已更改: torchrun 将把 --local-rank=<rank> 参数传递给您的脚本。从 PyTorch 2.0.0 起,首选使用带连字符的 --local-rank 而不是之前使用的带下划线的 --local_rank

为了向后兼容,用户可能需要在其参数解析代码中同时处理这两种情况。这意味着在参数解析器中包含 "--local-rank""--local_rank"。如果只提供了 "--local_rank"torchrun 将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank" 应该足够了。

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

用法#

单节点多工作进程#

torchrun
    --standalone
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

注意

--nproc-per-node 可以是 "gpu"(为每个 GPU 启动一个进程)、"cpu"(为每个 CPU 启动一个进程)、"auto"(如果 CUDA 可用,则等同于 "gpu",否则等同于 "cpu"),或者是一个指定进程数量的整数。有关更多详细信息,请参阅 torch.distributed.run.determine_local_world_size

堆叠单节点多工作进程#

要在同一主机上运行多个(独立作业)的单节点多工作进程,我们需要确保每个实例(作业)在不同的端口上设置,以避免端口冲突(或者更糟的是,两个作业合并为一个作业)。为此,您必须使用 --rdzv-backend=c10d 运行,并通过设置 --rdzv-endpoint=localhost:$PORT_k 指定一个不同的端口。对于 --nodes=1,让 torchrun 自动选择一个空闲的随机端口通常比手动为每次运行分配不同端口更方便。

torchrun
    --rdzv-backend=c10d
    --rdzv-endpoint=localhost:0
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

容错(固定数量的工作进程,无弹性,可容忍 3 次故障)#

torchrun
    --nnodes=$NUM_NODES
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR,形式为 <host>[:<port>](例如 node1.example.com:29400),指定 C10d 协调后端应该实例化和托管的节点和端口。它可以是训练集群中的任何节点,但最好选择一个具有高带宽的节点。

注意

如果未指定端口号,HOST_NODE_ADDR 默认为 29400。

弹性(min=1max=4,可容忍最多 3 次成员资格更改或故障)#

torchrun
    --nnodes=1:4
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR,形式为 <host>[:<port>](例如 node1.example.com:29400),指定 C10d 协调后端应该实例化和托管的节点和端口。它可以是训练集群中的任何节点,但最好选择一个具有高带宽的节点。

注意

如果未指定端口号,HOST_NODE_ADDR 默认为 29400。

关于协调后端的注意事项#

对于多节点训练,您需要指定

  1. --rdzv-id:一个唯一的作业 ID(由参与该作业的所有节点共享)

  2. --rdzv-backendtorch.distributed.elastic.rendezvous.RendezvousHandler 的一个实现

  3. --rdzv-endpoint:运行协调后端的端点;通常形式为 host:port

目前,c10d(推荐)、etcd-v2etcd(旧版)协调后端都支持开箱即用。要使用 etcd-v2etcd,请设置一个启用了 v2 API 的 etcd 服务器(例如 --enable-v2)。

警告

etcd-v2etcd 协调使用 etcd API v2。您必须在 etcd 服务器上启用 v2 API。我们的测试使用了 etcd v3.4.3。

警告

对于基于 etcd 的协调,我们建议使用 etcd-v2 而不是 etcd,因为 etcd-v2 功能上等效但实现经过改进。etcd 处于维护模式,将在未来的版本中移除。

定义#

  1. Node - 一个物理实例或容器;映射到作业管理器工作的单位。

  2. Worker - 在分布式训练的上下文中,一个工作进程。

  3. WorkerGroup - 执行相同功能(例如,训练器)的一组工作进程。

  4. LocalWorkerGroup - 工作进程组中运行在同一节点上的一组工作进程。

  5. RANK - 工作进程在工作进程组内的排名。

  6. WORLD_SIZE - 工作进程组中工作进程的总数。

  7. LOCAL_RANK - 工作进程在本地工作进程组内的排名。

  8. LOCAL_WORLD_SIZE - 本地工作进程组的大小。

  9. rdzv_id - 用户定义的 ID,用于唯一标识作业的工作进程组。每个节点使用此 ID 加入特定的工作进程组。

  1. rdzv_backend - 集合后端(例如 c10d)。这通常是一个强一致性的键值存储。

  2. rdzv_endpoint - 集合后端端点;通常格式为 <host>:<port>

一个 Node 运行 LOCAL_WORLD_SIZE 个工作进程,这些工作进程构成一个 LocalWorkerGroup。作业中各节点的所有 LocalWorkerGroups 的联合构成 WorkerGroup

环境变量#

您的脚本中提供了以下环境变量

  1. LOCAL_RANK - 本地排名。

  2. RANK - 全局排名。

  3. GROUP_RANK - 工作进程组的排名。一个介于 0 和 max_nnodes 之间的数字。当每个节点运行一个工作进程组时,这就是节点的排名。

  4. ROLE_RANK - 在所有具有相同角色的工作进程中的工作进程排名。工作进程的角色在 WorkerSpec 中指定。

  5. LOCAL_WORLD_SIZE - 本地世界大小(例如,本地运行的工作进程数);等于 torchrun 上指定的 --nproc-per-node

  6. WORLD_SIZE - 世界大小(作业中工作进程的总数)。

  7. ROLE_WORLD_SIZE - 使用 WorkerSpec 中指定的相同角色启动的工作进程的总数。

  8. MASTER_ADDR - 运行排名为 0 的工作进程的宿主机的 FQDN;用于初始化 Torch Distributed 后端。

  9. MASTER_PORT - MASTER_ADDR 上可用于托管 C10d TCP 存储的端口。

  10. TORCHELASTIC_RESTART_COUNT - 到目前为止工作进程组的重启次数。

  11. TORCHELASTIC_MAX_RESTARTS - 配置的最大重启次数。

  12. TORCHELASTIC_RUN_ID - 等于集合 run_id(例如,唯一的作业 ID)。

  13. PYTHON_EXEC - 系统可执行文件覆盖。如果提供,Python 用户脚本将使用 PYTHON_EXEC 的值作为可执行文件。默认情况下使用 sys.executable

部署#

  1. (C10d 后端不需要)启动集合后端服务器并获取端点(将作为 --rdzv-endpoint 传递给 torchrun

  2. 单节点多工作进程:在宿主机上启动 torchrun 以启动代理进程,该进程创建并监视本地工作进程组。

  3. 多节点多工作进程:在参与训练的所有节点上使用相同的参数启动 torchrun

当使用作业/集群管理器时,多节点作业的入口点命令应该是 torchrun

故障模式#

  1. 工作进程故障:对于一个包含 n 个工作进程的训练作业,如果 k<=n 个工作进程失败,所有工作进程将停止并最多重启 max_restarts 次。

  2. 代理故障:代理故障会导致本地工作进程组故障。由作业管理器决定是使整个作业失败(集体语义)还是尝试替换节点。代理支持这两种行为。

  3. 节点故障:与代理故障相同。

成员变更#

  1. 节点退出(缩减规模):代理会收到节点退出的通知,所有现有工作进程被停止,形成一个新的 WorkerGroup,然后所有工作进程以新的 RANKWORLD_SIZE 启动。

  2. 节点加入(扩增规模):新节点被允许加入作业,所有现有工作进程被停止,形成一个新的 WorkerGroup,然后所有工作进程以新的 RANKWORLD_SIZE 启动。

重要提示#

  1. 此实用程序和多进程分布式(单节点或多节点)GPU 训练目前仅在使用 NCCL 分布式后端时才能达到最佳性能。因此,对于 GPU 训练,推荐使用 NCCL 后端。

  2. 此模块为您提供了初始化 Torch 进程组所需的环境变量,您无需手动传递 RANK。要在训练脚本中初始化进程组,只需运行

>>> import torch.distributed as dist
>>> dist.init_process_group(backend="gloo|nccl")
  1. 在您的训练程序中,您可以选择使用常规的分布式函数,也可以使用 torch.nn.parallel.DistributedDataParallel() 模块。如果您的训练程序使用 GPU 进行训练,并且您想使用 torch.nn.parallel.DistributedDataParallel() 模块,配置方法如下。

local_rank = int(os.environ["LOCAL_RANK"])
model = torch.nn.parallel.DistributedDataParallel(
    model, device_ids=[local_rank], output_device=local_rank
)

请确保 device_ids 参数设置为您的代码将要操作的唯一 GPU 设备 ID。这通常是进程的本地排名。换句话说,为了使用此实用程序,device_ids 需要是 [int(os.environ("LOCAL_RANK"))],而 output_device 需要是 int(os.environ("LOCAL_RANK"))

  1. 在发生故障或成员变更时,所有存活的工作进程都会立即被终止。请务必检查点保存您的进度。检查点频率应取决于您的作业对丢失工作的容忍度。

  2. 此模块仅支持同构 LOCAL_WORLD_SIZE。也就是说,假定所有节点运行相同数量的本地工作进程(每个角色)。

  3. RANK 是不稳定的。在重启之间,节点上的本地工作进程可能被分配与之前不同的排名范围。切勿硬编码关于排名稳定性或 RANKLOCAL_RANK 之间存在任何关联的假设。

  4. 在使用弹性(min_size!=max_size)时,请勿硬编码对 WORLD_SIZE 的假设,因为随着节点被允许离开和加入,世界大小可能会发生变化。

  5. 建议您的脚本具有以下结构

def main():
    load_checkpoint(checkpoint_path)
    initialize()
    train()


def train():
    for batch in iter(dataset):
        train_step(batch)

        if should_checkpoint:
            save_checkpoint(checkpoint_path)
  1. (推荐)在工作进程出错时,此工具将汇总错误的详细信息(例如,时间、排名、主机、pid、回溯等)。在每个节点上,第一个错误(按时间戳排序)被启发式地报告为“根本原因”错误。为了在错误摘要中包含回溯信息,您必须像下面的示例一样装饰您的主入口函数。如果未装饰,则摘要将不包含异常的回溯,仅包含退出码。有关 torchelastic 错误处理的详细信息,请参阅:https://pytorch.ac.cn/docs/stable/elastic/errors.html

from torch.distributed.elastic.multiprocessing.errors import record


@record
def main():
    # do train
    pass


if __name__ == "__main__":
    main()