Remote Reference Protocol#

Created On: Nov 20, 2019 | Last Updated On: Apr 27, 2025

This note describes the design details of the Remote Reference protocol and walks through message flows in different scenarios. Make sure you are familiar with the Distributed RPC Framework before proceeding.

Background#

RRef stands for Remote REFerence. It is a reference to an object located on a local or remote worker, and it transparently handles reference counting under the hood. Conceptually, it can be considered a distributed shared pointer. Applications can create an RRef by calling remote(). Each RRef is owned by the callee worker of the remote() call (i.e., the owner) and can be used by multiple users. The owner stores the real data and keeps track of the global reference count. Every RRef can be uniquely identified by a global RRefId, which is assigned at creation time on the caller of the remote() call.

On the owner worker, there is only one OwnerRRef instance, which contains the real data, while on user workers there can be as many UserRRefs as necessary, and a UserRRef does not hold the data. All usage on the owner retrieves the unique OwnerRRef instance using the globally unique RRefId. A UserRRef is created whenever one is used as an argument or return value in an rpc_sync(), rpc_async(), or remote() invocation, and the owner is notified accordingly to update the reference count. An OwnerRRef and its data are deleted when there are no UserRRef instances globally and there are no references to the OwnerRRef on the owner either.
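To make the counting idea concrete, here is a toy, single-process sketch of the owner-side bookkeeping. The class below is a plain illustrative Python model under assumed names, not the real C++ implementation inside torch.distributed.rpc:

```python
class OwnerRRef:
    """Toy model: holds the real data and the set of user forks the
    owner currently believes are alive."""

    def __init__(self, rref_id, data):
        self.rref_id = rref_id
        self.data = data
        self.live_forks = set()   # ForkIds of known live UserRRefs

    def add_fork(self, fork_id):
        # the owner learned of a new UserRRef
        self.live_forks.add(fork_id)

    def del_fork(self, fork_id):
        # a UserRRef's delete message was processed
        self.live_forks.discard(fork_id)

    def can_be_deleted(self):
        # deletable only when no user fork is known to be alive
        return not self.live_forks


owner = OwnerRRef(rref_id=100, data="tensor")
owner.add_fork(1)                  # a UserRRef {100, 1} is created somewhere
assert not owner.can_be_deleted()  # data must stay alive
owner.del_fork(1)                  # the UserRRef's delete message arrives
assert owner.can_be_deleted()      # now the data may be freed
```

Local references on the owner itself would additionally pin the data; the sketch only models the user-fork half of the condition.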

Assumptions#

RRef protocol is designed with the following assumptions.

  • Transient Network Failures: The RRef design handles transient network failures by retrying messages. It cannot handle node crashes or permanent network partitions. When those incidents occur, the application should take down all workers, revert to the previous checkpoint, and resume training.

  • Non-idempotent UDFs: We assume the user functions (UDF) provided to rpc_sync(), rpc_async() or remote() are not idempotent and therefore cannot be retried. However, internal RRef control messages are idempotent and retried upon message failure.

  • Out of Order Message Delivery: We do not assume message delivery order between any pair of nodes, because both sender and receiver are using multiple threads. There is no guarantee on which message will be processed first.

RRef Lifetime#

The goal of the protocol is to delete an OwnerRRef at an appropriate time. The right time to delete an OwnerRRef is when there are no living UserRRef instances and user code is not holding references to the OwnerRRef either. The tricky part is to determine if there are any living UserRRef instances.

Design Reasoning#

A user can get a UserRRef in three situations:

  1. Receiving a UserRRef from the owner.

  2. Receiving a UserRRef from another user.

  3. Creating a new UserRRef owned by another worker.

Case 1 is the simplest: the owner passes its RRef to a user, i.e., the owner calls rpc_sync(), rpc_async(), or remote() and uses its RRef as an argument. In this case a new UserRRef will be created on the user. As the owner is the caller, it can easily update its local reference count on the OwnerRRef.

The only requirement is that any UserRRef must notify the owner upon destruction. Hence, we need the first guarantee:

G1. The owner will be notified when any UserRRef is deleted.

As messages might be delayed or arrive out of order, we need one more guarantee to make sure the delete message is not processed too soon. If A sends a message to B that involves an RRef, we call the RRef on A the parent RRef and the RRef on B the child RRef.

G2. Parent RRef will NOT be deleted until the child RRef is confirmed by the owner.

In cases 2 and 3, it is possible that the owner has only partial or no knowledge at all about the RRef fork graph. For example, an RRef could be constructed on a user, and before the owner receives any RPC call, the creator user might have already shared the RRef with other users, and those users could further share the RRef. One invariant is that the fork graph of any RRef is always a tree, because forking an RRef always creates a new UserRRef instance on the callee (except if the callee is the owner), and hence every RRef has a single parent.

The owner's view of any UserRRef in the tree has three stages:

1) unknown -> 2) known -> 3) deleted.

The owner’s view of the entire tree keeps changing. The owner deletes its OwnerRRef instance when it thinks there are no living UserRRef instances, i.e., when OwnerRRef is deleted, all UserRRef instances could be either indeed deleted or unknown. The dangerous case is when some forks are unknown and others are deleted.

G2 trivially guarantees that no parent UserRRef can be deleted before the owner knows all of its children UserRRef instances. However, it is possible that the child UserRRef may be deleted before the owner knows its parent UserRRef.

Consider the following example, where the OwnerRRef forks to A, then A forks to Y, and Y forks to Z:

OwnerRRef -> A -> Y -> Z

If all of Z's messages, including the delete message, are processed by the owner before Y's messages, the owner will learn of Z's deletion before knowing Y exists. Nevertheless, this does not cause any problem, because at least one of Y's ancestors (A) will be alive, and it will prevent the owner from deleting the OwnerRRef. More specifically, if the owner does not know Y, A cannot be deleted due to G2, and the owner knows A because the owner itself is A's parent.

Things get a little trickier if the RRef is created on a user:

OwnerRRef
    ^
    |
    A -> Y -> Z

If Z calls to_here() on the UserRRef, the owner knows at least A when Z is deleted, because otherwise to_here() wouldn't finish. If Z does not call to_here(), it is possible that the owner receives all messages from Z before any message from A and Y. In this case, as the real data of the OwnerRRef has not been created yet, there is nothing to delete either. It is as if Z did not exist at all. Hence, it's still OK.

Implementation#

G1 is implemented by sending out a delete message in the UserRRef destructor. To provide G2, the parent UserRRef is put into a context whenever it is forked, indexed by the new ForkId. The parent UserRRef is only removed from the context when it receives an acknowledgement message (ACK) from the child, and the child only sends out the ACK after it is confirmed by the owner.
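A hypothetical single-process sketch of this bookkeeping on a user worker follows. All names (RRefUserContext, fork, etc.) are illustrative assumptions; the real context lives in the C++ internals of torch.distributed.rpc:

```python
class RRefUserContext:
    """Tracks parent UserRRefs awaiting child ACKs (G2) and records
    delete messages sent to the owner (G1)."""

    def __init__(self):
        self.pending_children = {}   # child ForkId -> parent UserRRef (G2)
        self.sent_deletes = []       # (RRefId, ForkId) delete messages (G1)
        self._next_fork_id = 1

    def next_fork_id(self):
        self._next_fork_id += 1
        return self._next_fork_id

    def send_delete(self, rref_id, fork_id):
        self.sent_deletes.append((rref_id, fork_id))

    def on_child_ack(self, child_fork_id):
        # The child sends its ACK only after the owner confirmed it,
        # so the parent may now be released (G2 satisfied).
        self.pending_children.pop(child_fork_id, None)


class UserRRef:
    def __init__(self, rref_id, fork_id, ctx):
        self.rref_id, self.fork_id, self.ctx = rref_id, fork_id, ctx

    def fork(self):
        # G2: park this parent in the context, keyed by the child's new
        # ForkId, so it stays alive until the child is confirmed.
        child_fork_id = self.ctx.next_fork_id()
        self.ctx.pending_children[child_fork_id] = self
        return child_fork_id

    def __del__(self):
        # G1: notify the owner when this UserRRef is destructed.
        self.ctx.send_delete(self.rref_id, self.fork_id)


ctx = RRefUserContext()
parent = UserRRef(rref_id=100, fork_id=1, ctx=ctx)
child_id = parent.fork()
assert child_id in ctx.pending_children   # parent pinned until child ACK
ctx.on_child_ack(child_id)
assert not ctx.pending_children           # parent released
del parent                                # destructor emits the delete message
assert ctx.sent_deletes == [(100, 1)]
```

Note that the map entry itself keeps the parent alive in this sketch (the dict holds a strong reference), mirroring how the real context pins a forked UserRRef until the child's ACK arrives.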

Protocol Scenarios#

Let’s now discuss how the above designs translate to the protocol in four scenarios.

User Shares RRef with Owner as Return Value#

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

In this case, the UserRRef is created on the user worker A, then it is passed to the owner worker B together with the remote message, and then B creates the OwnerRRef. The method remote() returns immediately, meaning that the UserRRef can be forked/used before the owner knows about it.

On the owner, upon receiving the remote() call, it creates the OwnerRRef and returns an ACK to acknowledge {100, 1} (RRefId, ForkId). Only after receiving this ACK can A delete its UserRRef. This involves both G1 and G2. G1 is obvious. For G2, the OwnerRRef is a child of the UserRRef, and the UserRRef is not deleted until it receives the ACK from the owner.

user_to_owner_ret.png

The figure above shows the message flow, where the solid arrows contain user functions and the dashed arrows are built-in messages. Note that the first two messages from A to B (remote() and to_here()) may arrive at B in any order, but the final delete message will only be sent out when:

  • B acknowledges UserRRef {100, 1} (G2), and

  • the Python GC agrees to delete the local UserRRef instance. This occurs when the RRef is no longer in scope and is eligible for garbage collection.

User Shares RRef with Owner as Argument#

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

In this case, after creating the UserRRef on A, A passes it to B as an argument in a subsequent RPC call. A will keep UserRRef {100, 1} alive until it receives the acknowledgement from B (G2, not the return value of the RPC call). This is necessary because A should not send out the delete message until all previous messages have been received; otherwise, the OwnerRRef could be deleted before it is used, as message delivery order is not guaranteed. This is done by creating a child ForkId of the RRef and holding it in a map until the owner's acknowledgement of the child ForkId arrives. The figure below shows the message flow.

user_to_owner_arg.png

Note that the UserRRef could be deleted before the function on B finishes or even starts. However, this is acceptable, because by the time B sends the ACK for the child ForkId, it has already retrieved the OwnerRRef instance, which prevents it from being deleted too soon.

Owner Shares RRef with User#

Owner-to-user is the simplest case: the owner can update the reference count locally and does not need any additional control messages to notify others. Regarding G2, it is as if the parent received the owner's ACK immediately, because the parent is the owner itself.

import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
rpc.rpc_async('C', func, args=(rref, ))

owner_to_user.png

The figure above shows the message flow. Note that when the OwnerRRef exits scope after the rpc_async call, it is not deleted, because internally there is a map that keeps it alive as long as any fork is known to exist, in this case UserRRef {100, 1} (G2).

User Shares RRef with User#

This is the most complicated case, where the caller user (parent UserRRef), the callee user (child UserRRef), and the owner all need to get involved.

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))

user_to_user.png

When C receives the child UserRRef from A, it sends a fork request to the owner B. Later, when B confirms the UserRRef on C, C performs two actions in parallel: 1) send the child ACK to A, and 2) run the user-provided function. During this time, the parent (A) keeps its UserRRef {100, 1} alive to satisfy G2.
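The ordering constraint in this three-party handshake can be made concrete with a small event-log sketch. The helper functions and log strings below are purely illustrative assumptions, not protocol messages with these exact names:

```python
# Sketch of the user-to-user handshake for UserRRef {100, 1}:
# forked from A to C, owned by B. The invariant modeled here is that
# A may release its parent UserRRef only after C's child ACK, and C
# sends that ACK only after B's confirmation.

log = []

def c_receives_child_rref():
    log.append("C: got child UserRRef from A")
    log.append("C -> B: fork request")

def b_confirms_fork():
    log.append("B -> C: fork confirmed")

def c_after_confirmation():
    # these two actions may run in parallel; either order is fine
    log.append("C -> A: child ACK")
    log.append("C: run user function")

def a_releases_parent():
    # G2: only legal once the child ACK has been received
    assert "C -> A: child ACK" in log
    log.append("A: delete parent UserRRef {100, 1}")

c_receives_child_rref()
b_confirms_fork()
c_after_confirmation()
a_releases_parent()

# fork request precedes confirmation, which precedes the child ACK,
# which precedes the parent's deletion
assert (log.index("C -> B: fork request")
        < log.index("B -> C: fork confirmed")
        < log.index("C -> A: child ACK")
        < log.index("A: delete parent UserRRef {100, 1}"))
```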