评价此页

torch.futures#

创建日期:2025 年 6 月 12 日 | 最后更新日期:2025 年 6 月 12 日

该包提供了一个 Future 类型,它封装了异步执行,以及一组简化 Future 对象操作的实用函数。目前,Future 类型主要由 Distributed RPC Framework 使用。

class torch.futures.Future(*, devices=None)#

torch._C.Future 的封装,它封装了可调用对象的异步执行,例如 rpc_async()。它还公开了一组 API,用于添加回调函数和设置结果。

警告

GPU 支持是一项 beta 功能,可能会发生变化。

add_done_callback(callback)[source]#

将给定的回调函数附加到此 Future,当 Future 完成时将运行该回调。可以向同一个 Future 添加多个回调,但不能保证其执行顺序。回调函数必须接受一个参数,即此 Future 的引用。回调函数可以使用 value() 方法获取值。请注意,如果此 Future 已经完成,则将内联运行给定的回调。

我们建议使用 then() 方法,因为它提供了一种在回调完成后进行同步的方法。add_done_callback 如果回调不返回任何内容,可能会更高效。但 then()add_done_callback 在底层使用相同的回调注册 API。

关于 GPU 张量,此方法与 then() 的行为相同。

参数

callback (Future) – 一个 Callable,它接受一个参数,即此 Future 的引用。

注意

请注意,如果回调函数抛出异常(无论是由于原始 Future 以异常完成并调用 fut.wait(),还是由于回调中的其他代码),则必须仔细处理错误。例如,如果此回调后来完成了其他 Future,那么这些 Future 不会被标记为错误完成,用户负责独立处理这些 Future 的完成/等待。

示例

>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[source]#

如果此 Future 已完成,则返回 True。如果 Future 具有结果或异常,则为完成。

如果值包含位于 GPU 上的张量,则 Future.done() 将返回 True,即使填充这些张量的异步内核尚未在设备上完成运行,因为此时结果已可用,前提是执行适当的同步(参见 wait())。

返回类型

布尔值

set_exception(result)[source]#

为此 Future 设置一个异常,这将使此 Future 标记为错误完成并触发所有已附加的回调。请注意,当调用此 Future 上的 wait()/value() 时,此处设置的异常将内联抛出。

参数

result (BaseException) – 此 Future 的异常。

示例

>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[source]#

为此 Future 设置结果,这将使此 Future 标记为完成并触发所有已附加的回调。请注意,Future 不能被标记为完成两次。

如果结果包含位于 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,也可以调用此方法,前提是该方法被调用时,那些内核所在的流被设置为当前流。简单来说,只要在启动这些内核后立即调用此方法,而不更改流,就可以安全地进行,无需任何额外的同步。此方法将在所有相关的当前流上记录事件,并使用它们来确保此 Future 的所有使用者得到正确的调度。

参数

result (object) – 此 Future 的结果对象。

示例

>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[source]#

将给定的回调函数附加到此 Future,当 Future 完成时将运行该回调。可以向同一个 Future 添加多个回调,但不能保证其执行顺序(要强制执行特定顺序,请考虑链式调用:fut.then(cb1).then(cb2))。回调函数必须接受一个参数,即此 Future 的引用。回调函数可以使用 value() 方法获取值。请注意,如果此 Future 已经完成,则将立即内联运行给定的回调。

如果 Future 的值包含位于 GPU 上的张量,那么在填充这些张量的异步内核尚未在设备上完成执行时,可能会调用该回调。但是,将使用一些专用的当前流(从全局池获取)来调用该回调,这些流将与那些内核同步。因此,回调在这些张量上执行的任何操作都将在内核完成后在设备上调度。换句话说,只要回调不切换流,它就可以安全地操作结果,而无需任何额外的同步。这与 wait() 的非阻塞行为类似。

同样,如果回调返回一个包含位于 GPU 上的张量的值,即使生成这些张量的内核仍在设备上运行,也可以这样做,只要回调在其执行期间没有更改流。如果要更改流,必须小心地将它们与原始流(即回调被调用时是当前流的那些流)重新同步。

参数

callback (Callable) – 一个 Callable,它将此 Future 作为唯一参数。

返回

一个新 Future 对象,它持有 callback 的返回值,并在给定 callback 完成时被标记为完成。

返回类型

Future[S]

注意

请注意,如果回调函数抛出异常(无论是由于原始 Future 以异常完成并调用 fut.wait(),还是由于回调中的其他代码),那么由 then 返回的 Future 将被恰当地标记为遇到错误。但是,如果此回调后来完成了其他 Future,那么这些 Future 不会被标记为错误完成,用户负责独立处理这些 Future 的完成/等待。

示例

>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[source]#

获取已完成 Future 的值。

此方法只能在调用 wait() 完成后,或在传递给 then() 的回调函数内部调用。在其他情况下,此 Future 可能尚未持有值,调用 value() 可能会失败。

如果值包含位于 GPU 上的张量,那么此方法将*不*执行任何额外的同步。这应该事先单独通过调用 wait() 来完成(回调函数内部除外,对于回调函数,then() 已经处理了)。

返回

Future 持有的值。如果创建该值的方法(回调或 RPC)抛出了错误,那么此 value() 方法也将抛出错误。

返回类型

T

wait()[source]#

阻塞直到此 Future 的值准备就绪。

如果值包含位于 GPU 上的张量,那么将与可能正在异步填充这些张量的内核(在设备上执行)进行额外的同步。这种同步是非阻塞的,这意味着 wait() 会在当前流中插入必要的指令,以确保在当前流上排队的后续操作在异步内核之后正确调度,但完成这些操作后,wait() 将返回,即使那些内核仍在运行。只要不更改流,访问和使用这些值就不需要进一步的同步。

返回

Future 持有的值。如果创建该值的方法(回调或 RPC)抛出了错误,那么此 wait 方法也将抛出错误。

返回类型

T

torch.futures.collect_all(futures)[source]#

将提供的 Future 对象收集到一个单独的合并 Future 中,当所有子 Future 都完成后,该 Future 将被标记为完成。

参数

futures (list) – 一个 Future 对象列表。

返回

返回一个 Future 对象,该对象表示传递的 Future 列表。

返回类型

Future[list[torch.jit.Future]]

示例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[source]#

等待所有提供的 Future 完成,并返回已完成值的列表。如果任何 Future 遇到错误,该方法将提前退出并报告错误,而不会等待其他 Future 完成。

参数

futures (list) – 一个 Future 对象列表。

返回

已完成 Future 结果的列表。如果对任何 Future 调用 wait 时抛出错误,则此方法将抛出错误。

返回类型

列表