评价此页

远程引用协议#

创建日期:2019年11月20日 | 最后更新日期:2025年4月27日

本说明介绍了远程引用协议的设计细节,并讲解了不同场景下的消息流。在继续之前,请确保您熟悉分布式RPC框架

背景#

RRef 代表远程引用(Remote REFerence)。它是一个位于本地或远程工作器上对象的引用,并在底层透明地处理引用计数。从概念上讲,它可以被视为一个分布式共享指针。应用程序可以通过调用remote()来创建RRef。每个RRef都由remote()调用的被调用方工作器(即所有者)拥有,并可供多个用户使用。所有者存储实际数据并跟踪全局引用计数。每个RRef都可以通过一个全局的RRefId唯一标识,该ID在remote()调用的调用方上创建时分配。

在所有者工作器上,只有一个OwnerRRef实例,其中包含实际数据,而在用户工作器上,可以有任意数量的UserRRefs,并且UserRRef不持有数据。所有者上的所有使用都将使用全局唯一的RRefId检索唯一的OwnerRRef实例。当UserRRefrpc_sync()rpc_async()remote()调用中用作参数或返回值时,将创建一个UserRRef,并且所有者将收到通知以更新引用计数。当全局没有UserRRef实例且所有者上也没有对OwnerRRef的引用时,OwnerRRef及其数据将被删除。

假设#

RRef协议是基于以下假设设计的。

  • 瞬时网络故障:RRef设计通过重试消息来处理瞬时网络故障。它无法处理节点崩溃或永久性网络分区。当这些事件发生时,应用程序应关闭所有工作器,回滚到上一个检查点,并恢复训练。

  • 非幂等UDF:我们假设提供给rpc_sync()rpc_async()remote()的用户函数(UDF)不是幂等的,因此不能重试。但是,内部RRef控制消息是幂等的,并在消息失败时重试。

  • 乱序消息传递:我们不假设任何一对节点之间的消息传递顺序,因为发送方和接收方都使用多线程。无法保证哪条消息将首先被处理。

RRef生命周期#

该协议的目标是在适当的时间删除OwnerRRef。删除OwnerRRef的正确时机是当没有存活的UserRRef实例且用户代码也没有持有对OwnerRRef的引用时。棘手的部分是如何确定是否存在任何存活的UserRRef实例。

设计原理#

用户可以在三种情况下获取UserRRef

  1. 从所有者接收UserRRef

  2. 从另一个用户接收UserRRef

  3. 创建由另一个工作器拥有的新UserRRef

情况1最简单,所有者将其RRef传递给一个用户,其中所有者调用rpc_sync()rpc_async()remote()并将其RRef用作参数。在这种情况下,将在用户端创建一个新的UserRRef。由于所有者是调用者,它可以轻松更新OwnerRRef上的本地引用计数。

唯一的要求是任何UserRRef在销毁时必须通知所有者。因此,我们需要第一个保证:

G1. 当任何UserRRef被删除时,所有者将收到通知。

由于消息可能会延迟或乱序到达,我们需要一个额外的保证来确保删除消息不会过早处理。如果A向B发送涉及RRef的消息,我们将A上的RRef称为父RRef,将B上的RRef称为子RRef。

G2. 父RRef在子RRef被所有者确认之前不会被删除。

在情况2和3中,所有者可能只对RRef的分叉图有部分了解甚至完全不了解。例如,一个RRef可能在一个用户上构建,并且在所有者收到任何RPC调用之前,创建者用户可能已经与所有者共享了该RRef,而这些用户可能进一步共享该RRef。一个不变的原则是,任何RRef的分叉图始终是一棵树,因为分叉RRef总是在被调用方(除非被调用方是所有者)上创建一个新的UserRRef实例,因此每个RRef只有一个父级。

所有者对树中任何UserRRef的视图有三个阶段

1) unknown -> 2) known -> 3) deleted.

所有者对整个树的视图不断变化。当所有者认为没有存活的UserRRef实例时,它会删除其OwnerRRef实例,即当OwnerRRef被删除时,所有UserRRef实例可能确实已被删除或未知。危险的情况是当某些分叉未知而其他分叉已被删除时。

G2简单地保证了在所有者知道其所有子UserRRef实例之前,不会删除任何父UserRRef。然而,子UserRRef可能在所有者知道其父UserRRef之前被删除。

考虑以下示例,其中OwnerRRef分叉到A,然后A分叉到Y,Y分叉到Z:

OwnerRRef -> A -> Y -> Z

如果所有Z的消息,包括删除消息,都在Y的消息之前被所有者处理,那么所有者将在知道Y存在之前得知Z的删除。然而,这不会引起任何问题。因为,Y的至少一个祖先(A)将存活,它将阻止所有者删除OwnerRRef。更具体地说,如果所有者不知道Y,则由于G2,A不能被删除,并且所有者知道A,因为A是Y的父级。

如果RRef是在用户端创建的,事情会变得有点复杂:

OwnerRRef
    ^
    |
    A -> Y -> Z

如果Z在UserRRef上调用to_here(),那么当Z被删除时,所有者至少知道A,否则to_here()将无法完成。如果Z不调用to_here(),所有者可能会在收到来自A和Y的任何消息之前收到Z的所有消息。在这种情况下,由于OwnerRRef的实际数据尚未创建,因此也没有什么可删除的。这与Z根本不存在的情况相同。因此,这仍然没有问题。

实现#

G1通过在UserRRef析构函数中发送删除消息来实现。为了提供G2,父UserRRef在每次分叉时都被放入一个上下文中,并由新的ForkId索引。父UserRRef只有在收到来自子级的确认消息(ACK)时才从上下文中移除,而子级只有在所有者确认后才会发送ACK。

协议场景#

现在让我们讨论上述设计如何在四种场景下转化为协议。

用户以返回值形式与所有者共享RRef#

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在这种情况下,UserRRef在用户工作器A上创建,然后随远程消息一起传递给所有者工作器B,然后B创建OwnerRRef。方法remote()立即返回,这意味着在所有者知道之前,UserRRef就可以被分叉/使用。

在所有者端,当收到remote()调用时,它将创建OwnerRRef,并返回一个ACK以确认{100, 1}RRefId, ForkId)。只有在收到此ACK后,A才能删除其UserRRef。这涉及G1G2G1是显而易见的。对于G2OwnerRRefUserRRef的子级,并且UserRRef在收到所有者的ACK之前不会被删除。

user_to_owner_ret.png

上图显示了消息流,其中实线箭头表示用户函数,虚线箭头表示内置消息。请注意,从A到B的前两条消息(remote()to_here())可能以任何顺序到达B,但最终的删除消息只会在以下情况发送:

  • B确认UserRRef {100, 1} (G2),并且

  • Python GC同意删除本地UserRRef实例。这发生在RRef不再在作用域内并符合垃圾回收条件时。

用户以参数形式与所有者共享RRef#

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在这种情况下,在A上创建UserRRef之后,A将其用作后续RPC调用B的参数。A将使UserRRef {100, 1}保持存活,直到收到B的确认(G2,而不是RPC调用的返回值)。这是必要的,因为A不应在收到所有先前的消息之前发送删除消息,否则,由于我们不保证消息传递顺序,OwnerRRef可能会在使用前被删除。这通过创建RRef的子ForkId,并将其保存在映射中直到收到所有者确认子ForkId来实现。下图显示了消息流。

user_to_owner_arg.png

请注意,UserRRef可能在func完成甚至开始之前就在B上被删除。然而,这是可以的,因为在B发送子ForkId的ACK时,它已经获取了OwnerRRef实例,这将防止它过早被删除。

所有者与用户共享RRef#

所有者到用户是最简单的情况,所有者可以在本地更新引用计数,不需要任何额外的控制消息来通知其他方。关于G2,这与父级立即收到所有者的ACK相同,因为父级就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上图显示了消息流。请注意,当OwnerRRef在rpc_async调用后超出作用域时,它不会被删除,因为内部有一个映射来保持它存活(如果存在任何已知分叉),在这种情况下是UserRRef {100, 1}。(G2)

用户与用户共享RRef#

这是最复杂的情况,调用方用户(父UserRRef)、被调用方用户(子UserRRef)和所有者都需要参与进来。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

当C从A接收到子UserRRef时,它会向所有者B发送一个分叉请求。随后,当B确认C上的UserRRef时,C将并行执行两个动作:1) 向A发送子ACK,2) 运行用户提供的函数。在此期间,父级(A)将使其UserRRef {100, 1}保持存活以实现G2