Elastic Agent#
Created On: May 04, 2021 | Last Updated On: Jun 07, 2025
Server#
Elastic Agent 是 TorchElastic 的控制平面。
它是一个启动和管理底层工作进程的进程。Agent 负责:
与分布式 PyTorch 协同工作:工作进程启动时会包含所有必要信息,以成功且轻松地调用 `torch.distributed.init_process_group()`。
容错:监控工作进程,并在检测到工作进程故障或不健康时,终止所有工作进程并重新启动所有进程。
弹性:响应成员变更,并使用新成员重新启动工作进程。
最简单的 Agent 部署在每个节点上,并与本地进程协同工作。更高级的 Agent 可以远程启动和管理工作进程。Agent 可以完全去中心化,基于其管理的工作进程做出决策。或者 Agent 可以进行协调,与其他 Agent(管理同一作业中工作进程的 Agent)通信以做出集体决策。
下面是管理本地工作进程组的 Agent 的示意图。

概念#
本节描述了与理解 `agent` 在 TorchElastic 中的作用相关的核心类和概念。
- class torch.distributed.elastic.agent.server.ElasticAgent[source]#
负责管理一个或多个工作进程的 Agent 进程。
工作进程被假定为标准的分布式 PyTorch 脚本。当工作进程由 Agent 创建时,Agent 会提供工作进程正确初始化 PyTorch 进程组所需的必要信息。
确切的部署拓扑和 Agent 与工作进程的比例取决于 Agent 的具体实现以及用户的作业放置偏好。例如,要在 GPU 上运行分布式训练作业,拥有 8 个训练器(每个 GPU 一个),您可以:
使用 8 个单 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 1 个工作进程。
使用 4 个双 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 2 个工作进程。
使用 2 个四 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 4 个工作进程。
使用 1 个八 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 8 个工作进程。
用法
group_result = agent.run() if group_result.is_failed(): # workers failed failure = group_result.failures[0] logger.exception("worker 0 failed with exit code : %s", failure.exit_code) else: return group_result.return_values[0] # return rank 0's results
- class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None, event_log_handler='null')[source]#
有关特定类型 worker 的蓝图信息。
对于给定的 role,必须只有一个 worker spec。worker spec 预期在所有节点(机器)上是同构的,即每个节点为特定 spec 运行相同数量的 worker。
- 参数
role (str) – 用户定义的 role,用于具有此 spec 的 worker
local_world_size (int) – 要运行的本地 worker 数量
args (tuple) – 传递给
entrypoint
的参数rdzv_handler (RendezvousHandler) – 处理此 worker 集合的 rdzv
max_restarts (int) – worker 的最大重试次数
monitor_interval (float) – 每
n
秒监控一次 worker 状态master_port (Optional[int]) – rank 0 上运行 c10d 存储的固定端口,如果未指定,则选择一个随机的空闲端口
master_addr (Optional[str]) – rank 0 上运行 c10d 存储的固定 master_addr,如果未指定,则选择 agent rank 0 的主机名
redirects – 将 std 流重定向到文件,通过传递映射来选择性地重定向特定本地 rank 的流
tee – 将指定的 std 流(或多个流)复制到控制台+文件,通过传递映射来选择性地为特定本地 rank 进行 tee,它优先于
redirects
设置。event_log_handler (str) – 事件日志处理程序的名称,如在 elastic/events/handlers.py 中注册的那样。
- class torch.distributed.elastic.agent.server.WorkerState(value)[source]#
一个
WorkerGroup
的状态。worker 组中的 worker 作为单元更改状态。如果 worker 组中的单个 worker 失败,则整个集合被视为失败。
UNKNOWN - agent lost track of worker group state, unrecoverable INIT - worker group object created not yet started HEALTHY - workers running and healthy UNHEALTHY - workers running and unhealthy STOPPED - workers stopped (interrupted) by the agent SUCCEEDED - workers finished running (exit 0) FAILED - workers failed to successfully finish (exit !0)
Worker 组从初始
INIT
状态开始,然后进展到HEALTHY
或UNHEALTHY
状态,最后达到终止状态SUCCEEDED
或FAILED
。Worker 组可以被代理中断并暂时置于
STOPPED
状态。处于STOPPED
状态的 worker 将在不久的将来由代理重新启动。将 worker 置于STOPPED
状态的一些例子是:Worker 组失败|检测到不健康
检测到成员变更
当对 worker 组执行的操作(启动、停止、rdzv、重试等)失败,并且导致操作部分应用于 worker 组时,状态将为
UNKNOWN
。通常发生在代理状态更改事件期间未捕获/未处理的异常。代理不应恢复处于UNKNOWN
状态的 worker 组,最好是自行终止并允许作业管理器重试该节点。
- class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]#
一个 worker 实例。
将此与
WorkerSpec
进行对比,后者代表 worker 的规范。Worker
从WorkerSpec
创建。Worker
之于WorkerSpec
就像对象之于类。ElasticAgent
的特定实现解释了 worker 的id
。对于本地代理,它可以是 worker 的pid (int)
,对于远程代理,它可以被编码为host:port (string)
。
实现#
以下是 torchelastic 提供的代理实现。
- class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]#
一个
torchelastic.agent.server.ElasticAgent
的实现,它处理主机本地 worker。此代理部署在每个主机上,并配置为启动
n
个 worker。使用 GPU 时,n
映射到主机上可用的 GPU 数量。本地代理不与部署在其他主机上的其他本地代理通信,即使 worker 可能进行跨主机通信。worker id 被解释为本地进程。代理将所有 worker 进程作为一个单元启动和停止。
传递给 worker 函数的 worker 函数和参数必须与 Python 的 multiprocessing 兼容。要将 multiprocessing 数据结构传递给 worker,您可以在指定的
start_method
的相同 multiprocessing 上下文中创建数据结构,并将其作为函数参数传递。exit_barrier_timeout
指定等待其他代理完成的时间(以秒为单位)。这充当安全网,以处理 worker 完成时间不同的情况,防止代理将提前完成的 worker 视为规模缩减事件。强烈建议用户代码确保 worker 以同步方式终止,而不是依赖 exit_barrier_timeout。如果定义了环境变量
TORCHELASTIC_ENABLE_FILE_TIMER
且其值为 1,则可以在`LocalElasticAgent`
中启用基于命名管道的监视器。可选地,另一个环境变量`TORCHELASTIC_TIMER_FILE`
可以设置为一个唯一的命名管道文件名。如果未设置`TORCHELASTIC_TIMER_FILE`
环境变量,`LocalElasticAgent`
将在内部创建一个唯一的名称,并将其设置为环境变量`TORCHELASTIC_TIMER_FILE`
,并且此环境变量将被传播到 worker 进程,以便它们可以连接到`LocalElasticAgent`
使用的同一命名管道。日志将写入指定的日志目录。每行日志默认会加上前缀
[${role_name}${local_rank}]:
(例如[trainer0]: foobar
)。可以通过将 模板字符串 作为log_line_prefix_template
参数来定制日志前缀。在运行时会替换以下宏(标识符):${role_name}, ${local_rank}, ${rank}
。例如,要使每行日志都以全局 rank 而不是本地 rank 作为前缀,请设置log_line_prefix_template = "[${rank}]:
。示例启动函数
def trainer(args) -> str: return "do train" def main(): start_method="spawn" shared_queue= multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("trainer failed") else: print(f"rank 0 return value: {results.return_values[0]}") # prints -> rank 0 return value: do train
示例启动二进制文件
def main(): spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer", args=("--trainer-args", "foobar"), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = agent.run() if not results.is_failed(): print("binary launches do not have return values")
扩展 Agent#
要扩展代理,可以直接实现 ElasticAgent
,但我们建议您扩展 SimpleElasticAgent
,它提供了大部分的脚手架,并剩下一些特定的抽象方法需要您实现。
- class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]#
一个管理特定类型 worker role 的
ElasticAgent
。一个
ElasticAgent
,它为单个WorkerSpec
(例如特定类型的 worker role)管理 worker(WorkerGroup
)。- _assign_worker_ranks(store, group_rank, group_world_size, spec)[source]#
确定 worker 进程的正确 rank。
快速路径:当所有 worker 具有相同的 role 和 world_size 时。我们计算全局 rank 为 group_rank * group_world_size + local_rank。并且 role_world_size 与 global_world_size 相同。此情况下不使用 TCP 存储。这仅在用户将环境变量 TORCH_ELASTIC_WORKER_IDENTICAL 设置为 1 时启用。
时间复杂度:每个 worker O(1),整体 O(1)
慢路径:当 worker 具有不同的 role 和 world_size 时。我们使用以下算法:
每个代理将其配置(group_rank、group_world_size、num_workers)写入通用存储。
rank 0 代理从存储中读取所有 role_info 并确定每个代理的 worker rank。
确定全局 rank:全局 rank 是通过累加它前面的所有 worker 的 local_world_size 来计算的。出于效率原因,每个 worker 都被分配一个基础全局 rank,使其 worker 位于 [base_global_rank, base_global_rank + local_world_size) 范围内。
确定 role rank:role rank 是使用第 3 点中的算法确定的,不同之处在于 rank 是相对于 role 名称计算的。
rank 0 代理将分配的 rank 写入存储。
每个代理从存储中读取分配的 rank。
时间复杂度:每个 worker O(1),rank0 O(n),整体 O(n)
- _exit_barrier()[source]#
定义一个屏障,该屏障使代理进程保持活动状态直到所有 worker 完成。
等待
exit_barrier_timeout
秒,以确保所有代理都完成其本地 worker 的执行(无论成功与否)。这充当了对用户脚本终止时间不一致情况的安全防护。
- _initialize_workers(worker_group)[source]#
为 worker_group 启动一组新的 worker。
本质上,这是一个 rendezvous,然后是
start_workers
。调用者应首先调用_stop_workers()
来停止正在运行的 worker,然后再调用此方法。乐观地将刚刚启动的 worker 组的状态设置为
HEALTHY
,并将实际状态监控委托给_monitor_workers()
方法。
- abstract _monitor_workers(worker_group)[source]#
检查
worker_group
的 worker。此函数还返回 worker 组的新状态。
- 返回类型
- _rendezvous(worker_group)[source]#
为 worker spec 指定的 worker 运行 rendezvous。
为 worker 分配新的全局 rank 和 world_size。更新 worker 组的 rendezvous 存储。
- abstract _shutdown(death_sig=Signals.SIGTERM)[source]#
清理在代理工作期间分配的任何资源。
- 参数
death_sig (Signals) – 发送给子进程的信号,默认为 SIGTERM
- class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]#
返回 worker 执行的结果。
Run 结果遵循“全有或全无”策略,即仅当此代理管理的所有本地 worker 都成功完成时,运行才算成功。
如果结果成功(例如
is_failed() = False
),则return_values
字段包含 THIS 代理管理的 worker 的输出(返回值),按其全局 rank 映射。即result.return_values[0]
是全局 rank 0 的返回值。注意
return_values
仅对于 worker 入口点是函数时才有意义。指定为二进制入口点的 worker 没有规范的返回值,并且return_values
字段无意义且可能为空。如果
is_failed()
返回True
,则failures
字段包含失败信息,同样按失败 worker 的全局 rank 映射。return_values
和failures
中的键是互斥的,也就是说,一个 worker 的最终状态只能是以下之一:成功或失败。被 agent 根据其重启策略有意终止的 worker,在return_values
或failures
中均不表示。
Agent 中的 Watchdog (看门狗)#
如果 LocalElasticAgent
进程中定义了环境变量 TORCHELASTIC_ENABLE_FILE_TIMER
且其值为 1,则可以在 LocalElasticAgent
中启用基于命名管道的 watchdog。可选地,可以设置另一个环境变量 TORCHELASTIC_TIMER_FILE
,并为其指定一个唯一的命名管道文件名。如果未设置环境变量 TORCHELASTIC_TIMER_FILE
,LocalElasticAgent
将在内部创建一个唯一的随机文件名并将其设置为环境变量 TORCHELASTIC_TIMER_FILE
,此环境变量将传播给 worker 进程,以便它们可以连接到 LocalElasticAgent
使用的同一个命名管道。
健康检查服务器#
如果 LocalElasticAgent
进程中定义了环境变量 TORCHELASTIC_HEALTH_CHECK_PORT
,则可以在 LocalElasticAgent
中启用健康检查监控服务器。为健康检查服务器增加了接口,可以通过在指定端口启动 tcp/http 服务器来扩展。此外,健康检查服务器将具有一个回调函数来检查 watchdog 是否存活。
- class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]#
健康检查监控服务器的接口,可以通过在指定端口启动 tcp/http 服务器来扩展。
- 参数