评价此页

使用 PyTorch 编写分布式应用程序#

创建于: 2017 年 10 月 06 日 | 最后更新: 2025 年 09 月 05 日 | 最后验证: 2024 年 11 月 05 日

作者: Séb Arnold

注意

editGitHub 上查看和编辑此教程。

先决条件

在本简短教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,以及深入了解该包的一些内部机制。

设置#

PyTorch 中包含的分布式包(即 torch.distributed)使研究人员和从业者能够轻松地跨进程和机器集群并行化计算。为此,它利用消息传递语义,允许每个进程将数据传输到其他任何进程。与多进程 (torch.multiprocessing) 包不同,进程可以使用不同的通信后端,并且不限于在同一台机器上执行。

要开始,我们需要能够同时运行多个进程。如果您有计算集群的访问权限,请咨询您的本地系统管理员或使用您喜欢的协调工具(例如 pdshclustershellslurm)。本教程将使用单机,并通过以下模板生成多个进程。

"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = 2
    processes = []
    if "google.colab" in sys.modules:
        print("Running in Google Colab")
        mp.get_context("spawn")
    else:
        mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上面的脚本会生成两个进程,每个进程都会设置分布式环境,初始化进程组 (dist.init_process_group),最后执行给定的 run 函数。

让我们看看 init_process 函数。它确保每个进程都能通过一个主节点进行协调,使用相同的 IP 地址和端口。请注意,我们使用了 gloo 后端,但还有其他后端可用。(参见 第 5.1 节)我们将在本教程的最后介绍 dist.init_process_group 中发生的“魔法”,但它本质上允许进程通过共享它们的位置来相互通信。

点对点通信#

Send and Recv

发送和接收#

从一个进程到另一个进程的数据传输称为点对点通信。这些通信通过 sendrecv 函数或它们的即时对应函数 isendirecv 来实现。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,两个进程都以零张量开始,然后进程 0 递增张量并将其发送到进程 1,以便它们都得到 1.0。请注意,进程 1 需要分配内存来存储它将接收的数据。

另外请注意,send/recv阻塞的:两个进程都会阻塞直到通信完成。另一方面,即时函数是非阻塞的;脚本继续执行,并且这些方法在调用后返回一个 Work 对象,我们可以选择对其调用 wait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

在使用即时函数时,我们必须小心如何使用发送和接收的张量。由于我们不知道数据何时会传输到另一个进程,因此在 req.wait() 完成之前,我们不应修改发送的张量,也不应访问接收的张量。换句话说,

  • dist.isend() 之后向 tensor 写入将导致未定义行为。

  • req.wait() 执行之前,在 dist.irecv() 之后从 tensor 读取将导致未定义行为。

但是,在 req.wait() 执行之后,我们可以保证通信已发生,并且 tensor[0] 中存储的值是 1.0。

点对点通信在我们需要更精细地控制进程通信时很有用。它们可用于实现复杂的算法,例如 百度 DeepSpeechFacebook 的大规模实验中使用的算法。(参见 第 4.1 节

集体通信#

Scatter

Scatter#

Gather

Gather#

Reduce

Reduce#

All-Reduce

All-Reduce#

Broadcast

Broadcast#

All-Gather

All-Gather#

与点对点通信相反,集体通信允许在**组**中的所有进程之间进行通信。组是我们所有进程的子集。要创建组,我们可以将一个排名列表传递给 dist.new_group(group)。默认情况下,集体通信在所有进程上执行,也称为**世界**。例如,为了获得组中所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group) 集体。

""" All-Reduce example."""
def run(rank, size):
    """ Simple collective communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我们想要组中所有张量的总和,因此我们将 dist.ReduceOp.SUM 用作归约运算符。总的来说,任何可交换的数学运算都可以用作运算符。PyTorch 开箱即用地提供了许多此类运算符,它们都可以在元素级别上工作。

  • dist.ReduceOp.SUM,

  • dist.ReduceOp.PRODUCT,

  • dist.ReduceOp.MAX,

  • dist.ReduceOp.MIN,

  • dist.ReduceOp.BAND,

  • dist.ReduceOp.BOR,

  • dist.ReduceOp.BXOR,

  • dist.ReduceOp.PREMUL_SUM.

支持的运算符的完整列表在此处:这里

除了 dist.all_reduce(tensor, op, group) 之外,PyTorch 目前还实现了许多其他集体通信。以下是一些受支持的集体通信。

  • dist.broadcast(tensor, src, group):将 tensorsrc 复制到所有其他进程。

  • dist.reduce(tensor, dst, op, group):将 op 应用于每个 tensor,并将结果存储在 dst 中。

  • dist.all_reduce(tensor, op, group):与 reduce 相同,但结果存储在所有进程中。

  • dist.scatter(tensor, scatter_list, src, group):将 \(i^{\text{th}}\) 个张量 scatter_list[i] 复制到 \(i^{\text{th}}\) 个进程。

  • dist.gather(tensor, gather_list, dst, group):将来自所有进程的 tensor 复制到 dst

  • dist.all_gather(tensor_list, tensor, group):将所有进程的 tensor 复制到所有进程的 tensor_list 中。

  • dist.barrier(group):阻塞 group 中的所有进程,直到每个进程都进入此函数。

  • dist.all_to_all(output_tensor_list, input_tensor_list, group):将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。

支持的集体通信的完整列表可以在 PyTorch Distributed 的最新文档中找到((链接))。

分布式训练#

注意:您可以在 此 GitHub 存储库中找到本节的示例脚本。

现在我们已经了解了分布式模块的工作原理,让我们用它来编写一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。当然,这将是一个教学示例,在实际应用中,您应该使用上面链接的官方、经过充分测试和优化的高版本。

很简单,我们想实现一个分布式随机梯度下降版本。我们的脚本将允许所有进程计算其模型在数据批次上的梯度,然后对它们的梯度进行平均。为了确保在更改进程数量时获得相似的收敛结果,我们首先需要划分我们的数据集。(您也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()  # from random import Random
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

有了上面的代码片段,我们就可以使用以下几行简单地划分任何数据集了。

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 // size
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假设我们有 2 个副本,那么每个进程将拥有 60000 / 2 = 30000 个样本的 train_set。我们还将副本数量除以批次大小,以保持 128 的总体批次大小。

我们现在可以编写标准的向前-向后-优化训练代码,并添加一个函数调用来平均我们模型的梯度。(以下内容在很大程度上受到了官方 PyTorch MNIST 示例的启发。)

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

现在还剩下实现 average_gradients(model) 函数,它只是接收一个模型并对其在整个世界中的梯度进行平均。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

搞定!我们成功实现了分布式同步 SGD,并且可以在大型计算机集群上训练任何模型。

注意:虽然最后一句话是技术上正确的,但要实现生产级别的同步 SGD 实现,还需要更多技巧。再次强调,请使用经过测试和优化的

我们自己的环形 Allreduce#

作为额外的挑战,假设我们想实现 DeepSpeech 的高效环形 allreduce。使用点对点集体通信来实现这一点相当容易。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
   rank = dist.get_rank()
   size = dist.get_world_size()
   send_buff = send.clone()
   recv_buff = send.clone()
   accum = send.clone()

   left = ((rank - 1) + size) % size
   right = (rank + 1) % size

   for i in range(size - 1):
       if i % 2 == 0:
           # Send send_buff
           send_req = dist.isend(send_buff, right)
           dist.recv(recv_buff, left)
           accum[:] += recv_buff[:]
       else:
           # Send recv_buff
           send_req = dist.isend(recv_buff, right)
           dist.recv(send_buff, left)
           accum[:] += send_buff[:]
       send_req.wait()
   recv[:] = accum[:]

在上面的脚本中,allreduce(send, recv) 函数的签名与 PyTorch 中的略有不同。它接受一个 recv 张量,并将所有 send 张量的总和存储在该张量中。留给读者作为练习,我们的版本与 DeepSpeech 中的版本仍有一个区别:它们的实现将梯度张量分成,以便最优地利用通信带宽。(提示:torch.chunk

高级主题#

现在我们准备探索 torch.distributed 的一些更高级的功能。由于内容很多,本节分为两个子节:

  1. 通信后端:我们将了解如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。

  2. 初始化方法:我们将了解如何最好地设置 dist.init_process_group() 中的初始协调阶段。

通信后端#

torch.distributed 最优雅的方面之一是它抽象和构建在不同后端之上的能力。如前所述,PyTorch 中实现了多个后端。这些后端可以使用加速器 API轻松选择,该 API 提供了与不同加速器类型配合使用的接口。一些最流行的后端是 Gloo、NCCL 和 MPI。它们各自具有不同的规范和权衡,具体取决于所需的用例。支持函数的比较表可以在这里找到。

Gloo 后端

到目前为止,我们已经广泛使用了 Gloo 后端。它作为开发平台非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且可以在 Linux(自 0.2 起)和 macOS(自 1.3 起)上运行。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。CUDA 张量的集体操作的实现不如 NCCL 后端提供的那么优化。

您肯定已经注意到,如果我们把 model 放到 GPU 上,我们的分布式 SGD 示例将无法工作。为了使用多个 GPU,让我们也进行以下修改:

  1. 使用加速器 API device_type = torch.accelerator.current_accelerator()

  2. 使用 torch.device(f"{device_type}:{rank}")

  3. model = Net() \(\rightarrow\) model = Net().to(device)

  4. 使用 data, target = data.to(device), target.to(device)

通过这些修改,您的模型现在将在两个 GPU 上进行训练。如果您在 NVIDIA 硬件上运行,可以使用 watch nvidia-smi 来监控 GPU 利用率。

MPI 后端

消息传递接口 (MPI) 是高性能计算领域的一种标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed API 的主要灵感来源。MPI 有多种实现(例如 Open-MPIMVAPICH2Intel MPI),每种实现都针对不同的目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上的广泛可用性和高水平优化。 一些 最近 实现还能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。

不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,我们必须手动重新编译它。幸运的是,这个过程相当简单,因为在编译时,PyTorch 会自行查找可用的 MPI 实现。以下步骤通过从源安装 PyTorch 来安装 MPI 后端(from source)。

  1. 创建并激活您的 Anaconda 环境,按照指南安装所有先决条件,但不要运行 python setup.py install

  2. 选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA 感知 MPI 可能需要一些额外的步骤。在本例中,我们将坚持使用 Open-MPI不带GPU 支持:conda install -c conda-forge openmpi

  3. 现在,转到您克隆的 PyTorch 仓库并执行 python setup.py install

为了测试我们新安装的后端,需要进行一些修改。

  1. if __name__ == '__main__': 下的内容替换为 init_process(0, 0, run, backend='mpi')

  2. 运行 mpirun -n 4 python myscript.py

这些更改的原因是 MPI 需要在生成进程之前创建自己的环境。MPI 还会生成自己的进程并执行初始化方法中描述的握手,从而使 init_process_groupranksize 参数变得多余。这实际上非常强大,因为您可以将其他参数传递给 mpirun 以定制每个进程的计算资源。(例如每个进程的核心数、手动分配机器给特定排名,以及更多信息)执行此操作后,您应该会获得与使用其他通信后端相同的熟悉输出。

NCCL 后端

NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。如果您仅将 CUDA 张量用于集体操作,请考虑使用此后端以获得最佳性能。NCCL 后端包含在支持 CUDA 的预构建二进制文件中。

XCCL 后端

XCCL 后端为 XPU 张量提供了集体操作的优化实现。如果您的工作负载仅将 XPU 张量用于集体操作,此后端可提供一流的性能。XCCL 后端包含在支持 XPU 的预构建二进制文件中。

初始化方法#

为了结束本教程,让我们来审视一下我们之前调用的初始函数:dist.init_process_group(backend, init_method)。具体来说,我们将讨论负责每个进程之间初步协调步骤的各种初始化方法。这些方法使您能够定义如何完成此协调。

初始化方法的选择取决于您的硬件设置,一种方法可能比其他方法更合适。除了以下各节,请参阅官方文档以获取更多信息。

环境变量

在本教程中,我们一直使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程都将能够正确连接到主节点,获取有关其他进程的信息,并最终与它们进行握手。

  • MASTER_PORT:将在主机节点 0 排名进程的机器上使用的空闲端口。

  • MASTER_ADDR:将主机节点 0 排名进程的机器的 IP 地址。

  • WORLD_SIZE:总进程数,以便主节点知道需要等待多少工作进程。

  • RANK:每个进程的排名,以便它们知道自己是主节点还是工作节点。

共享文件系统

共享文件系统要求所有进程能够访问共享文件系统,并将通过共享文件进行协调。这意味着每个进程都会打开文件,写入其信息,并等待直到所有进程都这样做。之后,所有必需的信息都将可供所有进程使用。为了避免竞争条件,文件系统必须支持通过 fcntl 进行锁定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

可以通过提供排名为 0 的进程的 IP 地址和一个可达的端口号来实现通过 TCP 进行初始化。在这里,所有工作进程都将能够连接到排名为 0 的进程,并交换有关如何相互访问的信息。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

致谢

我要感谢 PyTorch 开发者在实现、文档和测试方面所做的出色工作。当代码不清晰时,我总能依靠文档测试来找到答案。特别是,我要感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 为早期草稿提供了富有洞察力的评论和解答。