协调(低级 API)#
警告
由于 torchft 仍处于开发阶段,此模块中的 API 可能会发生更改。
此模块公开低级协调 API,允许您在 torchft 之上构建自己的自定义容错算法。
如果您正在寻找更完整的解决方案,请使用 torchft 中的其他模块。
这提供了对 Lighthouse 和 Manager 服务器及客户端的直接访问。
- class torchft.coordination.LighthouseClient(addr, connect_timeout)#
基类:
object
LighthouseClient 是 lighthouse 服务的 GRPC 客户端。
它用于直接与 lighthouse 服务器通信。
- 参数
addr (str) – lighthouse 服务器的 HTTP 地址。
connect_timeout (timedelta) – 连接 lighthouse 服务器的超时时间。
- heartbeat(replica_id, timeout=Ellipsis)#
向 lighthouse 发送一次心跳。
- 参数
replica_id (str) – 您注册的 replica_id。
timeout (timedelta, optional) – 每个 RPC 的截止时间。默认为 5 秒。
- quorum(replica_id, timeout, address=Ellipsis, store_address=Ellipsis, step=0, world_size=0, shrink_only=False, data=None)#
quorum 发送请求到 lighthouse 服务器以形成法定人数。
- 参数
replica_id (str) – 调用 quorum 的副本的字符串 ID。
timeout (timedelta) – quorum 的超时时间。
address (str) – 调用 quorum 的副本的地址。默认为 “”。
store_address (str) – store 的地址。默认为 “”。
step (python:int) – 调用 quorum 的副本的步数。默认为 0。
world_size (python:int) – 调用 quorum 的副本的世界大小。默认为 0。
shrink_only (bool) – quorum 是否仅用于缩小。默认为 false。
data (Optional[dict]) – 要随 quorum 传递的数据。
- 返回
成功时的当前 quorum。
- 返回类型
- class torchft.coordination.LighthouseServer(bind, min_replicas, join_timeout_ms=None, quorum_tick_ms=None, heartbeat_timeout_ms=None)#
基类:
object
LighthouseServer 是 lighthouse 服务的 GRPC 服务器。
它用于协调每个副本组的 ManagerServer。
此入口点主要用于测试和调试目的。对于大多数用例,建议使用 `torchft_ lighthouse` 命令。
- 参数
bind (str) – 服务器要绑定的 HTTP 地址。
min_replicas (python:int) – 形成 quorum 所需的最小副本数。
join_timeout_ms (python:int) – 加入 quorum 的超时时间。
quorum_tick_ms (python:int) – 检查 quorum 的间隔。
heartbeat_timeout_ms (python:int) – 心跳的超时时间。
- address()#
address 返回 lighthouse 服务器的地址。
- 返回
lighthouse 服务器的地址。
- 返回类型
str
- shutdown()#
shutdown 关闭 lighthouse 服务器。
- class torchft.coordination.ManagerClient(addr, connect_timeout)#
基类:
object
ManagerClient 是 manager 服务的 GRPC 客户端。
它由 trainer 用于与 ManagerServer 进行通信。
- 参数
addr (str) – manager 服务器的 HTTP 地址。
connect_timeout (timedelta) – 连接 manager 服务器的超时时间。
- should_commit(group_rank, step, should_commit, timeout)#
should_commit 向 manager 发送请求,以确定 trainer 是否应提交当前步。这将一直等待直到所有 rank 在指定的步完成检查,如果任何 worker 通过 `should_commit=False`,则返回 false。
- 参数
rank (python:int) – trainer 的 rank。
step (python:int) – trainer 的步数。
should_commit (bool) – trainer 是否应提交当前步。
timeout (timedelta) – 请求的超时时间。如果请求超时,将引发 TimeoutError。
- 返回
trainer 是否应提交当前步。
- 返回类型
布尔值
- class torchft.coordination.ManagerServer(replica_id, lighthouse_addr, hostname, bind, store_addr, world_size, heartbeat_interval, connect_timeout, quorum_retries)#
基类:
object
ManagerServer 是 manager 服务的 GRPC 服务器。每个副本组应该有一个 manager 服务器(通常运行在 rank 0 主机上)。副本组内的各个 rank 应该使用 ManagerClient 与 manager 服务器通信并参与 quorum 操作。
- 参数
replica_id (str) – 副本组的 ID。
lighthouse_addr (str) – lighthouse 服务器的 HTTP 地址。
hostname (str) – manager 服务器的主机名。
bind (str) – 服务器要绑定的 HTTP 地址。
store_addr (str) – store 服务器的 HTTP 地址。
world_size (python:int) – 副本组的世界大小。
heartbeat_interval (timedelta) – 发送心跳的间隔。
connect_timeout (timedelta) – 连接 lighthouse 服务器的超时时间。
quorum_retries (python:int) – 向 lighthouse 服务器发送 quorum 请求的重试次数。
- address()#
address 返回 manager 服务器的地址。
- 返回
manager 服务器的地址。
- 返回类型
str
- shutdown()#
shutdown 关闭 manager 服务器。
- class torchft.coordination.Quorum#
基类:
object
quorum 结果。
- 参数
quorum_id (python:int) – 当前 quorum 的 ID。
participants (list[QuorumMember]) – quorum 中的所有成员。
created (timedelta) – quorum 在服务器中创建的时间。
- created#
- participants#
- quorum_id#
- class torchft.coordination.QuorumMember#
基类:
object
quorum 的成员。
- 参数
replica_id (str) – 调用 quorum 的副本的字符串 ID。
address (str) – 调用 quorum 的副本的地址。
store_address (str) – store 的地址。
step (python:int) – 调用 quorum 的副本的步数。
world_size (python:int) – 调用 quorum 的副本的世界大小。
shrink_only (bool) – quorum 是否仅用于缩小。
timeout (timedelta) – quorum 的超时时间。
data (dict or None) – 要随 quorum 传递的数据。
- address#
- data#
- replica_id#
- shrink_only#
- step#
- store_address#
- world_size#