Multiprocessing#
创建于: 2021年5月4日 | 最后更新于: 2024年2月29日
用于启动和管理n
个副本的worker子进程的库,这些子进程可以通过函数或二进制文件指定。
对于函数,它使用torch.multiprocessing
(因此也使用python的multiprocessing
)来生成/fork worker进程。对于二进制文件,它使用python的subprocessing.Popen
来创建worker进程。
用法1:将两个训练器作为函数启动
from torch.distributed.elastic.multiprocessing import Std, start_processes
def trainer(a, b, c):
pass # train
# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
name="trainer",
entrypoint=trainer,
args={0: (1, 2, 3), 1: (4, 5, 6)},
envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
log_dir="/tmp/foobar",
redirects=Std.ALL, # write all worker stdout/stderr to a log file
tee={0: Std.ERR}, # tee only local rank 0's stderr to console
)
# waits for all copies of trainer to finish
ctx.wait()
用法2:将2个echo worker作为二进制文件启动
# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
name="echo"
entrypoint="echo",
log_dir="/tmp/foobar",
args={0: "hello", 1: "world"},
redirects={1: Std.OUT},
)
与torch.multiprocessing
类似,函数start_processes()
的返回值是一个进程上下文(api.PContext
)。如果启动了一个函数,则返回一个api.MultiprocessContext
,如果启动了一个二进制文件,则返回一个api.SubprocessContext
。两者都是父类api.PContext
类的具体实现。
启动多个Worker#
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[source]#
使用提供的选项启动
entrypoint
进程的n
个副本。entrypoint
是Callable
(函数)或str
(二进制文件)。副本的数量由args
和envs
参数的条目数量决定,这些参数需要有相同的键集。args
和env
参数是传递给入口点的参数和环境变量,按副本索引(本地秩)映射。所有本地秩都必须被计算在内。也就是说,键集应该是{0,1,...,(nprocs-1)}
。注意
当
entrypoint
是二进制文件(str
)时,args
只能是字符串。如果提供了任何其他类型,则会将其转换为字符串表示形式(例如str(arg1)
)。此外,只有当主函数被注释为torch.distributed.elastic.multiprocessing.errors.record
时,二进制文件失败才会写入error.json
错误文件。对于函数启动,这是默认完成的,无需手动使用@record
注解。redirects
和tee
是位掩码,用于指定要重定向到log_dir
中的日志文件的标准流(s)。有效掩码值在Std
中定义。要仅重定向/tee某些本地秩,请将redirects
作为字典传递,其中键是本地秩,用于指定重定向行为。任何缺失的本地秩将默认为Std.NONE
。tee
的行为类似于Unix的“tee”命令,它会重定向+打印到控制台。为了避免worker的stdout/stderr打印到控制台,请使用redirects
参数。对于每个进程,
log_dir
将包含:{local_rank}/error.json
:如果进程失败,则包含错误信息的文件的路径。{local_rank}/stdout.json
:如果redirect & STDOUT == STDOUT
{local_rank}/stderr.json
:如果redirect & STDERR == STDERR
注意
预期
log_dir
存在、为空并且是一个目录。示例
log_dir = "/tmp/test" # ok; two copies of foo: foo("bar0"), foo("bar1") start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # invalid; envs missing for local rank 1 start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}}, log_dir=log_dir ) # ok; two copies of /usr/bin/touch: touch file1, touch file2 start_processes( name="trainer", entrypoint="/usr/bin/touch", args:{0:("file1",), 1:("file2",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # caution; arguments casted to string, runs: # echo "1" "2" "3" and echo "[1, 2, 3]" start_processes( name="trainer", entrypoint="/usr/bin/echo", args:{0:(1,2,3), 1:([1,2,3],), envs:{0:{}, 1:{}}, log_dir=log_dir )
- 参数
name (str) – 一个人类可读的短名称,描述进程是什么(在 tee stdout/stderr 输出时用作标题)
entrypoint (Union[Callable, str]) – 可以是
Callable
(函数)或cmd
(二进制文件)args (dict[int, tuple]) – 每个副本的参数
envs (dict[int, dict[str, str]]) – 每个副本的环境变量
log_dir – 用于写入日志文件的目录
start_method (str) – 多进程启动方法(spawn、fork、forkserver),二进制文件时忽略
redirects – 要重定向到日志文件的标准流
tee – 要重定向并打印到控制台的标准流
local_ranks_filter – 要打印到控制台的哪些 rank 的日志
- 返回类型
进程上下文#
- class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source]#
用于标准化通过不同机制启动的进程集操作的基类。
名称
PContext
是有意为之,以区分torch.multiprocessing.ProcessContext
。警告
stdout 和 stderr 必须**始终**是 tee_stdout 和 tee_stderr(相应地)的超集,这是因为 tee 是通过重定向 + tail -f <stdout/stderr.log> 实现的
- class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[source]#
包含作为函数调用的工作进程的
PContext
。
- class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source]#
包含作为二进制文件调用的工作进程的
PContext
。
- class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[source]#
使用
start_processes()
启动的进程成功运行后的结果。由PContext
返回。请注意以下事项:
所有字段均按本地 rank 映射
return_values
- 仅为函数(非二进制文件)填充。stdouts
- stdout.log 的路径(如果未重定向则为空字符串)stderrs
- stderr.log 的路径(如果未重定向则为空字符串)
- class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]#
默认的 LogsSpecs 实现。
log_dir 如果不存在则会被创建。
为每次尝试和 rank 生成嵌套文件夹。
- class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[source]#
对于每种日志类型,保存本地 rank id 到文件路径的映射。
- class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]#
定义每个工作进程的日志处理和重定向。
- 参数
log_dir (Optional[str]) – 将写入日志的基目录。
redirects (Union[Std, dict[int, Std]]) – 要重定向到文件的流。传递单个
Std
枚举以重定向所有工作进程,或传递以 local_rank 为键的映射以选择性重定向。tee (Union[Std, dict[int, Std]]) – 要复制到 stdout/stderr 的流。传递单个
Std
枚举以复制所有工作进程的流,或传递以 local_rank 为键的映射以选择性复制。