评价此页

远程引用协议#

创建时间:2019 年 11 月 20 日 | 最后更新时间:2025 年 4 月 27 日

本文档描述了远程引用协议的设计细节,并通过不同场景下的消息流进行讲解。在继续阅读之前,请确保您已熟悉 分布式 RPC 框架

背景#

RRef 是 Remote REFerence(远程引用)的缩写。它是一个指向位于本地或远程工作节点上的对象的引用,并在底层透明地处理引用计数。概念上,它可以被视为一个分布式共享指针。应用程序可以通过调用 remote() 来创建 RRef。每个 RRef 都由 remote() 调用的被调用方工作节点(即所有者)拥有,并且可以被多个用户使用。所有者存储实际数据并跟踪全局引用计数。每个 RRef 都可以通过一个全局唯一的 RRefId 来标识,该 ID 在 remote() 调用方处创建时分配。

在所有者工作节点上,只有一个 OwnerRRef 实例,它包含实际数据;而在用户工作节点上,可以根据需要有任意数量的 UserRRef,并且 UserRRef 不持有数据。所有者上的所有使用都将通过全局唯一的 RRefId 检索唯一的 OwnerRRef 实例。当 UserRRef 作为 rpc_sync()rpc_async()remote() 调用的参数或返回值时,会创建一个 UserRRef,所有者会收到通知以更新引用计数。当全局没有 UserRRef 实例且所有者上也没有对 OwnerRRef 的引用时,OwnerRRef 及其数据将被删除。

假设#

RRef 协议的设计基于以下假设。

  • 瞬态网络故障:RRef 设计通过重试消息来处理瞬态网络故障。它无法处理节点崩溃或永久性网络分区。当发生这些事件时,应用程序应关闭所有工作节点,回滚到先前的检查点,然后恢复训练。

  • 非幂等的 UDF:我们假设提供给 rpc_sync()rpc_async()remote() 的用户函数(UDF)是非幂等的,因此不能重试。然而,内部 RRef 控制消息是幂等的,并且会在消息失败时重试。

  • 消息乱序投递:我们不假设节点之间消息的投递顺序,因为发送方和接收方都使用了多个线程。消息的哪个先被处理没有保证。

RRef 生命周期#

该协议的目标是在适当的时候删除 OwnerRRef。删除 OwnerRRef 的合适时机是当没有活动的 UserRRef 实例,并且用户代码也没有持有对 OwnerRRef 的引用时。棘手的部分在于确定是否存在活动的 UserRRef 实例。

设计思路#

用户可以在三种情况下获得 UserRRef

  1. 从所有者那里接收 UserRRef

  2. 从另一个用户那里接收 UserRRef

  3. 创建由另一个工作节点拥有的新 UserRRef

第一种情况最简单,即所有者将自己的 RRef 传递给用户,其中所有者调用 rpc_sync()rpc_async()remote() 并将其 RRef 作为参数。在这种情况下,用户端将创建一个新的 UserRRef。由于所有者是调用者,它可以轻松更新其在 OwnerRRef 上的本地引用计数。

唯一的要求是,任何 UserRRef 都必须在其销毁时通知所有者。因此,我们需要第一个保证:

G1. 当任何 UserRRef 被删除时,所有者都会收到通知。

由于消息可能延迟或乱序到达,我们需要另一个保证来确保删除消息不会过早处理。如果 A 向 B 发送一条涉及 RRef 的消息,我们将 A 上的 RRef(父 RRef)和 B 上的 RRef(子 RRef)称为。

G2. 父 RRef 在子 RRef 被所有者确认之前不会被删除。

在第二种和第三种情况中,所有者可能对 RRef 分叉图只有部分或完全不了解。例如,一个 RRef 可以在用户端创建,并且在所有者收到任何 RPC 调用之前,创建用户可能已经将 RRef 分享给其他用户,而这些用户又可以进一步共享 RRef。一个不变的规则是,任何 RRef 的分叉图始终是一棵树,因为分叉 RRef 总是会在被调用方(除非被调用方是所有者)创建一个新的 UserRRef 实例,因此每个 RRef 都有一个父节点。

所有者对树中任何 UserRRef 的视图有三个阶段:

1) unknown -> 2) known -> 3) deleted.

所有者对整个树的视图在不断变化。当所有者认为没有活动的 UserRRef 实例时,它会删除其 OwnerRRef 实例,即当 OwnerRRef 被删除时,所有 UserRRef 实例可能确实已被删除,也可能未知。危险的情况是当一些分叉未知而另一些已被删除。

G2 仅仅保证了在所有者了解其所有子 UserRRef 实例之前,没有任何父 UserRRef 可以被删除。然而,子 UserRRef 可能在所有者知道其父 UserRRef 之前就被删除。

考虑以下示例,其中 OwnerRRef 分叉到 A,然后 A 分叉到 Y,Y 分叉到 Z:

OwnerRRef -> A -> Y -> Z

如果 Z 的所有消息,包括删除消息,都在 Y 的消息之前被所有者处理,那么所有者将在知道 Y 存在之前得知 Z 的删除。尽管如此,这并没有造成任何问题。因为,至少 Y 的一个祖先(A)会存活,并且它将阻止所有者删除 OwnerRRef。更具体地说,如果所有者不知道 Y,那么根据 **G2**,A 就不能被删除,而所有者知道 A,因为它是 A 的父节点。

当 RRef 在用户端创建时,情况会稍微复杂一些。

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 在 UserRRef 上调用 to_here(),那么当 Z 被删除时,所有者至少会知道 A,因为否则 to_here() 就不会完成。如果 Z 没有调用 to_here(),那么所有者有可能在收到来自 A 和 Y 的任何消息之前就收到来自 Z 的所有消息。在这种情况下,由于 OwnerRRef 的实际数据尚未创建,因此也没有什么可删除的。这等同于 Z 根本不存在。因此,这仍然是没问题的。

实现#

G1 通过在 UserRRef 析构函数中发送删除消息来实现。为了提供 **G2**,父 UserRRef 在被分叉时会被放入一个上下文(context)中,并由新的 ForkId 索引。父 UserRRef 只有在收到子节点的确认消息(ACK)后才会被从上下文中移除,而子节点只有在得到所有者的确认后才会发出 ACK。

协议场景#

现在让我们讨论上述设计在四种场景下的协议转换。

用户将 RRef 作为返回值共享给所有者#

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在这种情况下,UserRRef 在用户工作节点 A 上创建,然后与远程消息一起传递给所有者工作节点 B,然后 B 创建 OwnerRRefremote() 方法会立即返回,这意味着 UserRRef 可以在所有者知道它之前被分叉/使用。

在所有者端,当收到 remote() 调用时,它将创建 OwnerRRef,并返回一个 ACK 来确认 {100, 1}RRefIdForkId)。只有在收到此 ACK 后,A 才能删除其 UserRRef。这涉及 **G1** 和 **G2**。 **G1** 是显而易见的。对于 **G2**,OwnerRRefUserRRef 的子节点,并且 UserRRef 在收到所有者的 ACK 之前不会被删除。

user_to_owner_ret.png

上图显示了消息流,其中实线箭头包含用户函数,虚线箭头是内置消息。请注意,从 A 到 B 的前两条消息(remote()to_here())可能以任何顺序到达 B,但最终的删除消息只有在以下情况发生时才会发送:

  • B 确认了 UserRRef {100, 1} (G2),并且

  • Python GC 同意删除本地 UserRRef 实例。当 RRef 不再处于作用域内且有资格被垃圾回收时,就会发生这种情况。

用户将 RRef 作为参数共享给所有者#

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在这种情况下,在 A 上创建 UserRRef 后,A 将其作为参数用于后续发送给 B 的 RPC 调用。A 将 UserRRef {100, 1} 保持存活,直到收到 B 的确认(**G2**,而不是 RPC 调用的返回值)。这是必要的,因为 A 不应在所有先前消息都收到之前发送删除消息,否则,由于我们不保证消息投递顺序,OwnerRRef 可能会在被使用前被删除。这是通过创建一个 RRef 的子 ForkId,将其保存在一个映射中,直到收到所有者确认子 ForkId。下图显示了消息流。

user_to_owner_arg.png

请注意,UserRRef 可能在 func 完成之前或甚至开始之前就在 B 上被删除。但这是可以接受的,因为在 B 发出对子 ForkId 的 ACK 时,它已经获得了 OwnerRRef 实例,这将阻止它被过早删除。

所有者将 RRef 共享给用户#

从所有者到用户的共享是最简单的情况,所有者可以本地更新引用计数,并且不需要额外的控制消息来通知他人。关于 **G2**,这与父节点立即收到所有者 ACK 相同,因为父节点就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上图显示了消息流。请注意,当 OwnerRRef 在 rpc_async 调用后退出作用域时,它不会被删除,因为内部有一个映射来保持其存活,如果存在任何已知的分叉,在这种情况下就是 UserRRef {100, 1}。(**G2**)

用户将 RRef 共享给用户#

这是最复杂的情况,调用方用户(父 UserRRef)、被调用方用户(子 UserRRef)以及所有者都需要参与。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

当 C 从 A 接收到子 UserRRef 时,它会向所有者 B 发送一个分叉请求。之后,当 B 确认 C 上的 UserRRef 时,C 将并行执行两个操作:1)向 A 发送子 ACK,以及 2)运行用户提供的函数。在此期间,父节点(A)将保持其 UserRRef {100, 1} 存活以实现 **G2**。