多进程包 - torch.multiprocessing#
创建时间:2016 年 12 月 23 日 | 最后更新时间:2025 年 6 月 8 日
torch.multiprocessing 是对原生 multiprocessing
模块的封装。
它注册了自定义的 reducers,这些 reducers 使用共享内存来提供不同进程中同一数据的共享视图。一旦 tensor/storage 被移动到 shared_memory(参见 share_memory_()
),就可以在不进行任何复制的情况下将其发送到其他进程。
该 API 与原始模块 100% 兼容——只需将 import multiprocessing
改为 import torch.multiprocessing
,就可以让所有通过队列发送或通过其他机制共享的 tensors,都移动到共享内存中。
由于 API 的相似性,我们不对该包的大部分内容进行文档记录,并建议参考原始模块的非常好的文档。
警告
如果主进程异常退出(例如,由于接收到信号),Python 的 multiprocessing
有时会无法清理其子进程。这是一个已知的注意事项,因此如果您在中断解释器后看到任何资源泄漏,则可能意味着您遇到了这种情况。
策略管理#
- torch.multiprocessing.set_sharing_strategy(new_strategy)[source]#
设置共享 CPU tensors 的策略。
- 参数
new_strategy (str) – 所选策略的名称。应为
get_all_sharing_strategies()
返回的值之一。
共享 CUDA tensors#
CUDA tensors 在进程间共享仅在 Python 3 中支持,使用 spawn
或 forkserver
启动方法。
与 CPU tensors 不同,发送进程需要保留原始 tensor,直到接收进程保留了 tensor 的副本。引用计数是在底层实现的,但要求用户遵循以下最佳实践。
警告
如果消费者进程因致命信号而异常死亡,则只要发送进程仍在运行,共享 tensor 就可能被永久保留在内存中。
在消费者中尽快释放内存。
## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)
保持 producer 进程运行,直到所有 consumer 退出。这将防止 producer 进程释放仍被 consumer 使用的内存。
## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
不要传递接收到的 tensors。
# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()
共享策略#
本节简要概述了不同共享策略的工作原理。请注意,这仅适用于 CPU tensors - CUDA tensors 将始终使用 CUDA API,因为这是它们可以共享的唯一方式。
文件描述符 - file_descriptor
#
注意
这是默认策略(macOS 和 OS X 除外,因为不支持)。
此策略将使用文件描述符作为共享内存句柄。每当一个 storage 被移动到共享内存时,从 shm_open
获取的文件描述符将与对象一起缓存,当它要被发送到其他进程时,该文件描述符将被传输(例如,通过 UNIX 套接字)给它们。接收者也将缓存文件描述符并对其进行 mmap
,以获得存储数据上的共享视图。
请注意,如果共享的 tensor 很多,此策略将大部分时间打开大量文件描述符。如果您的系统对打开的文件描述符数量有限制,且您无法提高这些限制,则应使用 file_system
策略。
文件系统 - file_system
#
此策略将使用传递给 shm_open
的文件名来标识共享内存区域。这样做的好处是无需实现缓存从 shm_open
获取的文件描述符,但同时容易导致共享内存泄漏。文件创建后不能立即删除,因为其他进程需要访问它来打开它们的视图。如果进程意外崩溃或被杀死,并且不调用 storage 析构函数,文件将保留在系统中。这非常严重,因为它们会持续占用内存,直到系统重启或手动释放它们。
为了解决共享内存文件泄漏的问题,torch.multiprocessing
将会启动一个名为 torch_shm_manager
的守护进程,它会与当前进程组隔离,并跟踪所有共享内存的分配。一旦所有连接到它的进程退出,它会等待片刻以确保没有新的连接,然后遍历该组分配的所有共享内存文件。如果发现其中任何一个仍然存在,它们将被解除分配。我们已经测试过此方法,并且它被证明对各种故障都具有鲁棒性。尽管如此,如果您的系统有足够高的限制,并且 file_descriptor
是支持的策略,我们不建议切换到此策略。
启动子进程#
注意
适用于 Python >= 3.4。
这依赖于 Python multiprocessing
包中的 spawn
启动方法。
通过创建 Process
实例并调用 join
来等待它们完成,可以启动多个子进程来执行某些函数。当处理单个子进程时,这种方法工作得很好,但在处理多个进程时会出现潜在问题。
具体来说,顺序 join 进程意味着它们将顺序终止。如果它们不这样做,第一个进程不终止,那么进程的终止将不被察觉。此外,没有原生的机制来传播错误。
下面的 spawn
函数解决了这些问题,并负责错误传播、乱序终止,以及在检测到一个进程出错时主动终止其他进程。
- torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[source]#
启动
nprocs
个进程来运行fn
和args
。如果其中一个进程以非零退出状态退出,则其余进程将被杀死,并引发一个异常,其中包含导致终止的原因。如果子进程中捕获到异常,则该异常将被转发,并且其 traceback 将包含在父进程中引发的异常中。
- 参数
fn (function) –
函数被调用为已启动进程的入口点。此函数必须定义在模块的顶层,以便能够进行 pickling 和启动。这是 multiprocessing 强制的要求。
该函数调用方式为
fn(i, *args)
,其中i
是进程索引,args
是传递的参数元组。args (tuple) – 传递给
fn
的参数。nprocs (int) – 要启动的进程数。
join (bool) – 对所有进程执行阻塞 join。
daemon (bool) – 已启动进程的 daemon 标志。如果设置为 True,将创建守护进程。
start_method (str) – (已弃用) 此方法将始终使用
spawn
作为启动方法。要使用不同的启动方法,请使用start_processes()
。
- 返回
如果
join
为True
,则为 None,如果join
为False
,则为ProcessContext
- class torch.multiprocessing.SpawnContext[source]#
当使用
join=False
调用spawn()
时返回。