使用 PyTorch 编写分布式应用程序#
创建于: 2017 年 10 月 06 日 | 最后更新: 2025 年 09 月 05 日 | 最后验证: 2024 年 11 月 05 日
作者: Séb Arnold
注意
在 GitHub 上查看和编辑此教程。
先决条件
在本简短教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,以及深入了解该包的一些内部机制。
设置#
PyTorch 中包含的分布式包(即 torch.distributed)使研究人员和从业者能够轻松地跨进程和机器集群并行化计算。为此,它利用消息传递语义,允许每个进程将数据传输到其他任何进程。与多进程 (torch.multiprocessing) 包不同,进程可以使用不同的通信后端,并且不限于在同一台机器上执行。
要开始,我们需要能够同时运行多个进程。如果您有计算集群的访问权限,请咨询您的本地系统管理员或使用您喜欢的协调工具(例如 pdsh、clustershell 或 slurm)。本教程将使用单机,并通过以下模板生成多个进程。
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上面的脚本会生成两个进程,每个进程都会设置分布式环境,初始化进程组 (dist.init_process_group),最后执行给定的 run 函数。
让我们看看 init_process 函数。它确保每个进程都能通过一个主节点进行协调,使用相同的 IP 地址和端口。请注意,我们使用了 gloo 后端,但还有其他后端可用。(参见 第 5.1 节)我们将在本教程的最后介绍 dist.init_process_group 中发生的“魔法”,但它本质上允许进程通过共享它们的位置来相互通信。
点对点通信#
发送和接收#
从一个进程到另一个进程的数据传输称为点对点通信。这些通信通过 send 和 recv 函数或它们的即时对应函数 isend 和 irecv 来实现。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的示例中,两个进程都以零张量开始,然后进程 0 递增张量并将其发送到进程 1,以便它们都得到 1.0。请注意,进程 1 需要分配内存来存储它将接收的数据。
另外请注意,send/recv 是阻塞的:两个进程都会阻塞直到通信完成。另一方面,即时函数是非阻塞的;脚本继续执行,并且这些方法在调用后返回一个 Work 对象,我们可以选择对其调用 wait()。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
在使用即时函数时,我们必须小心如何使用发送和接收的张量。由于我们不知道数据何时会传输到另一个进程,因此在 req.wait() 完成之前,我们不应修改发送的张量,也不应访问接收的张量。换句话说,
在
dist.isend()之后向tensor写入将导致未定义行为。在
req.wait()执行之前,在dist.irecv()之后从tensor读取将导致未定义行为。
但是,在 req.wait() 执行之后,我们可以保证通信已发生,并且 tensor[0] 中存储的值是 1.0。
点对点通信在我们需要更精细地控制进程通信时很有用。它们可用于实现复杂的算法,例如 百度 DeepSpeech 或 Facebook 的大规模实验中使用的算法。(参见 第 4.1 节)
集体通信#
与点对点通信相反,集体通信允许在**组**中的所有进程之间进行通信。组是我们所有进程的子集。要创建组,我们可以将一个排名列表传递给 dist.new_group(group)。默认情况下,集体通信在所有进程上执行,也称为**世界**。例如,为了获得组中所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group) 集体。
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们想要组中所有张量的总和,因此我们将 dist.ReduceOp.SUM 用作归约运算符。总的来说,任何可交换的数学运算都可以用作运算符。PyTorch 开箱即用地提供了许多此类运算符,它们都可以在元素级别上工作。
dist.ReduceOp.SUM,dist.ReduceOp.PRODUCT,dist.ReduceOp.MAX,dist.ReduceOp.MIN,dist.ReduceOp.BAND,dist.ReduceOp.BOR,dist.ReduceOp.BXOR,dist.ReduceOp.PREMUL_SUM.
支持的运算符的完整列表在此处:这里。
除了 dist.all_reduce(tensor, op, group) 之外,PyTorch 目前还实现了许多其他集体通信。以下是一些受支持的集体通信。
dist.broadcast(tensor, src, group):将tensor从src复制到所有其他进程。dist.reduce(tensor, dst, op, group):将op应用于每个tensor,并将结果存储在dst中。dist.all_reduce(tensor, op, group):与 reduce 相同,但结果存储在所有进程中。dist.scatter(tensor, scatter_list, src, group):将 \(i^{\text{th}}\) 个张量scatter_list[i]复制到 \(i^{\text{th}}\) 个进程。dist.gather(tensor, gather_list, dst, group):将来自所有进程的tensor复制到dst。dist.all_gather(tensor_list, tensor, group):将所有进程的tensor复制到所有进程的tensor_list中。dist.barrier(group):阻塞 group 中的所有进程,直到每个进程都进入此函数。dist.all_to_all(output_tensor_list, input_tensor_list, group):将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。
支持的集体通信的完整列表可以在 PyTorch Distributed 的最新文档中找到((链接))。
分布式训练#
注意:您可以在 此 GitHub 存储库中找到本节的示例脚本。
现在我们已经了解了分布式模块的工作原理,让我们用它来编写一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。当然,这将是一个教学示例,在实际应用中,您应该使用上面链接的官方、经过充分测试和优化的高版本。
很简单,我们想实现一个分布式随机梯度下降版本。我们的脚本将允许所有进程计算其模型在数据批次上的梯度,然后对它们的梯度进行平均。为了确保在更改进程数量时获得相似的收敛结果,我们首先需要划分我们的数据集。(您也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
有了上面的代码片段,我们就可以使用以下几行简单地划分任何数据集了。
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有 2 个副本,那么每个进程将拥有 60000 / 2 = 30000 个样本的 train_set。我们还将副本数量除以批次大小,以保持 128 的总体批次大小。
我们现在可以编写标准的向前-向后-优化训练代码,并添加一个函数调用来平均我们模型的梯度。(以下内容在很大程度上受到了官方 PyTorch MNIST 示例的启发。)
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
现在还剩下实现 average_gradients(model) 函数,它只是接收一个模型并对其在整个世界中的梯度进行平均。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
搞定!我们成功实现了分布式同步 SGD,并且可以在大型计算机集群上训练任何模型。
注意:虽然最后一句话是技术上正确的,但要实现生产级别的同步 SGD 实现,还需要更多技巧。再次强调,请使用经过测试和优化的。
我们自己的环形 Allreduce#
作为额外的挑战,假设我们想实现 DeepSpeech 的高效环形 allreduce。使用点对点集体通信来实现这一点相当容易。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上面的脚本中,allreduce(send, recv) 函数的签名与 PyTorch 中的略有不同。它接受一个 recv 张量,并将所有 send 张量的总和存储在该张量中。留给读者作为练习,我们的版本与 DeepSpeech 中的版本仍有一个区别:它们的实现将梯度张量分成块,以便最优地利用通信带宽。(提示:torch.chunk)
高级主题#
现在我们准备探索 torch.distributed 的一些更高级的功能。由于内容很多,本节分为两个子节:
通信后端:我们将了解如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。
初始化方法:我们将了解如何最好地设置
dist.init_process_group()中的初始协调阶段。
通信后端#
torch.distributed 最优雅的方面之一是它抽象和构建在不同后端之上的能力。如前所述,PyTorch 中实现了多个后端。这些后端可以使用加速器 API轻松选择,该 API 提供了与不同加速器类型配合使用的接口。一些最流行的后端是 Gloo、NCCL 和 MPI。它们各自具有不同的规范和权衡,具体取决于所需的用例。支持函数的比较表可以在这里找到。
Gloo 后端
到目前为止,我们已经广泛使用了 Gloo 后端。它作为开发平台非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且可以在 Linux(自 0.2 起)和 macOS(自 1.3 起)上运行。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。CUDA 张量的集体操作的实现不如 NCCL 后端提供的那么优化。
您肯定已经注意到,如果我们把 model 放到 GPU 上,我们的分布式 SGD 示例将无法工作。为了使用多个 GPU,让我们也进行以下修改:
使用加速器 API
device_type = torch.accelerator.current_accelerator()使用
torch.device(f"{device_type}:{rank}")model = Net()\(\rightarrow\)model = Net().to(device)使用
data, target = data.to(device), target.to(device)
通过这些修改,您的模型现在将在两个 GPU 上进行训练。如果您在 NVIDIA 硬件上运行,可以使用 watch nvidia-smi 来监控 GPU 利用率。
MPI 后端
消息传递接口 (MPI) 是高性能计算领域的一种标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed API 的主要灵感来源。MPI 有多种实现(例如 Open-MPI、MVAPICH2、Intel MPI),每种实现都针对不同的目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上的广泛可用性和高水平优化。 一些 最近 实现还能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。
不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,我们必须手动重新编译它。幸运的是,这个过程相当简单,因为在编译时,PyTorch 会自行查找可用的 MPI 实现。以下步骤通过从源安装 PyTorch 来安装 MPI 后端(from source)。
创建并激活您的 Anaconda 环境,按照指南安装所有先决条件,但不要运行
python setup.py install。选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA 感知 MPI 可能需要一些额外的步骤。在本例中,我们将坚持使用 Open-MPI不带GPU 支持:
conda install -c conda-forge openmpi现在,转到您克隆的 PyTorch 仓库并执行
python setup.py install。
为了测试我们新安装的后端,需要进行一些修改。
将
if __name__ == '__main__':下的内容替换为init_process(0, 0, run, backend='mpi')。运行
mpirun -n 4 python myscript.py。
这些更改的原因是 MPI 需要在生成进程之前创建自己的环境。MPI 还会生成自己的进程并执行初始化方法中描述的握手,从而使 init_process_group 的 rank 和 size 参数变得多余。这实际上非常强大,因为您可以将其他参数传递给 mpirun 以定制每个进程的计算资源。(例如每个进程的核心数、手动分配机器给特定排名,以及更多信息)执行此操作后,您应该会获得与使用其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。如果您仅将 CUDA 张量用于集体操作,请考虑使用此后端以获得最佳性能。NCCL 后端包含在支持 CUDA 的预构建二进制文件中。
XCCL 后端
XCCL 后端为 XPU 张量提供了集体操作的优化实现。如果您的工作负载仅将 XPU 张量用于集体操作,此后端可提供一流的性能。XCCL 后端包含在支持 XPU 的预构建二进制文件中。
初始化方法#
为了结束本教程,让我们来审视一下我们之前调用的初始函数:dist.init_process_group(backend, init_method)。具体来说,我们将讨论负责每个进程之间初步协调步骤的各种初始化方法。这些方法使您能够定义如何完成此协调。
初始化方法的选择取决于您的硬件设置,一种方法可能比其他方法更合适。除了以下各节,请参阅官方文档以获取更多信息。
环境变量
在本教程中,我们一直使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程都将能够正确连接到主节点,获取有关其他进程的信息,并最终与它们进行握手。
MASTER_PORT:将在主机节点 0 排名进程的机器上使用的空闲端口。MASTER_ADDR:将主机节点 0 排名进程的机器的 IP 地址。WORLD_SIZE:总进程数,以便主节点知道需要等待多少工作进程。RANK:每个进程的排名,以便它们知道自己是主节点还是工作节点。
共享文件系统
共享文件系统要求所有进程能够访问共享文件系统,并将通过共享文件进行协调。这意味着每个进程都会打开文件,写入其信息,并等待直到所有进程都这样做。之后,所有必需的信息都将可供所有进程使用。为了避免竞争条件,文件系统必须支持通过 fcntl 进行锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
可以通过提供排名为 0 的进程的 IP 地址和一个可达的端口号来实现通过 TCP 进行初始化。在这里,所有工作进程都将能够连接到排名为 0 的进程,并交换有关如何相互访问的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
致谢
我要感谢 PyTorch 开发者在实现、文档和测试方面所做的出色工作。当代码不清晰时,我总能依靠文档或测试来找到答案。特别是,我要感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 为早期草稿提供了富有洞察力的评论和解答。





