评价此页

多进程包 - torch.multiprocessing#

创建日期:2016 年 12 月 23 日 | 最后更新日期:2025 年 6 月 8 日

torch.multiprocessing 是对原生 multiprocessing 模块的封装。

它注册了自定义的 reducers,这些 reducers 使用共享内存,在不同进程中提供对同一数据的共享视图。一旦 tensor/storage 被移动到 shared_memory(参见 share_memory_()),就可以在不进行任何复制的情况下将其发送到其他进程。

API 与原始模块 100% 兼容 - 只需将 import multiprocessing 改为 import torch.multiprocessing,就可以让所有通过队列发送或通过其他机制共享的 tensor 被移动到共享内存中。

由于 API 的相似性,我们不对该包的大部分内容进行文档记录,并建议参考原始模块非常优秀的文档。

警告

如果主进程异常退出(例如,由于收到信号),Python 的 multiprocessing 有时会无法清理其子进程。这是一个已知的注意事项,因此如果您在中断解释器后看到任何资源泄漏,这可能意味着您刚刚遇到了这种情况。

策略管理#

torch.multiprocessing.get_all_sharing_strategies()[源]#

返回当前系统支持的共享策略集合。

torch.multiprocessing.get_sharing_strategy()[源]#

返回当前 CPU tensor 的共享策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[源]#

设置 CPU tensor 的共享策略。

参数

new_strategy (str) – 所选策略的名称。应为 get_all_sharing_strategies() 返回的值之一。

共享 CUDA tensor#

仅在 Python 3 中支持在进程之间共享 CUDA tensor,使用 spawnforkserver 启动方法。

与 CPU tensor 不同,发送进程需要一直保留原始 tensor,直到接收进程保留该 tensor 的副本。引用计数在后台实现,但要求用户遵循以下最佳实践。

警告

如果消费者进程因致命信号而异常死亡,则只要发送进程仍在运行,共享 tensor 就可能被永久保留在内存中。

  1. 在消费者中尽快释放内存。

## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)
  1. 保持生产者进程运行,直到所有消费者退出。这将防止生产者进程释放仍被消费者使用的内存。

## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
  1. 不要传递接收到的 tensor。

# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()

共享策略#

本节简要概述了不同共享策略的工作原理。请注意,这仅适用于 CPU tensor - CUDA tensor 将始终使用 CUDA API,因为这是它们可以共享的唯一方式。

文件描述符 - file_descriptor#

注意

这是默认策略(macOS 和 OS X 除外,因为不支持)。

此策略将使用文件描述符作为共享内存句柄。每当一个 storage 被移动到共享内存时,从 shm_open 获取的文件描述符会与对象一起缓存,当它将被发送到其他进程时,文件描述符将被传输(例如,通过 UNIX 套接字)给它。接收方也会缓存文件描述符并 mmap 它,以获得对 storage 数据的共享视图。

请注意,如果共享的 tensor 很多,此策略将大部分时间保持大量打开的文件描述符。如果您的系统对打开文件描述符的数量有限制,并且您无法提高这些限制,则应使用 file_system 策略。

文件系统 - file_system#

此策略将使用传递给 shm_open 的文件名来标识共享内存区域。这样做的好处是不需要实现来缓存从它获取的文件描述符,但同时容易导致共享内存泄漏。文件创建后不能立即删除,因为其他进程需要访问它才能打开它们的视图。如果进程因致命错误而崩溃,或被终止且未调用 storage 析构函数,则文件将保留在系统中。这非常严重,因为它们会一直占用内存,直到系统重启或手动释放。

为了应对共享内存文件泄漏的问题,torch.multiprocessing 会启动一个名为 torch_shm_manager 的守护进程,它会与当前进程组隔离,并跟踪所有共享内存分配。一旦所有连接到它的进程退出,它会等待片刻以确保没有新的连接,然后遍历该组分配的所有共享内存文件。如果它发现其中任何一个仍然存在,它们将被解除分配。我们已经测试过这种方法,它在各种故障情况下都表现出了鲁棒性。尽管如此,如果您的系统有足够高的限制,并且 file_descriptor 是支持的策略,我们不建议切换到此策略。

生成子进程#

注意

适用于 Python >= 3.4。

这依赖于 Python multiprocessing 包中的 spawn 启动方法。

通过创建 Process 实例并调用 join 来等待它们完成,可以生成指定数量的子进程来执行某个函数。这种方法在处理单个子进程时效果很好,但在处理多个进程时会带来潜在的问题。

特别是,顺序 joining 进程意味着它们将顺序终止。如果它们不这样做,并且第一个进程不终止,那么进程的终止将不会被注意到。此外,没有原生的错误传播机制。

下面的 spawn 函数解决了这些问题,并处理了错误传播、乱序终止,并且在检测到其中一个进程出错时会主动终止进程。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[源]#

生成 nprocs 个进程,这些进程以 args 运行 fn

如果其中一个进程以非零退出状态退出,则剩余的进程将被杀死,并抛出一个异常,说明终止的原因。如果子进程中捕获到异常,则该异常将被转发,其回溯将包含在父进程中抛出的异常中。

参数
  • fn (function) –

    函数作为生成进程的入口点被调用。此函数必须定义在模块的顶层,以便可以对其进行 pickling 和 spawning。这是 multiprocessing 提出的要求。

    该函数被调用为 fn(i, *args),其中 i 是进程索引,args 是通过的参数元组。

  • args (tuple) – 传递给 fn 的参数。

  • nprocs (int) – 要生成的进程数。

  • join (bool) – 对所有进程执行阻塞 join。

  • daemon (bool) – 生成的进程的守护进程标志。如果设置为 True,将创建守护进程。

  • start_method (str) – (已弃用) 此方法将始终使用 spawn 作为启动方法。要使用不同的启动方法,请使用 start_processes()

返回

如果 joinTrue,则为 None;如果 joinFalse,则为 ProcessContext

class torch.multiprocessing.SpawnContext[源]#

当调用 spawn() 时(使用 join=False),返回此对象。

join(timeout=None, grace_period=None)[源]#

Join spawn context中的一个或多个进程。

尝试 join 此 spawn context 中的一个或多个进程。如果其中一个进程以非零退出状态退出,此函数将(可选地带有一个宽限期)终止剩余进程,并抛出一个异常,说明第一个进程退出的原因。

如果所有进程都成功 join,则返回 True;如果还有更多进程需要 join,则返回 False

参数
  • timeout (float) – 在放弃等待之前,等待这么长时间(以秒为单位)。

  • grace_period (float) – 当任何进程失败时,等待这么长时间(以秒为单位)让其他进程优雅关闭,然后才终止它们。如果它们仍然不退出,则在杀死它们之前再等待一个宽限期。