Rendezvous#
In the context of Torch Distributed Elastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.
It is used by Torch Distributed Elastic to gather participants of a training job (i.e. nodes) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume.
Torch Distributed Elastic rendezvous provides the following critical functionalities:
Barrier:
Nodes performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min total number of nodes have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.
There's an additional small waiting time after reaching min number of nodes - this is used to ensure the rendezvous is not completed "too quickly" (which could potentially exclude additional nodes attempting to join at approximately the same time).
If max number of nodes is gathered at the barrier, the rendezvous is completed immediately.
There's also an overall timeout which causes the rendezvous to fail if min number of nodes is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources, in case there's a problem with the resource manager, and is meant to be interpreted as non-retryable.
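To make these knobs concrete, the following is a minimal sketch of how min, max, the last-call grace period, and the overall timeout map onto the rendezvous API. It assumes the C10d backend and DynamicRendezvousHandler introduced later in this document; the port, node counts, and timeout values are illustrative only.

```python
from datetime import timedelta

from torch.distributed import TCPStore
from torch.distributed.elastic.rendezvous.c10d_rendezvous_backend import C10dRendezvousBackend
from torch.distributed.elastic.rendezvous.dynamic_rendezvous import (
    DynamicRendezvousHandler,
    RendezvousTimeout,
)

store = TCPStore("localhost", 29400, is_master=True)  # this node hosts the store
backend = C10dRendezvousBackend(store, "my_run_id")

timeout = RendezvousTimeout(
    join=timedelta(seconds=600),      # overall timeout: fail if `min` is never reached
    last_call=timedelta(seconds=30),  # extra grace period after reaching `min` nodes
)

rdzv_handler = DynamicRendezvousHandler.from_backend(
    run_id="my_run_id",
    store=store,
    backend=backend,
    min_nodes=2,  # barrier can complete once at least this many nodes have joined
    max_nodes=4,  # barrier completes immediately once this many nodes have joined
    timeout=timeout,
)
```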
Exclusivity:
A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of nodes exists at any given time (for a given job). In other words, new nodes (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.
Torch Distributed Elastic rendezvous ensures that if a group of nodes has already completed a rendezvous (and hence might already be training), then additional “late” nodes attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.
Consistency:
When a rendezvous is completed, all its members will agree on the job membership and everyone's role in it. This role is represented using an integer, called rank, that is between 0 and world size.
Note that ranks are not stable, in the sense that the same node can be assigned a different rank in the next (re-)rendezvous.
Fault-tolerance:
Torch Distributed Elastic rendezvous is designed to tolerate node failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and its completion, then a re-rendezvous with the remaining healthy nodes will happen automatically.
A node can also fail after it has completed (or has been observed by other nodes to have completed) the rendezvous - this scenario will be handled by the Torch Distributed Elastic train_loop instead (where it will also trigger a re-rendezvous).
Shared key-value store:
When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).
This store is only shared by the members of the completed rendezvous. It is intended to be used by Torch Distributed Elastic to exchange information necessary to initialize job control and data-planes.
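As a hedged sketch of what this looks like in practice (rdzv_handler is assumed to be constructed as in the earlier sketch, and the key name is purely illustrative):

```python
# next_rendezvous() blocks at the barrier and returns a RendezvousInfo whose
# store is shared only by the members of this completed rendezvous.
rdzv_info = rdzv_handler.next_rendezvous()
store = rdzv_info.store

if rdzv_info.rank == 0:
    # Rank 0 publishes a value under an illustrative key.
    store.set("trainer_addr", "10.0.0.1")

# The other members block in get() until the key has been published.
addr = store.get("trainer_addr")  # returned as bytes
```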
Waiting workers and rendezvous closing:
The Torch Distributed Elastic rendezvous handler object provides additional functionalities which are technically not part of the rendezvous process:
Querying how many workers arrived late at the barrier and can therefore participate in the next rendezvous.
Setting the rendezvous closed to signal all nodes not to participate in the next rendezvous.
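A hedged sketch of how an agent-style process might use these two facilities; the polling interval and the helper functions are assumptions, not part of the API:

```python
import time

while True:
    if job_is_finished():  # hypothetical job-level predicate
        # Signal all nodes not to participate in the next rendezvous.
        rdzv_handler.set_closed()
        break
    if rdzv_handler.num_nodes_waiting() > 0:
        # Late arrivals are waiting at the barrier; re-rendezvous to admit them.
        admit_waiting_nodes()  # hypothetical: stop workers, call next_rendezvous()
        break
    time.sleep(30)
```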
DynamicRendezvousHandler:
Torch Distributed Elastic comes with the DynamicRendezvousHandler class that implements the rendezvous mechanism described above. It is a backend-agnostic type that expects a particular RendezvousBackend instance to be specified during construction.
Torch distributed users can either implement their own backend type or use one of the following implementations that come with PyTorch:
C10dRendezvousBackend: Uses a C10d store (by default TCPStore) as the rendezvous backend. The main advantage of using a C10d store is that it requires no 3rd-party dependency (such as etcd) to establish a rendezvous.
EtcdRendezvousBackend: Supersedes the legacy EtcdRendezvousHandler class. Passing an EtcdRendezvousBackend instance to DynamicRendezvousHandler is functionally equivalent to instantiating an EtcdRendezvousHandler.

```python
store = TCPStore("localhost")
backend = C10dRendezvousBackend(store, "my_run_id")
rdzv_handler = DynamicRendezvousHandler.from_backend(
    run_id="my_run_id",
    store=store,
    backend=backend,
    min_nodes=2,
    max_nodes=4,
)
```
Below is a state diagram describing how rendezvous works.

Registry#
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]#
Hold the parameters to construct a RendezvousHandler.
- Parameters
backend (str) – The name of the backend to use to handle the rendezvous.
endpoint (str) – The endpoint of the rendezvous, usually in form <hostname>[:<port>].
run_id (str) – The id of the rendezvous.
min_nodes (int) – The minimum number of nodes to admit to the rendezvous.
max_nodes (int) – The maximum number of nodes to admit to the rendezvous.
**kwargs – Additional parameters for the specified backend.
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]#
Represent a registry of RendezvousHandler backends.
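A hedged sketch of registering and using a custom backend type; the backend name and creator function are illustrative, and rendezvous_handler_registry is assumed to be the module-level registry singleton:

```python
from torch.distributed.elastic.rendezvous import (
    RendezvousHandler,
    RendezvousParameters,
    rendezvous_handler_registry,
)

def _create_my_handler(params: RendezvousParameters) -> RendezvousHandler:
    # Hypothetical: construct and return a custom RendezvousHandler here.
    raise NotImplementedError

# Register a creator for the "my_backend" scheme...
rendezvous_handler_registry.register("my_backend", _create_my_handler)

# ...then let the registry instantiate a handler from a set of parameters.
params = RendezvousParameters(
    backend="my_backend",
    endpoint="localhost:29400",
    run_id="my_run_id",
    min_nodes=2,
    max_nodes=4,
)
handler = rendezvous_handler_registry.create_handler(params)
```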
Handler#
- class torch.distributed.elastic.rendezvous.RendezvousHandler[source]#
Main rendezvous interface.
Note
Distributed Torch users normally do not need to implement their own RendezvousHandler. An implementation based on the C10d Store is already provided, and is recommended for most users.
- abstract get_run_id()[source]#
Return the run id of the rendezvous.
The run id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow nodes to join the correct distributed application.
- Return type: str
- abstract is_closed()[source]#
Check whether the rendezvous has been closed.
A closed rendezvous means all future attempts to re-rendezvous within the same job will fail.
is_closed() and set_closed() have semantics of eventual propagation and should not be used for synchronization. The intention is that if at least one node decides the job is finished, it will close the rendezvous, and other nodes will soon observe this and stop running as well.
- Return type: bool
- abstract next_rendezvous()[source]#
Main entry-point into the rendezvous barrier.
Blocks until the rendezvous is complete and the current process is included in the formed worker group, or a timeout occurs, or the rendezvous was marked closed.
- Returns
An instance of RendezvousInfo.
- Raises
RendezvousClosedError – The rendezvous is closed.
RendezvousConnectionError – The connection to the rendezvous backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
RendezvousTimeoutError – The rendezvous did not complete on time.
- Return type: RendezvousInfo
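A hedged sketch of handling these failure modes; the exit/re-raise policy shown is an assumption, not prescribed by the API:

```python
import sys

from torch.distributed.elastic.rendezvous import (
    RendezvousClosedError,
    RendezvousTimeoutError,
)

try:
    rdzv_info = rdzv_handler.next_rendezvous()
except RendezvousClosedError:
    # Another node declared the job finished; exit cleanly.
    sys.exit(0)
except RendezvousTimeoutError:
    # `min` nodes never arrived; non-retryable by design (see Barrier above).
    raise
```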
- abstract num_nodes_waiting()[source]#
Return the number of nodes that arrived late at the rendezvous barrier and were hence not included in the current worker group.
Callers should periodically call this method to check whether new nodes are waiting to join the job and, if so, admit them by calling next_rendezvous() (re-rendezvous).
- Return type: int
- abstract shutdown()[source]#
Close all resources that were open for the rendezvous.
Example

```python
rdzv_handler = ...
try:
    rdzv_info = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
```

- Return type: bool
- property use_agent_store: bool#
Indicates that the store reference returned by next_rendezvous() can be shared with user applications and will be available during the application lifetime.
Rendezvous handler implementations will share store details as an instance of RendezvousStoreInfo. Applications by convention use the MASTER_ADDR/MASTER_PORT environment variables to look up the store.
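A hedged sketch of that convention, in which a worker process attaches to a store hosted elsewhere:

```python
import os

from torch.distributed import TCPStore

# Attach (is_master=False) to the TCPStore hosted by the agent / rank 0 node.
store = TCPStore(
    host_name=os.environ["MASTER_ADDR"],
    port=int(os.environ["MASTER_PORT"]),
    is_master=False,
)
```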
Dataclasses
- class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]#
Holds information about the rendezvous.
Exceptions
Implementations
Dynamic Rendezvous
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]#
Create a new DynamicRendezvousHandler from the specified parameters.
- Parameters
store (Store) – The C10d store to return as part of the rendezvous.
backend (RendezvousBackend) – The backend to use to hold the rendezvous state.
- Return type: DynamicRendezvousHandler
join_timeout – The total time, in seconds, within which the rendezvous is expected to complete. Defaults to 600 seconds.
last_call_timeout – An additional wait amount, in seconds, before completing the rendezvous once the minimum number of nodes has been reached. Defaults to 30 seconds.
close_timeout – The time, in seconds, within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown(). Defaults to 30 seconds.
heartbeat – The time, in seconds, within which a keep-alive heartbeat is expected to complete.
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]#
Represents a handler that sets up a rendezvous among a set of nodes.
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source]#
Create a new DynamicRendezvousHandler.
- Parameters
run_id (str) – The run id of the rendezvous.
store (Store) – The C10d store to return as part of the rendezvous.
backend (RendezvousBackend) – The backend to use to hold the rendezvous state.
min_nodes (int) – The minimum number of nodes to admit to the rendezvous.
max_nodes (int) – The maximum number of nodes to admit to the rendezvous.
timeout (Optional[RendezvousTimeout]) – The timeout configuration of the rendezvous.
keep_alive_interval (int) – The amount of time a node waits before sending a heartbeat to keep it alive in the rendezvous.
keep_alive_max_attempt (int) – The maximum number of failed heartbeat attempts after which a node is considered dead.
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]#
Represents a backend that holds the rendezvous state.
- abstract get_state()[source]#
Get the rendezvous state.
- Returns
A tuple of the encoded rendezvous state and its fencing token, or None if no state is found in the backend.
- Raises
RendezvousConnectionError – The connection to the backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
- Return type: Optional[Tuple[bytes, Token]]
- abstract set_state(state, token=None)[source]#
Set the rendezvous state.
The new rendezvous state is set conditionally:
If the specified token matches the fencing token stored in the backend, the state will be updated. The new state will be returned to the caller along with its fencing token.
If the specified token does not match the fencing token stored in the backend, the state won't be updated; instead the existing state along with its fencing token will be returned to the caller.
If the specified token is None, the new state will be set only if there is no existing state in the backend. Either the new or the existing state along with its fencing token will be returned to the caller.
- Parameters
state (bytes) – The encoded rendezvous state.
token (Optional[Any]) – An optional fencing token that was retrieved by a previous call to get_state() or set_state().
- Returns
A tuple of the serialized rendezvous state, its fencing token, and a boolean value indicating whether our set attempt succeeded.
- Raises
RendezvousConnectionError – The connection to the backend has failed.
RendezvousStateError – The rendezvous state is corrupt.
- Return type: Optional[Tuple[bytes, Token, bool]]
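These conditional-set semantics support an optimistic compare-and-set loop. Below is a hedged sketch, where backend is any RendezvousBackend and mutate is a hypothetical function that derives a new encoded state from the old one:

```python
def update_state(backend, mutate):
    # Retry until our write wins the race against concurrent writers.
    while True:
        result = backend.get_state()
        state, token = result if result is not None else (None, None)
        new_state = mutate(state)  # hypothetical user-supplied transition
        result = backend.set_state(new_state, token)
        if result is not None:
            _, _, succeeded = result
            if succeeded:
                return
        # Our token was stale; loop to fetch the fresh state and token.
```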
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]#
Hold the timeout configuration of a rendezvous.
- Parameters
join (Optional[timedelta]) – The time within which the rendezvous is expected to complete.
last_call (Optional[timedelta]) – An additional wait amount before completing the rendezvous once the minimum number of nodes has been reached.
close (Optional[timedelta]) – The time within which the rendezvous is expected to close after a call to set_closed() or shutdown().
heartbeat (Optional[timedelta]) – The time within which a keep-alive heartbeat is expected to complete.
C10d Backend
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]#
Create a new C10dRendezvousBackend from the specified parameters.
store_type – The type of the C10d store. The currently supported types are "tcp" and "file", which correspond to torch.distributed.TCPStore and torch.distributed.FileStore, respectively. Defaults to "tcp".
read_timeout – The read timeout, in seconds, for store operations. Defaults to 60 seconds. Note this only applies to torch.distributed.TCPStore. It is not relevant to torch.distributed.FileStore, which does not take a timeout as a parameter.
is_host – A boolean value indicating whether this backend instance will host the C10d store. If not specified, it will be inferred heuristically by matching the hostname or the IP address of this machine against the specified rendezvous endpoint. Defaults to None. Note that this configuration option only applies to torch.distributed.TCPStore. In normal circumstances you can safely skip it; the only time it is needed is when its value cannot be correctly determined (e.g. the rendezvous endpoint has a CNAME as the hostname or does not match the FQDN of the machine).
- class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source]#
Represents a C10d-backed rendezvous backend.
- Parameters
store (Store) – The torch.distributed.Store instance to use to communicate with the C10d store.
run_id (str) – The run id of the rendezvous.
Etcd Backend
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]#
Create a new EtcdRendezvousBackend from the specified parameters.
read_timeout – The read timeout, in seconds, for etcd operations. Defaults to 60 seconds.
protocol – The protocol to use to communicate with etcd. Valid values are "http" and "https". Defaults to "http".
ssl_cert – The path to the SSL client certificate to use along with HTTPS. Defaults to None.
ssl_cert_key – The path to the private key of the SSL client certificate to use along with HTTPS. Defaults to None.
ca_cert – The path to the root SSL authority certificate. Defaults to None.
Etcd Rendezvous (Legacy)#
Warning
The DynamicRendezvousHandler class supersedes the EtcdRendezvousHandler class, and is recommended for most users. EtcdRendezvousHandler is in maintenance mode and will be deprecated in the future.
- class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source]#
Implements a torch.distributed.elastic.rendezvous.RendezvousHandler interface backed by torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous. EtcdRendezvousHandler uses a URL to configure the type of rendezvous to use and to pass implementation-specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:

```
etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://:2379/1234?min_workers=1&max_workers=3
```

The URL above is interpreted as follows:
Use the rendezvous handler that is registered with the etcd scheme.
The etcd endpoint to use is localhost:2379.
job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server across multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. it does not need to be a number) as long as it is unique.
min_workers=1 and max_workers=3 specify a range for membership size - Torch Distributed Elastic starts running the job as long as the cluster size is greater than or equal to min_workers, and admits up to max_workers into the cluster.
Below is a full list of the parameters that can be passed to etcd rendezvous:
min_workers – The minimum number of workers for the rendezvous to be valid.
max_workers – The maximum number of workers to admit.
timeout – The total timeout within which next_rendezvous is expected to succeed (default 600s).
last_call_timeout – An additional wait amount ("last call") after the minimum number of workers has been reached (default 30s).
etcd_prefix – The path prefix (from the etcd root) under which all etcd nodes will be created (default /torchelastic/p2p).
Etcd Store#
The EtcdStore is the C10d Store instance type returned by next_rendezvous() when etcd is used as the rendezvous backend.
- class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source]#
Implements a C10d Store interface by piggybacking on the rendezvous etcd instance.
This is the store object returned by EtcdRendezvous.
Etcd Server#
The EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server on a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.
Warning
For production and multi-node deployments, please consider properly deploying a highly available etcd server, as this is the single point of failure for your distributed jobs.
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]#
Note
Tested on etcd server v3.4.3.
Starts and stops a local standalone etcd server on a random free port. Useful for single-node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.
This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.
The following fallback mechanism is used to find the etcd binary:
Uses the environment variable TORCHELASTIC_ETCD_BINARY_PATH.
Uses <this file root>/bin/etcd if it exists.
Uses etcd from PATH.
Usage

```python
server = EtcdServer(data_dir="/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
```

- Parameters
etcd_binary_path – path of the etcd server binary (see above for fallback path).