torch.futures#
创建日期:2025 年 6 月 12 日 | 最后更新日期:2025 年 6 月 12 日
此包提供了一个 Future 类型,它封装了异步执行,以及一组简化 Future 对象操作的实用函数。目前,Future 类型主要由 分布式 RPC 框架 使用。
- class torch.futures.Future(*, devices=None)#
一个
torch._C.Future的包装器,它封装了一个可调用对象的异步执行,例如rpc_async()。它还公开了一组 API 来添加回调函数和设置结果。警告
GPU 支持是一项 Beta 功能,可能会发生更改。
- add_done_callback(callback)[source]#
将给定的回调函数追加到此
Future,当Future完成时,该函数将被运行。可以将多个回调添加到同一个Future,但它们的执行顺序无法保证。回调必须接受一个参数,即此Future的引用。回调函数可以使用value()方法来获取值。请注意,如果此Future已经完成,则给定的回调将内联运行。我们建议您使用
then()方法,因为它提供了一种在回调完成后进行同步的方式。如果回调不返回任何内容,add_done_callback可能更便宜。但是,then()和add_done_callback在底层使用相同的回调注册 API。对于 GPU 张量,此方法与
then()的行为方式相同。- 参数:
callback (
Future) – 一个Callable,它接受一个参数,即此Future的引用。
注意
请注意,如果回调函数抛出异常,无论是通过原始 future 以异常方式完成并调用
fut.wait(),还是通过回调中的其他代码,都必须仔细处理错误。例如,如果此回调后来完成了其他 futures,那些 futures 不会被标记为以错误方式完成,用户负责单独处理这些 futures 的完成/等待。示例
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[source]#
如果此
Future已完成,则返回True。如果Future具有结果或异常,则它已完成。如果值包含驻留在 GPU 上的张量,则
Future.done()将返回True,即使填充这些张量的异步内核尚未在设备上运行完成,因为此时结果已经可用,前提是执行了适当的同步(请参阅wait())。- 返回类型:
- set_exception(result)[source]#
为此
Future设置一个异常,这将将此Future标记为以错误方式完成并触发所有附加的回调。请注意,当在此Future上调用 wait()/value() 时,此处设置的异常将内联引发。- 参数:
result (BaseException) – 此
Future的异常。
示例
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source]#
为此
Future设置结果,这将将此Future标记为已完成并触发所有附加的回调。请注意,Future不能被标记为完成两次。如果结果包含驻留在 GPU 上的张量,则可以在填充这些张量的异步内核尚未在设备上运行完成时调用此方法,前提是当调用此方法时,在这些内核被加入的流上设置了当前流。简而言之,在启动这些内核后立即调用此方法是安全的,无需额外的同步,只要在它们之间不更改流即可。此方法将在所有相关的当前流上记录事件,并将使用它们来确保此
Future的所有使用者能够正确调度。- 参数:
result (object) – 此
Future的结果对象。
示例
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source]#
将给定的回调函数追加到此
Future,当Future完成时,该函数将被运行。可以将多个回调添加到同一个Future,但它们的执行顺序无法保证(要强制执行特定顺序,请考虑链式调用:fut.then(cb1).then(cb2))。回调必须接受一个参数,即此Future的引用。回调函数可以使用value()方法来获取值。请注意,如果此Future已经完成,则给定的回调将立即内联运行。如果
Future的值包含驻留在 GPU 上的张量,那么即使填充这些张量的异步内核尚未在设备上完成执行,也可能会调用该回调。但是,该回调将使用设置为当前流的专用流(从全局池获取)来调用,这些流将与这些内核同步。因此,回调对这些张量执行的任何操作都将在内核完成之后在设备上调度。换句话说,只要回调不切换流,它就可以安全地操作结果,而无需任何额外的同步。这类似于wait()的非阻塞行为。同样,如果回调返回的值包含驻留在 GPU 上的张量,那么即使生成这些张量的内核仍在设备上运行,它也可以这样做,只要回调在其执行期间没有切换流即可。如果要切换流,必须小心地将它们与原始流重新同步,即与调用回调时是当前流的流重新同步。
- 参数:
callback (
Callable) – 一个Callable,它将此Future作为唯一参数。- 返回:
一个新
Future对象,它持有callback的返回值,并在给定的callback完成时被标记为完成。- 返回类型:
Future[S]
注意
请注意,如果回调函数抛出异常,无论是通过原始 future 以异常方式完成并调用
fut.wait(),还是通过回调中的其他代码,由then返回的 future 都将相应地被标记为遇到的错误。但是,如果此回调后来完成了其他 futures,那些 futures 不会被标记为以错误方式完成,用户负责单独处理这些 futures 的完成/等待。示例
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- torch.futures.collect_all(futures)[source]#
将提供的
Future对象收集到一个单独的组合Future中,当所有子 futures 完成时,该 future 被标记为完成。- 参数:
- 返回:
返回一个
Future对象,该对象指向传入的 Futures 列表。- 返回类型:
- 示例:
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1