评价此页

使用 PyTorch 编写分布式应用程序#

创建日期:2017 年 10 月 6 日 | 最后更新:2025 年 9 月 5 日 | 最后验证:2024 年 11 月 5 日

作者: Séb Arnold

注意

editgithub 上查看和编辑本教程。

先决条件

在本简短教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,并探讨该包的一些内部机制。

设置#

PyTorch 中包含的分布式包(即 torch.distributed)使研究人员和从业者能够轻松地跨进程和机器集群并行计算。为此,它利用了消息传递语义,允许每个进程将数据传输给其他任何进程。与多进程(torch.multiprocessing)包不同,各进程可以使用不同的通信后端,且不限制必须在同一台机器上执行。

为了开始使用,我们需要能够同时运行多个进程。如果你有计算集群的访问权限,请咨询当地系统管理员或使用你偏好的协调工具(例如 pdshclustershellslurm)。在本教程中,我们将使用单台机器,并通过以下模板派生多个进程。

"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = 2
    processes = []
    if "google.colab" in sys.modules:
        print("Running in Google Colab")
        mp.get_context("spawn")
    else:
        mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上述脚本派生了两个进程,它们将各自设置分布式环境,初始化进程组(dist.init_process_group),并最终执行给定的 run 函数。

让我们看看 init_process 函数。它确保每个进程都能通过主节点使用相同的 IP 地址和端口进行协调。注意我们使用了 gloo 后端,但也有其他后端可用。(参考 5.1 节)。我们将在本教程末尾介绍 dist.init_process_group 中发生的魔法,但它本质上允许进程通过共享各自的位置信息来相互通信。

点对点通信#

Send and Recv

发送与接收#

从一个进程到另一个进程的数据传输称为点对点通信。这些是通过 sendrecv 函数或它们的“立即”对应函数 isendirecv 来实现的。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,两个进程都以零张量开始,然后进程 0 对张量增值并将其发送给进程 1,最终它们都得到 1.0。注意,进程 1 需要分配内存以存储它将要接收的数据。

还要注意 send/recv阻塞的:两个进程都会阻塞直到通信完成。另一方面,“立即”函数是非阻塞的;脚本继续执行,这些方法返回一个 Work 对象,我们可以选择对其调用 wait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

使用立即函数时,我们必须小心如何使用已发送和接收的张量。由于我们不知道数据何时会被传输给另一个进程,因此在 req.wait() 完成之前,不应修改发送的张量,也不应访问接收到的张量。换句话说:

  • dist.isend() 之后写入 tensor 将导致未定义的行为。

  • req.wait() 执行之前,从 dist.irecv() 之后读取 tensor 将导致未定义的行为。

然而,在 req.wait() 执行后,我们可以确信通信已经完成,并且存储在 tensor[0] 中的值是 1.0。

当我们想要对进程通信进行更细粒度的控制时,点对点通信非常有用。它们可用于实现复杂的算法,例如 百度 DeepSpeechFacebook 大规模实验 中所使用的算法。(参考 4.1 节

集合通信#

Scatter

散发 (Scatter)#

Gather

收集 (Gather)#

Reduce

归约 (Reduce)#

All-Reduce

全归约 (All-Reduce)#

Broadcast

广播 (Broadcast)#

All-Gather

全收集 (All-Gather)#

与点对点通信不同,集合通信允许在组 (group) 中的所有进程间进行通信模式。组是所有进程的一个子集。要创建一个组,我们可以向 dist.new_group(group) 传递一个排名 (rank) 列表。默认情况下,集合通信在所有进程上执行,也称为世界 (world)。例如,为了获得所有进程上所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group) 集合操作。

""" All-Reduce example."""
def run(rank, size):
    """ Simple collective communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我们需要组中所有张量的总和,我们使用 dist.ReduceOp.SUM 作为归约算子。通常情况下,任何可交换的数学运算都可以用作算子。PyTorch 开箱即用地提供了许多此类算子,它们都在逐元素层面上工作:

  • dist.ReduceOp.SUM,

  • dist.ReduceOp.PRODUCT,

  • dist.ReduceOp.MAX,

  • dist.ReduceOp.MIN,

  • dist.ReduceOp.BAND,

  • dist.ReduceOp.BOR,

  • dist.ReduceOp.BXOR,

  • dist.ReduceOp.PREMUL_SUM.

完整的支持算子列表请见此处

除了 dist.all_reduce(tensor, op, group) 之外,PyTorch 目前还实现了许多其他集合通信操作。以下是一些受支持的集合操作:

  • dist.broadcast(tensor, src, group): 将 tensorsrc 复制到所有其他进程。

  • dist.reduce(tensor, dst, op, group): 对每个 tensor 应用 op 并将结果存储在 dst 中。

  • dist.all_reduce(tensor, op, group): 与 reduce 相同,但结果存储在所有进程中。

  • dist.scatter(tensor, scatter_list, src, group): 将第 \(i^{\text{th}}\) 个张量 scatter_list[i] 复制到第 \(i^{\text{th}}\) 个进程。

  • dist.gather(tensor, gather_list, dst, group): 从所有进程将 tensor 复制到 dst

  • dist.all_gather(tensor_list, tensor, group): 将 tensor 从所有进程复制到所有进程上的 tensor_list 中。

  • dist.barrier(group): 阻塞 group 中的所有进程,直到每个进程都进入此函数。

  • dist.all_to_all(output_tensor_list, input_tensor_list, group): 将输入张量列表散发给组中的所有进程,并在输出列表中返回收集到的张量列表。

完整的集合通信操作列表可以在 PyTorch 分布式最新文档中找到 (链接)

分布式训练#

注意: 你可以在此 GitHub 仓库中找到本节的示例脚本。

既然我们了解了分布式模块的工作原理,让我们用它编写一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。当然,这是一个教学示例,在实际生产环境中,你应该使用上面链接的官方、经过充分测试和优化的版本。

简单来说,我们想实现一个分布式版本的随机梯度下降 (SGD)。我们的脚本将让所有进程在各自的数据批次上计算模型梯度,然后平均这些梯度。为了确保在改变进程数量时获得相似的收敛结果,我们首先必须对数据集进行分区。(你也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()  # from random import Random
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

有了上面的代码片段,我们现在只需通过以下几行代码即可对任何数据集进行分区:

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 // size
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假设我们有 2 个副本,那么每个进程将拥有 60000 / 2 = 30000 个样本的 train_set。我们还将批次大小除以副本数量,以保持总体批次大小为 128。

我们现在可以编写通常的前向-后向-优化训练代码,并添加一个函数调用来对模型梯度进行平均。(以下代码在很大程度上受到了官方 PyTorch MNIST 示例 的启发。)

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

剩下的就是实现 average_gradients(model) 函数,它简单地接收一个模型并在整个“世界”中对其梯度进行平均。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

Et voilà!我们成功实现了分布式同步 SGD,并可以在大型计算机集群上训练任何模型。

注意: 虽然最后一句话在技术上是正确的,但实现生产级的同步 SGD 还需要更多的技巧。请再次强调,请使用已经过测试和优化的实现

我们自己的 Ring-Allreduce#

作为一个额外的挑战,想象一下我们想要实现 DeepSpeech 高效的环形 (ring) allreduce。使用点对点集合通信来实现这一点相当容易。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

在上面的脚本中,allreduce(send, recv) 函数的签名与 PyTorch 中的略有不同。它接收一个 recv 张量,并将所有 send 张量的总和存储在其中。作为一个留给读者的练习,我们的版本与 DeepSpeech 中的版本还有一个区别:他们的实现将梯度张量分成了块 (chunks),从而最优地利用了通信带宽。(提示:torch.chunk

进阶主题#

我们现在准备探索 torch.distributed 的一些更高级功能。由于内容较多,本节分为两个子章节:

  1. 通信后端:学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。

  2. 初始化方法:了解如何在 dist.init_process_group() 中最好地设置初始协调阶段。

通信后端#

torch.distributed 最优雅的方面之一是它能够抽象并构建在不同的后端之上。如前所述,PyTorch 中实现了多个后端。这些后端可以使用 加速器 API 轻松选择,该 API 提供了一个与不同加速器类型交互的接口。一些最流行的后端是 Gloo、NCCL 和 MPI。它们各自具有不同的规格和权衡,具体取决于所需的用例。支持功能的比较表可以在此处找到。

Gloo 后端

到目前为止,我们已经广泛使用了 Gloo 后端。作为一个开发平台,它非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且在 Linux(自 0.2 起)和 macOS(自 1.3 起)上均可运行。它支持 CPU 上的所有点对点和集合操作,以及 GPU 上的所有集合操作。CUDA 张量的集合操作实现不如 NCCL 后端提供的那么优化。

正如你肯定注意到的,如果将 model 放在 GPU 上,我们的分布式 SGD 示例将无法工作。为了使用多个 GPU,让我们进行以下修改:

  1. 使用加速器 API device_type = torch.accelerator.current_accelerator()

  2. 使用 torch.device(f"{device_type}:{rank}")

  3. model = Net() \(\rightarrow\) model = Net().to(device)

  4. 使用 data, target = data.to(device), target.to(device)

通过这些修改,你的模型现在将跨两个 GPU 进行训练。如果你在 NVIDIA 硬件上运行,可以使用 watch nvidia-smi 监控 GPU 利用率。

MPI 后端

消息传递接口 (MPI) 是高性能计算领域的一种标准化工具。它允许进行点对点和集合通信,是 torch.distributed API 的主要灵感来源。存在多种 MPI 实现(例如 Open-MPIMVAPICH2Intel MPI),每种实现都针对不同目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上的广泛可用性和高水平优化。一些最近的实现也能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存拷贝。

不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,我们必须手动重新编译它。幸运的是,这个过程相当简单,因为在编译时,PyTorch 会自动寻找可用的 MPI 实现。以下步骤通过从源码安装 PyTorch 来安装 MPI 后端。

  1. 创建并激活你的 Anaconda 环境,按照指南安装所有先决条件,但不要运行 python setup.py install

  2. 选择并安装你偏好的 MPI 实现。注意,启用 CUDA 感知 MPI 可能需要一些额外步骤。在我们的案例中,我们将坚持使用不带 GPU 支持的 Open-MPI:conda install -c conda-forge openmpi

  3. 现在,进入你克隆的 PyTorch 仓库并执行 python setup.py install

为了测试我们新安装的后端,需要进行一些修改。

  1. if __name__ == '__main__': 下的内容替换为 init_process(0, 0, run, backend='mpi')

  2. 运行 mpirun -n 4 python myscript.py

进行这些更改的原因是 MPI 需要在派生进程之前创建自己的环境。MPI 也会派生自己的进程并执行在 初始化方法 中描述的握手,这使得 init_process_groupranksize 参数变得多余。这实际上非常强大,因为你可以将附加参数传递给 mpirun,以便为每个进程定制计算资源。(例如每个进程的核心数、手动将机器分配给特定的 rank 以及更多内容)。通过这样做,你应该能够获得与其他通信后端相同的输出。

NCCL 后端

NCCL 后端为 CUDA 张量提供了针对集合操作的优化实现。如果你在集合操作中仅使用 CUDA 张量,请考虑使用此后端以获得同类最佳的性能。NCCL 后端包含在支持 CUDA 的预构建二进制文件中。

XCCL 后端

XCCL 后端为 XPU 张量提供了集合操作的优化实现。如果你的工作负载仅使用 XPU 张量进行集合操作,此后端可提供同类最佳的性能。XCCL 后端包含在支持 XPU 的预构建二进制文件中。

初始化方法#

为了结束本教程,让我们检查我们调用的初始函数:dist.init_process_group(backend, init_method)。具体来说,我们将讨论负责每个进程之间初步协调步骤的各种初始化方法。这些方法使你能够定义如何完成此协调。

初始化方法的选择取决于你的硬件设置,某种方法可能比其他方法更适合。除了以下章节外,请参阅官方文档以获取更多信息。

环境变量

我们在整个教程中一直使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程都将能够正确连接到主节点,获取关于其他进程的信息,并最终与它们完成握手:

  • MASTER_PORT: 将托管 rank 为 0 的进程的机器上的一个空闲端口。

  • MASTER_ADDR: 将托管 rank 为 0 的进程的机器的 IP 地址。

  • WORLD_SIZE: 进程的总数,以便主节点知道要等待多少个工作进程。

  • RANK: 每个进程的 rank,以便它们知道自己是主节点还是工作进程。

共享文件系统

共享文件系统要求所有进程都能访问同一个共享文件系统,并将通过共享文件来协调它们。这意味着每个进程都将打开该文件,写入自己的信息,并等待直到所有进程都完成此操作。之后,所有进程都能获得所需的信息。为了避免竞态条件,文件系统必须支持通过 fcntl 进行锁定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

可以通过提供 rank 为 0 的进程的 IP 地址和一个可访问的端口号来实现通过 TCP 初始化。在这里,所有工作进程都将能够连接到 rank 为 0 的进程,并交换关于如何相互联系的信息。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

致谢

我要感谢 PyTorch 开发人员在实现、文档和测试方面所做的出色工作。当代码不清楚时,我总是可以依靠文档测试来找到答案。特别感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 在早期草稿中提供了深刻的评论并回答了我的问题。