评价此页

分布式 RPC 框架#

创建日期:2019年11月14日 | 最后更新日期:2025年07月09日

分布式 RPC 框架通过一套原语提供多机模型训练的机制,以支持远程通信,并提供一个更高级的 API 来自动区分分布在多台机器上的模型。

警告

RPC 包中的 API 是稳定的,处于维护模式。

警告

CUDA 支持是一项**测试版**功能。RPC 包中的并非所有功能都与 CUDA 支持兼容,因此不建议使用。这些不受支持的功能包括:RRefs、JIT 兼容性、分布式自动微分和分布式优化器,以及性能分析。

注意

请参考 PyTorch 分布式 概述 <https://pytorch.ac.cn/tutorials/beginner/dist_overview.html>__,了解与分布式训练相关的所有功能的简要介绍。

基础知识#

分布式 RPC 框架可以轻松地远程运行函数,支持引用远程对象而无需复制实际数据,并提供自动微分和优化器 API,以透明地跨 RPC 边界运行反向传播和更新参数。这些功能可以分为四组 API。

  1. 远程过程调用 (RPC) 支持在指定的远程工作进程上运行带有给定参数的函数,并获取返回值或创建远程返回值的引用。主要有三个 RPC API:rpc_sync()(同步)、rpc_async()(异步)和remote()(异步并返回远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续执行,请使用同步 API。否则,请使用异步 API 获取一个 Future,并在需要返回值时等待该 Future。当需要远程创建某个对象而调用者永远不需要获取它时,remote() API 非常有用。设想一个驱动进程正在设置参数服务器和训练器。驱动进程可以在参数服务器上创建一个嵌入表,然后与训练器共享嵌入表的引用,但驱动进程本身永远不会在本地使用该嵌入表。在这种情况下,rpc_sync()rpc_async() 不再适用,因为它们总是意味着返回值会立即或在未来返回给调用者。

  2. 远程引用 (RRef) 作为本地或远程对象的分布式共享指针。它可以与其他工作进程共享,并且引用计数会被透明地处理。每个 RRef 只有一个所有者,对象仅存在于该所有者处。持有 RRef 的非所有者工作进程可以通过显式请求来获取对象的副本。这在某个工作进程需要访问某个数据对象,但它既不是对象的创建者(remote() 的调用者)也不是对象的拥有者时非常有用。下面将讨论的分布式优化器就是此类用例的一个例子。

  3. 分布式自动微分将所有参与前向传播的工作进程上的本地自动微分引擎拼接在一起,并在反向传播过程中自动访问它们以计算梯度。当需要跨多台机器进行前向传播时(例如,分布式模型并行训练、参数服务器训练等),这尤其有用。有了这个功能,用户代码就不必担心如何跨 RPC 边界发送梯度以及应该以何种顺序启动本地自动微分引擎,这在存在嵌套和相互依赖的 RPC 调用时可能会非常复杂。

  4. 分布式优化器的构造函数接受一个 Optimizer()(例如 SGD()Adagrad() 等)和一个参数 RRefs 列表,在每个不同的 RRef 所有者上创建一个 Optimizer() 实例,并在运行 step() 时相应地更新参数。当您进行分布式前向和反向传播时,参数和梯度将分布在多个工作进程中,因此需要在每个参与的工作进程上都有一个优化器。分布式优化器将所有这些本地优化器包装在一个中,并提供一个简洁的构造函数和 step() API。

RPC#

在使用 RPC 和分布式自动微分原语之前,必须进行初始化。为了初始化 RPC 框架,我们需要使用 init_rpc(),它将初始化 RPC 框架、RRef 框架和分布式自动微分。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]#

初始化 RPC 原语,例如本地 RPC 代理和分布式自动微分,这将立即使当前进程准备好发送和接收 RPC。

参数:
  • name (str) – 此节点的全局唯一名称。(例如,Trainer3ParameterServer2MasterWorker1)名称只能包含数字、字母、下划线、冒号和/或破折号,并且长度必须小于 128 个字符。

  • backend (BackendType, optional) – RPC 后端实现的类型。支持的值为 BackendType.TENSORPIPE(默认值)。有关更多信息,请参阅 后端

  • rank (int) – 此节点的全局唯一 ID/rank。

  • world_size (int) – 组中的工作进程数量。

  • rpc_backend_options (RpcBackendOptions, optional) – 传递给 RpcAgent 构造函数的选项。它必须是 RpcBackendOptions 的特定于代理的子类,并且包含特定于代理的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用 init_method = "env://" 初始化下的进程组进行集合。这意味着环境变量 MASTER_ADDRMASTER_PORT 需要正确设置。有关更多信息和可用选项,请参阅 后端

以下 API 允许用户远程执行函数以及创建远程数据对象的引用(RRefs)。在这些 API 中,当将 Tensor 作为参数或返回值传递时,目标工作进程将尝试创建一个具有相同元数据(例如,形状、步长等)的 Tensor。我们故意禁止传输 CUDA 张量,因为如果源和目标工作进程的设备列表不匹配,可能会导致崩溃。在这种情况下,应用程序始终可以显式地将输入张量移动到调用者的 CPU,并在必要时将其移动到被调用者的所需设备。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]#

进行一次阻塞式 RPC 调用,在工作进程 to 上运行函数 func。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。

参数:
  • to (strWorkerInfoint) – 目标工作进程的名称/rank/ WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置运算符(例如 add())和注释过的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – func 调用的关键字参数字典。

  • timeout (float, optional) – 此 RPC 使用的秒数超时。如果 RPC 在此时间内未完成,将引发表示超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,将使用在初始化期间或使用 _set_rpc_timeout 设置的默认值。

返回:

返回运行 funcargskwargs 的结果。

示例:

请确保在两个工作进程上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参考 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后,在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]#

进行一次非阻塞 RPC 调用,在工作进程 to 上运行函数 func。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个可以等待的 Future

参数:
  • to (strWorkerInfoint) – 目标工作进程的名称/rank/ WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置运算符(例如 add())和注释过的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – func 调用的关键字参数字典。

  • timeout (float, optional) – 此 RPC 使用的秒数超时。如果 RPC 在此时间内未完成,将引发表示超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,将使用在初始化期间或使用 _set_rpc_timeout 设置的默认值。

返回:

返回一个可以等待的 Future 对象。完成后,可以从 Future 对象中检索 funcargskwargs 上的返回值。

警告

不支持将 GPU 张量用作 func 的参数或返回值,因为我们不支持通过网络发送 GPU 张量。在使用 GPU 张量作为 func 的参数或返回值之前,需要显式将其复制到 CPU。

警告

rpc_async API 在通过网络发送参数张量之前不会复制它们的存储,这可能由 RPC 后端类型决定的不同线程完成。调用者应确保在返回的 Future 完成之前,这些张量的内容保持不变。

示例:

请确保在两个工作进程上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参考 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后,在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]#

执行远程调用,在工作进程 to 上运行 func,并立即返回结果值的 RRef。工作进程 to 将是返回的 RRef 的所有者,而调用 remote 的工作进程是用户。所有者管理其 RRef 的全局引用计数,并且 RRef 仅在全局没有对其的活动引用时才会被销毁。

参数:
  • to (strWorkerInfoint) – 目标工作进程的名称/rank/ WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置运算符(例如 add())和注释过的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – func 调用的关键字参数字典。

  • timeout (float, optional) – 此远程调用的秒数超时。如果在此超时时间内未在工作进程 to 上成功处理此 RRef 的创建,那么下次尝试使用 RRef 时(例如 to_here()),将引发指示此失败的超时错误。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,将使用在初始化期间或使用 _set_rpc_timeout 设置的默认值。

返回:

一个用户 RRef 实例,指向结果值。使用阻塞 API torch.distributed.rpc.RRef.to_here() 在本地检索结果值。

警告

remote API 在通过网络发送参数张量之前不会复制它们的存储,这可能由 RPC 后端类型决定的不同线程完成。调用者应确保在返回的 RRef 被所有者确认之前(可以使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 检查),这些张量的内容保持不变。

警告

诸如 remote API 的超时错误等错误会尽力处理。这意味着当 remote 启动的远程调用失败时,例如出现超时错误,我们会采取尽力而为的方式进行错误处理。这意味着错误是异步处理并设置在生成的 RRef 上的。如果 RRef 在此处理之前(例如 to_here 或 fork 调用)已被应用程序使用,那么 RRef 的后续使用将适当地引发错误。然而,用户应用程序有可能在使用 RRef 之前错误就已经被处理。在这种情况下,错误可能不会被引发,因为它们尚未被处理。

示例

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[source]#

获取给定工作进程名称的 WorkerInfo。使用此 WorkerInfo 可以避免在每次调用时传递昂贵的字符串。

参数:

worker_name (str) – 工作进程的字符串名称。如果为 None,则返回当前工作进程的 ID。(默认值为 None

返回:

给定 worker_nameWorkerInfo 实例,或者如果 worker_nameNone,则为当前工作进程的 WorkerInfo

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]#

执行 RPC 代理的关闭,然后销毁 RPC 代理。这会停止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果 graceful=True,此方法将阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。否则,如果 graceful=False,则为本地关闭,并且不会等待其他 RPC 进程到达此方法。

警告

对于 rpc_async() 返回的 Future 对象,不应在 shutdown() 之后调用 future.wait()

参数:

graceful (bool) – 是否执行优雅关闭。如果为 True,则 1)等待直到 UserRRefs 没有待处理的系统消息并删除它们;2)阻塞直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。

示例:

请确保在两个工作进程上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参考 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后,在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo#

封装系统中工作进程信息的结构。包含工作进程的名称和 ID。此类的实例不应直接构造,而应通过 get_worker_info() 获取实例,并将结果传递给 rpc_sync()rpc_async()remote() 等函数,以避免每次调用时都复制字符串。

property id#

用于标识工作进程的全局唯一 ID。

property name#

工作进程的名称。

RPC 包还提供了装饰器,允许应用程序指定给定函数在被调用方侧应如何处理。

torch.distributed.rpc.functions.async_execution(fn)[source]#

一个函数装饰器,指示函数的返回值保证是一个 Future 对象,并且该函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方提取包装函数返回的 Future,并将后续处理步骤安装为该 Future 的回调。安装的回调将在 Future 完成时读取其值,并将其作为 RPC 响应发送回去。这意味着返回的 Future 只存在于被调用方侧,并且永远不会通过 RPC 发送。当被包装函数(fn)的执行需要暂停和恢复时(例如,因为它包含 rpc_async() 或等待其他信号),此装饰器非常有用。

注意

要启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,它就知道该函数返回一个 Future 对象,并将相应地进行处理。然而,这并不意味着这个装饰器必须是定义函数的最外层的一个。例如,当与 @staticmethod@classmethod 结合使用时,@rpc.functions.async_execution 需要是内部装饰器,以允许目标函数被识别为静态或类函数。此目标函数仍然可以异步执行,因为在访问时,静态或类方法会保留由 @rpc.functions.async_execution 安装的属性。

示例:

返回的 Future 对象可以来自 rpc_async()then()Future 构造函数。下面的示例展示了直接使用 then() 返回的 Future

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

与 TorchScript 装饰器结合使用时,此装饰器必须是最外层的。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

与静态或类方法结合使用时,此装饰器必须是内部的。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

此装饰器也适用于 RRef 辅助函数,即 torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

后端#

RPC 模块可以利用不同的后端在节点之间执行通信。可以通过在 init_rpc() 函数中传递 BackendType 枚举的某个值来指定要使用的后端。无论使用哪个后端,其余的 RPC API 都不会改变。每个后端还定义了自己的 RpcBackendOptions 类子类,其实例也可以传递给 init_rpc() 来配置后端的行为。

class torch.distributed.rpc.BackendType(value)#

可用后端枚举类。

PyTorch 内置了 BackendType.TENSORPIPE 后端。可以通过 register_backend() 函数注册其他后端。

class torch.distributed.rpc.RpcBackendOptions#

封装传递给 RPC 后端的选项的抽象结构。可以将此类的一个实例传递给 init_rpc(),以使用特定配置(例如 RPC 超时和要使用的 init_method)来初始化 RPC。

property init_method#

指定如何初始化进程组的 URL。默认为 env://

property rpc_timeout#

一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间内未完成,则会引发指示超时的异常。

TensorPipe 后端#

TensorPipe 代理是默认代理,它利用 TensorPipe 库,该库提供了一种本机点对点通信原语,特别适合机器学习,并从根本上解决了 Gloo 的一些限制。与 Gloo 相比,它的优势在于异步,这使得大量传输可以同时发生,每个传输以自己的速度进行,而不会相互阻塞。它仅在需要时按需在节点对之间打开管道,并且当一个节点失败时,只有其关联的管道将被关闭,而所有其他管道将继续正常工作。此外,它能够支持多种不同的传输(TCP,当然还有共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性并协商最佳传输以用于每个管道。

TensorPipe 后端提供了基于 TCP 的传输,就像 Gloo 一样。它还能够自动分块和多路复用大张量通过多个套接字和线程,以实现非常高的带宽。代理将能够自行选择最佳传输,无需任何干预。

示例

import os
from torch.distributed import rpc
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

rpc.init_rpc(
    "worker1",
    rank=0,
    world_size=2,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        rpc_timeout=20 # 20 second timeout
    )
)

# omitting init_rpc invocation on worker2
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]#

来自 RpcBackendOptionsTensorPipeAgent 的后端选项。

参数:
  • num_worker_threads (int, optional) – TensorPipeAgent 用于执行请求的线程池中的线程数(默认值:16)。

  • rpc_timeout (float, optional) – RPC 请求的默认超时时间(以秒为单位)(默认值:60 秒)。如果在该时间范围内 RPC 未完成,将引发指示超时的异常。调用者可以在 rpc_sync()rpc_async() 中为单个 RPC 覆盖此超时时间(如果需要)。

  • init_method (str, optional) – 用于初始化用于集合的分布式存储的 URL。它可以接受 init_process_group() 的相同参数接受的任何值(默认值:env://)。

  • device_maps (Dict[str, Dict], optional) – 此工作节点到被调用者的设备放置映射。键是被调用者工作节点名称,值是字典(Dict of intstrtorch.device),它将此工作节点的设备映射到被调用者工作节点的设备。(默认值:None

  • devices (List[int, str, or torch.device], optional) – RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将初始化为来自其自身 device_maps 的所有本地设备以及来自其对等方 device_maps 的相应设备。在处理 CUDA RPC 请求时,代理将正确同步此 List 中所有设备的 CUDA 流。

property device_maps#

设备映射位置。

property devices#

本地代理使用的所有设备。

property init_method#

指定如何初始化进程组的 URL。默认为 env://

property num_worker_threads#

TensorPipeAgent 用于执行请求的线程池中的线程数。

property rpc_timeout#

一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间内未完成,则会引发指示超时的异常。

set_device_map(to, device_map)[source]#

设置每个 RPC 调用者和被调用者对之间的设备映射。可以多次调用此函数以逐步添加设备放置配置。

参数:
  • to (str) – 被调用者名称。

  • device_map (Dict of int, str, or torch.device) – 此工作节点到被调用者的设备放置映射。此映射必须可逆。

示例

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>> # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices(devices)[source]#

设置 TensorPipe RPC 代理使用的本地设备。在处理 CUDA RPC 请求时,TensorPipe RPC 代理将正确同步此 List 中所有设备的 CUDA 流。

参数:

devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地设备。

注意

RPC 框架不会自动重试任何 rpc_sync()rpc_async()remote() 调用。原因是 RPC 框架无法确定操作是否是幂等的,以及是否可以安全地重试。因此,应用程序有责任处理故障并根据需要进行重试。RPC 通信基于 TCP,因此故障可能由于网络故障或间歇性网络连接问题而发生。在这种情况下,应用程序需要使用合理的退避策略进行适当的重试,以确保网络不会被激进的重试淹没。

RRef#

警告

使用 CUDA 张量时,目前不支持 RRef。

一个 RRef(远程引用)是对远程工作节点上某种类型 T(例如 Tensor)的值的引用。此句柄会使引用的远程值在所有者端保持存活,但并不意味着该值将来会传输到本地工作节点。RRefs 可用于多机训练,方法是持有对存在于其他工作节点上的 nn.Modules 的引用,并在训练期间调用适当的函数来检索或修改它们的参数。有关更多详细信息,请参阅 远程引用协议

class torch.distributed.rpc.PyRRef(RRef)#

封装远程工作节点上某个类型值的引用的类。此句柄将使引用的远程值在工作节点上保持存活。UserRRef 将在以下情况下被删除:1)在应用程序代码和本地 RRef 上下文中都没有对其的引用,或者 2)应用程序已调用正常关闭。调用已删除 RRef 上的方法会导致未定义行为。RRef 实现仅提供尽力错误检测,应用程序不应在 rpc.shutdown() 后使用 UserRRefs

警告

RRefs 只能由 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下序列化和反序列化 RRefs(例如,Python pickle、torch save() / load()、JIT save() / load() 等)将导致错误。

参数:
  • value (object) – 要由此 RRef 包装的值。

  • type_hint (Type, optional) – 要作为 value 的类型提示传递给 TorchScript 编译器的 Python 类型。

示例:

为简单起见,以下示例省略了 RPC 初始化和关闭代码。有关这些详细信息,请参阅 RPC 文档。

  1. 使用 rpc.remote 创建 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 从本地对象创建 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 与其他工作节点共享 RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: SupportsInt = -1, retain_graph: bool = False) None#

使用 RRef 作为反向传播的根来运行反向传播。如果提供了 dist_autograd_ctx_id,我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传播。在这种情况下,应使用 get_gradients() 来检索梯度。如果 dist_autograd_ctx_idNone,则假定这是一个本地自动梯度图,并且我们仅执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值应为标量张量。

参数:
  • dist_autograd_ctx_id (int, optional) – 要为其检索梯度的分布式自动梯度上下文 ID(默认值:-1)。

  • retain_graph (bool, optional) – 如果为 False,则用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 都是不必要的,并且通常可以通过更有效的方式来解决。通常,您需要将其设置为 True 才能多次运行反向传播(默认值:False)。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool#

返回此 RRef 是否已由所有者确认。OwnerRRef 始终返回 true,而 UserRRef 仅在所有者知道此 UserRRef 时才返回 true。

is_owner(self: torch._C._distributed_rpc.PyRRef) bool#

返回当前节点是否为该 RRef 的所有者。

local_value(self: torch._C._distributed_rpc.PyRRef) object#

如果当前节点是所有者,则返回本地值的引用。否则,将引发异常。

owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo#

返回拥有此 RRef 的节点的worker信息。

owner_name(self: torch._C._distributed_rpc.PyRRef) str#

返回拥有此 RRef 的节点的worker名称。

remote(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#

创建代理以方便地使用 RRef 的所有者作为目标,在 RRef 引用的对象上执行函数。更具体地说,rref.remote().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数:

timeout (float, optional) – rref.remote() 的超时时间。如果在超时时间内未成功完成此 RRef 的创建,则下次尝试使用 RRef(例如 to_here)时,将引发超时。如果未提供,则使用默认 RPC 超时。请参阅 rpc.remote() 以了解 RRef 的具体超时语义。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#

创建代理以方便地使用 RRef 的所有者作为目标,在 RRef 引用的对象上执行函数。更具体地说,rref.rpc_async().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数:

timeout (float, optional) – rref.rpc_async() 的超时时间。如果在该时间范围内调用未完成,将引发指示超时的异常。如果未提供此参数,则使用默认 RPC 超时。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#

创建代理以方便地使用 RRef 的所有者作为目标,在 RRef 引用的对象上执行函数。更具体地说,rref.rpc_sync().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数:

timeout (float, optional) – rref.rpc_sync() 的超时时间。如果在该时间范围内调用未完成,将引发指示超时的异常。如果未提供此参数,则使用默认 RPC 超时。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#

阻塞调用,将 RRef 的值从所有者复制到本地节点并返回。如果当前节点是所有者,则返回本地值的引用。

参数:

timeout (float, optional) – to_here 的超时时间。如果在该时间范围内调用未完成,将引发指示超时的异常。如果未提供此参数,则使用默认 RPC 超时(60 秒)。

RemoteModule#

警告

使用 CUDA 张量时,目前不支持 RemoteModule。

RemoteModule 是在不同进程中远程创建 nn.Module 的简便方法。实际的模块位于远程主机上,但本地主机拥有该模块的句柄,并像处理常规 nn.Module 一样调用该模块。但是,调用会产生到远程端的 RPC 调用,并且可以通过 RemoteModule 支持的附加 API 异步执行。

class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]#

RemoteModule 实例只能在 RPC 初始化后创建。

它在指定的远程节点上创建一个用户指定的模块。它的行为类似于常规的 nn.Module,只是 forward 方法在远程节点上执行。它负责自动梯度记录,以确保反向传播将梯度传播回相应的远程模块。

它会根据 module_clsforward 方法的签名生成两个方法:forward_asyncforwardforward_async 异步运行并返回一个 Future。forward_asyncforward 的参数与模块返回的 forward 方法的参数相同,该模块由 module_cls 返回。

例如,如果 module_cls 返回 nn.Linear 的实例,该实例具有签名:def forward(input: Tensor) -> Tensor:,则生成的 RemoteModule 将具有 2 个方法,签名如下:

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
参数:
  • remote_device (str) – 目标 worker 上我们希望放置此模块的设备。格式应为“<workername>/<device>”,其中 device 字段可以解析为 torch.device 类型。例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。此外,device 字段是可选的,默认值为“cpu”。

  • module_cls (nn.Module) –

    要远程创建的模块的类。例如,

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • args (Sequence, optional) – 要传递给 module_cls 的参数。

  • kwargs (Dict, optional) – 要传递给 module_cls 的关键字参数。

返回:

一个远程模块实例,它封装了用户提供的 module_cls 创建的 Module。它有一个阻塞的 forward 方法和一个异步的 forward_async 方法,该方法返回一个在远程用户提供的模块上调用 forward 的 future。

示例:

在两个不同的进程中运行以下代码

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,一个与 DistributedDataParallel (DDP) 结合的更实用的示例可以在此 教程中找到。

get_module_rref()[source]#

返回一个指向远程模块的 RRef (RRef[nn.Module])。

返回类型:

RRef[Module]

remote_parameters(recurse=True)[source]#

返回一个指向远程模块参数的 RRef 列表。

这通常可以与 DistributedOptimizer 结合使用。

参数:

recurse (bool) – 如果为 True,则返回远程模块及其所有子模块的参数。否则,仅返回直接属于远程模块的参数。

返回:

指向远程模块参数的 RRef 列表 (List[RRef[nn.Parameter]])。

返回类型:

list[RRef[Parameter]]

分布式自动微分框架#

警告

使用 CUDA 张量时,目前不支持分布式自动微分。

该模块提供了一个基于 RPC 的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。在前向传播中,我们记录梯度记录张量何时通过 RPC 发送,并在反向传播中,我们利用这些信息通过 RPC 执行分布式反向传播。有关更多详细信息,请参阅 分布式自动微分设计

torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None#

使用提供的根节点启动分布式反向传播。目前实现了 FAST 模式算法,该算法假设在不同工作节点之间同一分布式自动微分上下文中的所有 RPC 消息都将在反向传播期间成为自动微分图的一部分。

我们使用提供的根节点来发现自动微分图并计算适当的依赖关系。此方法将阻塞直到整个自动微分计算完成。

我们在每个节点上累积相应的 torch.distributed.autograd.context 中的梯度。在调用 torch.distributed.autograd.backward() 时,根据传入的 context_id 来查找要使用的自动微分上下文。如果不存在与给定 ID 对应的有效自动微分上下文,我们将抛出错误。您可以使用 get_gradients() API 检索累积的梯度。

参数:
  • context_id (int) – 用于检索梯度的自动微分上下文 ID。

  • roots (list) – 代表自动微分计算根的张量。所有张量都应该是标量。

  • retain_graph (bool, optional) – 如果为 False,则用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 都是不必要的,并且通常可以通过更有效的方式来解决。通常,您需要将其设置为 True 才能多次运行 backward。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
class torch.distributed.autograd.context[source]#

使用分布式自动微分时,用于封装前向和后向传播的上下文对象。with 语句中生成的 context_id 对于在所有工作节点上唯一标识分布式后向传播是必需的。每个工作节点都存储与此 context_id 关联的元数据,这对于正确执行分布式自动微分传递是必需的。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]#

检索一个映射,该映射将 Tensor 映射到在分布式自动微分反向传播中,作为提供的上下文(对应于给定的 context_id)的一部分累积的该 Tensor 的相应梯度。

参数:

context_id (int) – 用于检索梯度的自动微分上下文 ID。

返回:

一个映射,其中键是 Tensor,值是该 Tensor 的相关梯度。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分布式优化器#

有关分布式优化器的文档,请参阅 torch.distributed.optim 页面。

设计注意事项#

分布式自动微分设计说明涵盖了基于 RPC 的分布式自动微分框架的设计,该框架对于模型并行训练等应用非常有用。

RRef 设计说明涵盖了框架用于引用远程工作节点上值的 RRef (Remote REFerence) 协议的设计。

教程#

RPC 教程向用户介绍了 RPC 框架,提供了使用 torch.distributed.rpc API 的几个示例应用程序,并演示了如何使用 分析器来分析基于 RPC 的工作负载。