
Rendezvous#

Created On: May 04, 2021 | Last Updated On: May 22, 2024

In the context of Torch Distributed Elastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.

It is used by Torch Distributed Elastic to gather participants of a training job (i.e. nodes) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume.

Torch Distributed Elastic rendezvous provides the following critical functionalities:

Barrier:

Nodes performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min total number of nodes have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.

There’s an additional small waiting time after reaching min number of nodes - this is used to ensure the rendezvous is not completed “too quickly” (which could potentially exclude additional nodes attempting to join at approximately the same time).

If max number of nodes is gathered at the barrier, the rendezvous is completed immediately.

There’s also an overall timeout which causes the rendezvous to fail if min number of nodes is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources, in case there’s a problem with the resource manager, and is meant to be interpreted as non-retryable.
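These barrier knobs surface as the join and last_call timeouts of the dynamic rendezvous implementation documented later on this page. A minimal sketch of configuring them (the values shown are the documented defaults):

    from datetime import timedelta

    from torch.distributed.elastic.rendezvous.dynamic_rendezvous import RendezvousTimeout

    # join: overall limit; fail the rendezvous if `min` nodes never arrive.
    # last_call: grace period that keeps the barrier open after `min` is reached.
    timeout = RendezvousTimeout(
        join=timedelta(seconds=600),
        last_call=timedelta(seconds=30),
    )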

Exclusivity:

A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of nodes exists at any given time (for a given job). In other words, new nodes (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.

Torch Distributed Elastic rendezvous ensures that if a group of nodes has already completed a rendezvous (and hence might already be training), then additional “late” nodes attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.

Consistency:

When a rendezvous is completed, all its members will agree on the job membership and everyone’s role in it. This role is represented using an integer, called rank, that is between 0 and world size.

Note that ranks are not stable, in the sense that the same node can be assigned a different rank in the next (re-)rendezvous.

Fault-tolerance:

Torch Distributed Elastic rendezvous is designed to tolerate node failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and its completion, a re-rendezvous with the remaining healthy nodes will happen automatically.

A node can also fail after it has completed (or has been observed by other nodes to have completed) the rendezvous - this scenario will be handled by the Torch Distributed Elastic train_loop instead (where it will also trigger a re-rendezvous).

Shared key-value store:

When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).

This store is only shared by the members of the completed rendezvous. It is intended to be used by Torch Distributed Elastic to exchange information necessary to initialize job control and data-planes.
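A minimal sketch of such an exchange, assuming store is the torch.distributed.Store returned by a completed rendezvous (the key names are illustrative):

    # Publish this node's endpoint, then block until a peer publishes theirs.
    store.set("worker/0/addr", "10.0.0.1:29500")
    store.wait(["worker/1/addr"])
    peer_addr = store.get("worker/1/addr")  # returns bytes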

Waiting workers and rendezvous closing:

The Torch Distributed Elastic rendezvous handler object provides additional functionalities, which are technically not part of the rendezvous process:

  1. Querying how many workers arrived late at the barrier and can participate in the next rendezvous.

  2. Setting the rendezvous closed to signal all nodes not to participate in the next rendezvous (see the sketch below).
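A minimal sketch of how an agent might use these two calls, assuming rdzv_handler was built with one of the constructors shown below:

    # Periodically check for late arrivals and admit them via re-rendezvous.
    if rdzv_handler.num_nodes_waiting() > 0:
        rdzv_info = rdzv_handler.next_rendezvous()

    # Once the job is finished, signal all nodes to stop re-rendezvousing.
    rdzv_handler.set_closed()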

DynamicRendezvousHandler:

Torch Distributed Elastic comes with the DynamicRendezvousHandler class that implements the rendezvous mechanism described above. It is a backend-agnostic type that expects a particular RendezvousBackend instance to be specified during construction.

Torch distributed users can either implement their own backend type or use one of the following implementations that come with PyTorch:

  • C10dRendezvousBackend: Uses a C10d store (by default TCPStore) as the rendezvous backend. The main advantage of using a C10d store is that it requires no 3rd-party dependency (such as etcd) to establish a rendezvous.

  • EtcdRendezvousBackend: Supersedes the legacy EtcdRendezvousHandler class. Passing an EtcdRendezvousBackend instance to DynamicRendezvousHandler is functionally equivalent to instantiating an EtcdRendezvousHandler.

    from torch.distributed import TCPStore
    from torch.distributed.elastic.rendezvous.c10d_rendezvous_backend import C10dRendezvousBackend
    from torch.distributed.elastic.rendezvous.dynamic_rendezvous import DynamicRendezvousHandler

    # The node hosting the store starts a TCPStore server; others connect as clients.
    store = TCPStore("localhost", 29400, is_master=True)

    backend = C10dRendezvousBackend(store, "my_run_id")

    rdzv_handler = DynamicRendezvousHandler.from_backend(
        run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4
    )
    

Below is a state diagram describing how rendezvous works.

[State diagram: etcd_rdzv_diagram.png]

Registry#

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]#

Hold the parameters to construct a RendezvousHandler.

Parameters
  • backend (str) – The name of the backend to use to handle the rendezvous.

  • endpoint (str) – The endpoint of the rendezvous, usually in the form <hostname>[:<port>].

  • run_id (str) – The id of the rendezvous.

  • min_nodes (int) – The minimum number of nodes to admit to the rendezvous.

  • max_nodes (int) – The maximum number of nodes to admit to the rendezvous.

  • local_addr (Optional[str]) – The address of the local node.

  • **kwargs – Additional parameters for the specified backend.

get(key, default=None)[source]#

Return the value for key if key exists, else default.

Return type

Any

get_as_bool(key, default=None)[source]#

Return the value for key as a bool.

Return type

Optional[bool]

get_as_int(key, default=None)[source]#

Return the value for key as an int.

Return type

Optional[int]
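A short sketch of constructing RendezvousParameters and reading back a backend-specific option with the typed getters (the is_host option belongs to the C10d backend documented below):

    from torch.distributed.elastic.rendezvous import RendezvousParameters

    params = RendezvousParameters(
        backend="c10d",
        endpoint="localhost:29400",
        run_id="my_run_id",
        min_nodes=2,
        max_nodes=4,
        is_host=True,  # forwarded to the backend via **kwargs
    )
    assert params.get_as_bool("is_host") is True
    assert params.get("missing_key", default="fallback") == "fallback"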

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]#

Represent a registry of RendezvousHandler backends.
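A sketch of registering a custom backend name with the module-level rendezvous_handler_registry instance that ships with PyTorch; create_my_handler is a hypothetical creator function:

    from torch.distributed.elastic.rendezvous import (
        RendezvousHandler,
        RendezvousParameters,
        rendezvous_handler_registry,
    )

    def create_my_handler(params: RendezvousParameters) -> RendezvousHandler:
        ...  # hypothetical: build and return a custom RendezvousHandler

    # Handlers for "my_backend" can now be constructed by name from parameters.
    rendezvous_handler_registry.register("my_backend", create_my_handler)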

Handler#

class torch.distributed.elastic.rendezvous.RendezvousHandler[source]#

Main rendezvous interface.

Note

Distributed Torch users normally do not need to implement their own RendezvousHandler. An implementation based on C10d Store is already provided, and is recommended for most users.

abstract get_backend()[source]#

Return the name of the rendezvous backend.

Return type

str

abstract get_run_id()[source]#

Return the run id of the rendezvous.

The run id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow nodes to join the correct distributed application.

Return type

str

abstract is_closed()[source]#

Check whether the rendezvous has been closed.

A closed rendezvous means all future attempts to re-rendezvous within the same job will fail.

is_closed() and set_closed() have the semantics of eventual propagation and should not be used for synchronization. The intention is that if at least one node decides the job is finished, it will close the rendezvous, and other nodes will soon observe this and stop running as well.

Return type

bool

abstract next_rendezvous()[source]#

Main entry-point into the rendezvous barrier.

Blocks until the rendezvous is complete and the current process is included in the formed worker group, or a timeout occurs, or the rendezvous was marked closed.

Returns

An instance of RendezvousInfo.

Return type

RendezvousInfo

abstract num_nodes_waiting()[source]#

Return the number of nodes that arrived late at the rendezvous barrier and were therefore not included in the current worker group.

Callers should periodically call this method to check whether new nodes are waiting to join the job and, if so, admit them by calling next_rendezvous() (re-rendezvous).

Return type

int

abstract set_closed()[source]#

Mark the rendezvous as closed.

abstract shutdown()[source]#

Close all resources that were opened for the rendezvous.

Example:

rdzv_handler = ...
try:
    rdzv_info = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()

Return type

bool

property use_agent_store: bool#

Indicates that the store reference returned by next_rendezvous() can be shared with user applications and will be available during the application lifetime.

Rendezvous handler implementations will share the store details as an instance of RendezvousStoreInfo. Applications typically use the MASTER_ADDR/MASTER_PORT environment variables to look up the store.
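A minimal sketch of the typical application-side lookup (the agent populates these variables from RendezvousStoreInfo):

    import os

    master_addr = os.environ["MASTER_ADDR"]
    master_port = int(os.environ["MASTER_PORT"])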

Dataclasses#

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]#

Holds information about the rendezvous.

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[source]#

Store address and port that can be used to bootstrap trainer distributed comms.

static build(rank, store)[source]#

Factory method that finds an unused port on the rank 0 host and makes the addr/port info available to all ranks.

If master_addr/master_port is already known (useful when sharing an existing TCPStore server), use the constructor instead.

Parameters
  • rank (int) – The rank of the current node.

  • store (Store) – The store to use for the rendezvous.

  • local_addr (Optional[str]) – The address of the current node; if not provided, it will be resolved from the hostname.

  • server_port (Optional[int]) – The port of the TCPStore server, when the TCPStore is shared.

Return type

RendezvousStoreInfo
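A short sketch contrasting the two construction paths (the host name is hypothetical; rank and store are assumed to come from the surrounding rendezvous code):

    # Address/port already known, e.g. an existing TCPStore server is shared:
    info = RendezvousStoreInfo(master_addr="node0.example.com", master_port=29500)

    # Otherwise, let the rank 0 host pick a free port and share it with all ranks:
    info = RendezvousStoreInfo.build(rank, store)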

Exceptions#

class torch.distributed.elastic.rendezvous.api.RendezvousError[source]#

Represents the base type for rendezvous errors.

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[source]#

Raised when a rendezvous is closed.

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]#

Raised when a rendezvous did not complete on time.

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source]#

Raised when the connection to a rendezvous backend has failed.

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[source]#

Raised when the state of a rendezvous is corrupt.

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[source]#

Raised when a node was not included in the rendezvous and exited gracefully.

The exception is a mechanism for unwinding the stack; it does not indicate a failure.
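A minimal sketch of reacting to the two most common outcomes of next_rendezvous(), assuming the rdzv_handler object from the earlier examples:

    from torch.distributed.elastic.rendezvous.api import (
        RendezvousClosedError,
        RendezvousTimeoutError,
    )

    try:
        rdzv_info = rdzv_handler.next_rendezvous()
    except RendezvousClosedError:
        pass   # the job was marked finished elsewhere; exit gracefully
    except RendezvousTimeoutError:
        raise  # `min` nodes never arrived; meant to be non-retryable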

Implementations#

Dynamic Rendezvous

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]#

Creates a new DynamicRendezvousHandler from the specified parameters.

Parameters
  • store (Store) – The C10d store to return as part of the rendezvous.

  • backend (RendezvousBackend) – The backend to use to hold the rendezvous state.

Return type

DynamicRendezvousHandler

The following additional parameters are read from RendezvousParameters:

  • join_timeout – The total time, in seconds, within which the rendezvous is expected to complete. Defaults to 600 seconds.

  • last_call_timeout – An additional wait amount, in seconds, before completing the rendezvous once the minimum number of nodes has been reached. Defaults to 30 seconds.

  • close_timeout – The time, in seconds, within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown(). Defaults to 30 seconds.

  • heartbeat – The time, in seconds, within which a keep-alive heartbeat is expected to complete.
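A sketch of passing these options through RendezvousParameters (the endpoint and run id are illustrative; store and backend are assumed to exist, e.g. from the C10d create_backend documented below):

    params = RendezvousParameters(
        backend=backend.name,
        endpoint="localhost:29400",
        run_id="my_run_id",
        min_nodes=2,
        max_nodes=4,
        join_timeout=600,      # seconds
        last_call_timeout=30,  # seconds
    )
    handler = create_handler(store, backend, params)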

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]#

Represents a handler that sets up a rendezvous among a set of nodes.

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source]#

Creates a new DynamicRendezvousHandler.

Parameters
  • run_id (str) – The run id of the rendezvous.

  • store (Store) – The C10d store to return as part of the rendezvous.

  • backend (RendezvousBackend) – The backend to use to hold the rendezvous state.

  • min_nodes (int) – The minimum number of nodes to admit to the rendezvous.

  • max_nodes (int) – The maximum number of nodes to admit to the rendezvous.

  • local_addr (Optional[str]) – The local node address.

  • timeout (Optional[RendezvousTimeout]) – The timeout configuration of the rendezvous.

  • keep_alive_interval (int) – The amount of time a node waits before sending a heartbeat to keep it alive in the rendezvous.

  • keep_alive_max_attempt (int) – The maximum number of failed heartbeat attempts after which a node is considered dead.

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]#

Represents a backend that holds the rendezvous state.

abstract get_state()[source]#

Get the rendezvous state.

Returns

A tuple of the encoded rendezvous state and its fencing token, or None if no state is found in the backend.

Return type

Optional[tuple[bytes, Any]]

abstract property name: str#

Gets the name of the backend.

abstract set_state(state, token=None)[source]#

Set the rendezvous state.

The new rendezvous state is set conditionally:

  • If the specified token matches the fencing token stored in the backend, the state will be updated. The new state will be returned to the caller along with its fencing token.

  • If the specified token does not match the fencing token stored in the backend, the state won't be updated; instead the existing state along with its fencing token will be returned to the caller.

  • If the specified token is None, the new state will be set only if there is no existing state in the backend. Either the new state or the existing state, along with its fencing token, will be returned to the caller.

Parameters
  • state (bytes) – The encoded rendezvous state.

  • token (Optional[Any]) – An optional fencing token that was retrieved by a previous call to get_state() or set_state().

Returns

A tuple of the serialized rendezvous state, its fencing token, and a boolean value indicating whether our set attempt succeeded.

Return type

Optional[tuple[bytes, Any, bool]]
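A sketch of the optimistic-concurrency loop this contract implies; encode_next_state is a hypothetical state-transition function:

    result = backend.get_state()
    state, token = result if result is not None else (None, None)

    outcome = backend.set_state(encode_next_state(state), token)
    if outcome is not None:
        state, token, succeeded = outcome
        if not succeeded:
            ...  # fencing token was stale; retry on top of the returned state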

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]#

Holds the timeout configuration of a rendezvous.

Parameters
  • join (Optional[timedelta]) – The time within which the rendezvous is expected to complete.

  • last_call (Optional[timedelta]) – An additional wait amount before completing the rendezvous once it has reached the required number of participants.

  • close (Optional[timedelta]) – The time within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown().

  • heartbeat (Optional[timedelta]) – The time within which a keep-alive heartbeat is expected to complete.

property close: timedelta#

Gets the close timeout.

property heartbeat: timedelta#

Gets the keep-alive heartbeat timeout.

property join: timedelta#

Gets the join timeout.

property last_call: timedelta#

Gets the last call timeout.

C10d Backend

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]#

Creates a new C10dRendezvousBackend from the specified parameters.

Parameters
  • store_type – The type of the C10d store. The currently supported types are "tcp" and "file", which correspond to torch.distributed.TCPStore and torch.distributed.FileStore, respectively. Defaults to "tcp".

  • read_timeout – The read timeout of the store operations, in seconds. Defaults to 60 seconds. Note that this only applies to torch.distributed.TCPStore; it is not relevant to torch.distributed.FileStore, which does not take a timeout as a parameter.

  • is_host – A boolean value indicating whether this backend instance will host the C10d store. If not specified, it will be inferred heuristically by matching the hostname or IP address of this machine against the specified rendezvous endpoint. Defaults to None. Note that this configuration option only applies to torch.distributed.TCPStore. In normal circumstances you can safely skip it; the only time it is needed is when its value cannot be determined correctly (e.g. the rendezvous endpoint has a CNAME as the hostname or does not match the FQDN of the machine).

Return type

tuple[torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend, torch.distributed.distributed_c10d.Store]
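A sketch of driving create_backend via RendezvousParameters (the endpoint and run id are illustrative):

    from torch.distributed.elastic.rendezvous import RendezvousParameters
    from torch.distributed.elastic.rendezvous.c10d_rendezvous_backend import create_backend

    params = RendezvousParameters(
        backend="c10d",
        endpoint="localhost:29400",
        run_id="my_run_id",
        min_nodes=2,
        max_nodes=4,
        store_type="tcp",  # backend-specific options travel as **kwargs
        read_timeout=60,
        is_host=True,      # force this node to host the TCPStore
    )
    backend, store = create_backend(params)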

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source]#

Represents a C10d-backed rendezvous backend.

Parameters
  • store (Store) – The C10d store to use to store the rendezvous state.

  • run_id (str) – The run id of the rendezvous.
get_state()[source]#

See base class.

Return type

Optional[tuple[bytes, Any]]

property name: str#

See base class.

set_state(state, token=None)[source]#

See base class.

Return type

Optional[tuple[bytes, Any, bool]]

Etcd Backend

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]#

Creates a new EtcdRendezvousBackend from the specified parameters.

Parameters
  • read_timeout – The read timeout of the etcd operations, in seconds. Defaults to 60 seconds.

  • protocol – The protocol to use to communicate with etcd. Valid values are "http" and "https". Defaults to "http".

  • ssl_cert – The path to the SSL client certificate to use along with HTTPS. Defaults to None.

  • ssl_cert_key – The path to the private key of the SSL client certificate to use along with HTTPS. Defaults to None.

  • ca_cert – The path to the root SSL certificate authority. Defaults to None.

Return type

tuple[torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[source]#

Represents an etcd-based rendezvous backend.

Parameters
  • client (Client) – The etcd.Client instance to use to communicate with etcd.

  • run_id (str) – The run id of the rendezvous.

  • key_prefix (Optional[str]) – The path under which to store the rendezvous state in etcd.

  • ttl (Optional[int]) – The TTL of the rendezvous state. If not specified, defaults to two hours.
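A sketch of constructing the backend directly against a running etcd server, assuming the python-etcd client library (the host, port, and prefix are illustrative):

    import etcd

    client = etcd.Client(host="localhost", port=2379)
    backend = EtcdRendezvousBackend(client, "my_run_id", key_prefix="/my/rdzv")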

get_state()[source]#

See base class.

Return type

Optional[tuple[bytes, Any]]

property name: str#

See base class.

set_state(state, token=None)[source]#

See base class.

Return type

Optional[tuple[bytes, Any, bool]]

Etcd Rendezvous (Legacy)#

Warning

The DynamicRendezvousHandler class supersedes the EtcdRendezvousHandler class and is recommended for most users. EtcdRendezvousHandler is in maintenance mode and will be deprecated in the future.

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source]#

Implements a torch.distributed.elastic.rendezvous.RendezvousHandler interface backed by torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous. EtcdRendezvousHandler uses a URL to configure the type of rendezvous to use and to pass implementation-specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>

-- example --

etcd://:2379/1234?min_workers=1&max_workers=3

The URL above is interpreted as follows:

  1. Use the rendezvous handler registered with the etcd scheme.

  2. The etcd endpoint to use is localhost:2379.

  3. job_id == 1234 is used as the prefix in etcd (this allows sharing a common etcd server across multiple jobs, as long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. it does not need to be a number), as long as it is unique.

  4. min_workers=1 and max_workers=3 specify a range for the membership size: Torch Distributed Elastic starts running the job as long as the cluster size is greater than or equal to min_workers, and admits up to max_workers into the cluster.

Below is a full list of the parameters that can be passed to etcd rendezvous:

Parameters
  • min_workers – The minimum number of workers for the rendezvous to be valid.

  • max_workers – The maximum number of workers to admit.

  • timeout – The total timeout within which next_rendezvous is expected to succeed (default: 600 seconds).

  • last_call_timeout – An additional "last call" wait amount once the minimum number of workers has been reached (default: 30 seconds).

  • etcd_prefix – A path prefix (from the etcd root) under which all etcd nodes will be created (default: /torchelastic/p2p).

Etcd Store#

When etcd is used as the rendezvous backend, EtcdStore is the C10d Store instance type returned by next_rendezvous().

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source]#

Implements a C10 Store interface by piggybacking on the rendezvous etcd instance.

This is the store object returned by EtcdRendezvous.

add(key, num)[source]#

Atomically increment a value by an integer amount.

The integer is represented as a decimal string. If the key doesn't exist, a default value of 0 is assumed.

Returns

The new (incremented) value.

Return type

int

check(keys)[source]#

Check if all of the keys are immediately present (without waiting).

Return type

bool

get(key)[source]#

Get a value by key, possibly doing a blocking wait.

If the key is not immediately present, this blocks for at most timeout, or until the key is published.

Returns

value (bytes)

Raises

LookupError – If the key is still not published after the timeout.

Return type

bytes

set(key, value)[source]#

Write a key/value pair into EtcdStore.

Both key and value may be either Python str or bytes.

wait(keys, override_timeout=None)[source]#

Wait until all of the keys are published, or until a timeout occurs.

Raises

LookupError – If a timeout occurs.
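A minimal usage sketch, assuming store is the EtcdStore returned alongside a completed etcd rendezvous (the key names are illustrative):

    store.set("status/0", "ready")        # str or bytes both accepted
    count = store.add("ready_count", 1)   # atomic increment; returns the new int
    store.wait(["status/0", "status/1"])  # block until all keys are published
    peer_status = store.get("status/1")   # bytes; blocks for up to `timeout`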

Etcd Server#

EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server on a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.

Warning

For production and multi-node deployments, please consider properly deploying a highly available etcd server, as this is the single point of failure for your distributed jobs.

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]#

Note

Tested on etcd server v3.4.3.

Starts and stops a local standalone etcd server on a random free port. Useful for single-node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.

This class registers a termination handler to shut down the etcd subprocess on exit. The termination handler is NOT a substitute for calling the stop() method.

The following fallback mechanism is used to find the etcd binary:

  1. Use the environment variable TORCHELASTIC_ETCD_BINARY_PATH.

  2. Use <this file root>/bin/etcd if it exists.

  3. Use etcd from PATH.

Usage

server = EtcdServer()  # binds a random free port; pass data_dir to persist data
server.start()
client = server.get_client()
# use client
server.stop()

Parameters

etcd_binary_path – path of the etcd server binary (see above for the fallback path).