多进程最佳实践#
创建于:2017年1月16日 | 最后更新于:2025年6月18日
torch.multiprocessing 是 Python 的 multiprocessing 模块的直接替代品。它支持完全相同的操作,但对其进行了扩展,以便通过 multiprocessing.Queue 发送的所有张量,其数据都将移动到共享内存中,并且仅发送一个句柄到另一个进程。
注意
当 Tensor 被发送到另一个进程时,Tensor 数据是共享的。如果 torch.Tensor.grad 不为 None,它也会被共享。在没有 torch.Tensor.grad 字段的 Tensor 发送到另一个进程后,它会创建一个标准的进程特定的 .grad Tensor,该张量不会像 Tensor 的数据那样自动跨所有进程共享。
这允许实现各种训练方法,如 Hogwild、A3C 或任何需要异步操作的其他方法。
多进程中的毒叉#
在使用 加速器 进行多进程处理时,可能会发生一个已知问题,称为“毒叉”。当加速器的运行时不安全且在进程分叉之前被初始化时,就会发生这种情况,导致子进程中出现运行时错误。
- 为了防止此类错误
避免在分叉子进程之前在主进程中初始化加速器。
使用替代的进程启动方法,例如
spawn或forkserver,这可确保对每个进程进行干净的初始化。
多进程中的 CUDA#
当使用 fork 启动方法时,CUDA 运行时具有在 多进程中的毒叉 中描述的限制;要在子进程中使用 CUDA,需要 spawn 或 forkserver 启动方法。
注意
可以通过使用 multiprocessing.get_context(...) 创建上下文,或直接使用 multiprocessing.set_start_method(...) 来设置启动方法。
与 CPU 张量不同,发送进程只要接收进程保留张量的副本,就必须保留原始张量。这在底层实现,但需要用户遵循最佳实践才能正确运行程序。例如,发送进程必须在消费者进程引用张量的时间内保持存活,并且引用计数无法在消费者进程通过致命信号异常退出时拯救你。请参阅 本节。
另请参阅:使用 nn.parallel.DistributedDataParallel 代替多进程或 nn.DataParallel
最佳实践和技巧#
避免和解决死锁#
当生成新进程时,可能会出现很多问题,死锁的最常见原因是后台线程。如果任何线程持有锁或导入模块,并且调用了 fork,那么子进程很可能处于损坏状态,并且会发生死锁或以其他方式失败。请注意,即使你没有这样做,Python 内置库也会这样做——无需进一步查看 multiprocessing。 multiprocessing.Queue 实际上是一个非常复杂的类,它生成多个线程,用于序列化、发送和接收对象,它们也可能导致上述问题。如果你发现自己处于这种情况下,请尝试使用 SimpleQueue,它不使用任何额外的线程。
我们正在尽最大努力使它对你来说易于使用,并确保这些死锁不会发生,但有些事情不在我们的控制范围之内。如果你遇到任何你无法解决的问题,请尝试在论坛上寻求帮助,我们将看看是否可以修复该问题。
重用通过队列传递的缓冲区#
请记住,每次将 Tensor 放入 multiprocessing.Queue 时,都必须将其移动到共享内存中。如果它已经共享,则无需操作,否则将产生额外的内存复制,这可能会减慢整个过程。即使你有一个进程池将数据发送到单个进程,也要让它将缓冲区发送回来——这几乎是免费的,并且可以让你避免在发送下一个批次时进行复制。
异步多进程训练(例如 Hogwild)#
使用 torch.multiprocessing,可以异步训练模型,参数可以一直共享,也可以定期同步。在第一种情况下,我们建议发送整个模型对象,而在后一种情况下,我们建议仅发送 state_dict()。
我们建议使用 multiprocessing.Queue 在进程之间传递所有类型的 PyTorch 对象。使用 fork 启动方法时,有可能继承已经位于共享内存中的张量和存储,但是它很容易出错,应该谨慎使用,并且仅供高级用户使用。队列,即使有时是一种不太优雅的解决方案,也将在所有情况下正常工作。
警告
你应该小心处理没有用 if __name__ == '__main__' 保护的全局语句。如果使用 fork 以外的其他启动方法,它们将在所有子进程中执行。
Hogwild#
可以在 示例仓库 中找到具体的 Hogwild 实现,但为了展示代码的整体结构,下面也提供了一个最小的示例
import torch.multiprocessing as mp
from model import MyModel
def train(model):
# Construct data_loader, optimizer, etc.
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() # This will update the shared parameters
if __name__ == '__main__':
num_processes = 4
model = MyModel()
# NOTE: this is required for the ``fork`` method to work
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()
多进程中的 CPU#
不适当的多进程处理可能导致 CPU 超订阅,导致不同的进程争夺 CPU 资源,从而导致效率低下。
本教程将解释什么是 CPU 超订阅以及如何避免它。
CPU 超订阅#
CPU 超订阅是一个技术术语,指的是分配给系统的 vCPU 总数超过硬件上可用的 vCPU 总数的情况。
这会导致 CPU 资源的严重争用。在这种情况下,进程之间频繁切换,增加了进程切换开销并降低了整体系统效率。
请参阅 Hogwild 实现中 CPU 超订阅的代码示例,该实现位于 示例仓库 中。
使用以下命令在 CPU 上使用 4 个进程运行训练示例时
python main.py --num-processes 4
假设机器上有 N 个 vCPU 可用,执行上述命令将生成 4 个子进程。每个子进程将为自己分配 N 个 vCPU,从而需要 4*N 个 vCPU。但是,机器仅有 N 个 vCPU 可用。因此,不同的进程将争夺资源,导致频繁的进程切换。
以下观察结果表明存在 CPU 超订阅
高 CPU 利用率:使用
htop命令,你可以观察到 CPU 利用率始终很高,通常达到或超过其最大容量。这表明对 CPU 资源的需求超过了可用的物理核心,导致争用和竞争。频繁的上下文切换和低系统效率:在 CPU 超订阅的情况下,进程争夺 CPU 时间,操作系统需要快速在不同的进程之间切换以公平地分配资源。这种频繁的上下文切换会增加开销并降低整体系统效率。
避免 CPU 超订阅#
避免 CPU 超订阅的一个好方法是适当的资源分配。确保并发运行的进程或线程数量不超过可用的 CPU 资源。
在这种情况下,一种解决方案是在子进程中指定适当数量的线程。可以通过在子进程中使用 torch.set_num_threads(int) 函数来设置每个进程的线程数来实现。
假设机器上有 N 个 vCPU,并且将生成 M 个进程,则每个进程使用的最大 num_threads 值将为 floor(N/M)。为了避免在 mnist_hogwild 示例中出现 CPU 超订阅,需要对 示例仓库 中的 train.py 文件进行以下更改。
def train(rank, args, model, device, dataset, dataloader_kwargs):
torch.manual_seed(args.seed + rank)
#### define the num threads used in current sub-processes
torch.set_num_threads(floor(N/M))
train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train_epoch(epoch, args, model, device, train_loader, optimizer)
使用 torch.set_num_threads(floor(N/M)) 设置每个进程的 num_thread。其中,您需要将 N 替换为可用的 vCPU 数量,将 M 替换为选择的进程数量。 适当的 num_thread 值会根据具体任务而有所不同。但是,作为一般指导原则,num_thread 的最大值应为 floor(N/M),以避免 CPU 超订阅。在 mnist_hogwild 训练示例中,避免 CPU 超订阅后,您可以获得 30 倍的性能提升。