进程组#
该模块实现了可在运行时重新配置和调整大小的容错进程组。
这些扩展了标准的 PyTorch ProcessGroup API,可以在大多数接受标准进程组的地方使用。由于它们的大小可以在运行时更改,因此用户需要注意不要假设静态的 rank 或 world_size。
- class torchft.process_group.ErrorSwallowingProcessGroupWrapper(pg: ProcessGroup)[source]#
Bases:
ProcessGroupWrapper
这是任何进程组的包装器,它会吞噬错误并在出错时返回虚拟结果。
这旨在允许在训练循环外部处理错误,以避免修改建模代码以支持错误处理。
发生错误后,所有未来的操作都将被跳过,直到进程组通过
configure
重新配置。- allreduce(tensors: List[Tensor], opts: object) Work [source]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- class torchft.process_group.FakeProcessGroupWrapper(pg: ProcessGroup)[source]#
Bases:
ProcessGroupWrapper
这是任何进程组的包装器,可用于在各个点将错误注入进程组。
这旨在用于测试,以便它们可以测试进程组操作出错的情况。
- allreduce(tensors: List[Tensor], opts: object) Work [source]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- class torchft.process_group.ManagedProcessGroup(manager: Manager)[source]#
Bases:
ProcessGroupWrapper
这是由 torchft Manager 管理的任何进程组的包装器。
它使用 Manager 中配置的进程组。world_size 是动态的,并将活跃参与者的数量报告给模型。
任何错误都会被异步报告给 Manager,只有成功的结果才会返回给调用者。
- class torchft.process_group.ProcessGroup(*args: object, **kwargs: object)[source]#
Bases:
ProcessGroup
- allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work [source]#
将整个组的张量收集到一个列表中。
有关更多详细信息,请参阅 torch.distributed.all_gather。
- allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work [source]#
对合并后的张量执行 allgather 操作。
有关更多详细信息,请参阅 torch.distributed.allgather_coalesced。
- allreduce(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work [source]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- allreduce_coalesced(tensors: List[Tensor], opts: AllreduceCoalescedOptions) Work [source]#
以合并的方式执行 all_reduce 操作。
有关更多详细信息,请参阅 torch.distributed.all_reduce_coalesced。
- alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work [source]#
执行 all_to_all 操作。
有关更多详细信息,请参阅 torch.distributed.all_to_all_single。
- broadcast(tensor_list: List[Tensor], opts: BroadcastOptions) Work [source]#
将张量广播到整个组。
有关更多详细信息,请参阅 torch.distributed.broadcast。
- configure(store_addr: str, rank: int, world_size: int) None [source]#
此函数将进程组重新配置为使用新的 store、rank 和 world_size。
每次调用此函数时,都必须提供一个唯一的带前缀的 store 地址。例如:localhost:1234/my/prefix/1
此函数将阻塞直到底层进程组被创建。如果发生错误,此函数将抛出异常。
- 参数
store_addr – 要使用的 store 的地址
rank – 此进程的 rank
world_size – 此进程组的 world_size
- recv(tensors: List[Tensor], src_rank: int, tag: int) Work [source]#
从 rank 处的进程接收张量列表。
有关更多详细信息,请参阅 torch.distributed.recv。
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) Work [source]#
对张量列表进行归约,然后将它们分散到组中的所有进程。
有关更多详细信息,请参阅 torch.distributed.reduce_scatter。
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work [source]#
对合并的张量执行reduce-scatter操作。
更多细节请参阅 torch.distributed.reduce_scatter_tensor。
- register(name: str) ProcessGroup [source]#
将进程组注册到全局注册表中。这使得它可以与功能性集合等可编译的内容一起使用。
此操作应只调用一次。
- 参数
name – name 必须是此进程组的唯一名称
- class torchft.process_group.ProcessGroupBaby(timeout: Union[float, timedelta] = 60.0)[source]#
Bases:
ProcessGroup
这是一个在子进程中运行底层进程组的进程组。由于它在子进程中运行,所有张量都需要位于共享内存中,或者会被移动到共享内存。CUDA 张量是隐式可共享的,不需要任何更改。
- allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work [source]#
将整个组的张量收集到一个列表中。
有关更多详细信息,请参阅 torch.distributed.all_gather。
- allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work [source]#
对合并后的张量执行 allgather 操作。
有关更多详细信息,请参阅 torch.distributed.allgather_coalesced。
- allreduce(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work [source]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceCoalescedOptions, ReduceOp]) Work [source]#
以合并的方式执行 all_reduce 操作。
有关更多详细信息,请参阅 torch.distributed.all_reduce_coalesced。
- alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work [source]#
执行 all_to_all 操作。
有关更多详细信息,请参阅 torch.distributed.all_to_all_single。
- barrier(opts: Optional[BarrierOptions] = None) Work [source]#
同步所有进程。
有关更多详细信息,请参阅 torch.distributed.barrier。
- broadcast(tensor_list: List[Tensor], opts: BroadcastOptions) Work [source]#
将张量广播到整个组。
有关更多详细信息,请参阅 torch.distributed.broadcast。
- configure(store_addr: str, rank: int, world_size: int) None [source]#
此函数将进程组重新配置为使用新的 store、rank 和 world_size。
每次调用此函数时,都必须提供一个唯一的带前缀的 store 地址。例如:localhost:1234/my/prefix/1
此函数将阻塞直到底层进程组被创建。如果发生错误,此函数将抛出异常。
- 参数
store_addr – 要使用的 store 的地址
rank – 此进程的 rank
world_size – 此进程组的 world_size
- recv(tensors: List[Tensor], src_rank: int, tag: int) Work [source]#
从 rank 处的进程接收张量列表。
有关更多详细信息,请参阅 torch.distributed.recv。
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) Work [source]#
对张量列表进行归约,然后将它们分散到组中的所有进程。
有关更多详细信息,请参阅 torch.distributed.reduce_scatter。
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work [source]#
对合并的张量执行reduce-scatter操作。
更多细节请参阅 torch.distributed.reduce_scatter_tensor。
- class torchft.process_group.ProcessGroupBabyGloo(timeout: Union[float, timedelta] = 60.0)[source]#
Bases:
ProcessGroupBaby
这是一个在子进程中运行 Gloo 的进程组。
对于大多数用例,您应该优先选择 ProcessGroupGloo 或 ProcessGroupBabyNCCL。
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) None [source]#
此函数是 ProcessGroupGloo 类中 reduce_scatter 操作的占位符。然而,Gloo 后端不支持此操作,因此调用此函数将引发 RuntimeError。
- 引发
RuntimeError – 总是引发,因为 reduce_scatter 不
ProcessGroupGloo 支持。 –
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) None [source]#
此函数是 ProcessGroupBabyGloo 类中 reduce_scatter_tensor_coalesced 操作的占位符。然而,Gloo 后端不支持此操作,因此调用此函数将引发 RuntimeError。
- 引发
RuntimeError – 总是引发,因为 reduce_scatter 不
ProcessGroupBabyGloo 支持。 –
- class torchft.process_group.ProcessGroupBabyNCCL(timeout: Union[float, timedelta] = 60.0)[source]#
Bases:
ProcessGroupBaby
这是一个在子进程中运行 NCCL 的进程组。
对于 NCCL 后端,子进程的 CUDA 上下文将比在主进程中运行 NCCL 使用额外的内存。这通常约为 1GB。
返回的 Work 对象仅在 CUDA 流上同步,而在 CPU 端不同步。这是通过在进程之间传递 CUDA Events 来实现的。要执行 CPU 同步,请在调用 wait() 后调用 torch.cuda.synchronize()。
警告:如果子进程在操作运行时被杀死,根据当前 PyTorch 实现,CUDA 张量可能会发生泄漏。TODO 修复
警告:由于这为子进程使用了单独的 CUDA 上下文,因此性能可能比直接使用 NCCL 慢。单独的 CUDA 上下文不能同时运行,因此网络和计算内核不会重叠执行,而是进行时间分片,这可能会降低 GPU 利用率。
- class torchft.process_group.ProcessGroupDummy(rank: int, world: int)[source]#
Bases:
ProcessGroup
此进程组会丢弃所有传递给它的数据并返回成功。这适用于我们想要丢弃某些操作而不修改底层库的罕见情况。
此 PG 只支持 world_size 为 1。
- allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: object) Work [source]#
将整个组的张量收集到一个列表中。
有关更多详细信息,请参阅 torch.distributed.all_gather。
- allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work [source]#
对合并后的张量执行 allgather 操作。
有关更多详细信息,请参阅 torch.distributed.allgather_coalesced。
- allreduce(tensors: List[Tensor], opts: object) Work [source]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work [source]#
以合并的方式执行 all_reduce 操作。
有关更多详细信息,请参阅 torch.distributed.all_reduce_coalesced。
- alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work [source]#
执行 all_to_all 操作。
有关更多详细信息,请参阅 torch.distributed.all_to_all_single。
- barrier(opts: Optional[BarrierOptions] = None) Work [source]#
同步所有进程。
有关更多详细信息,请参阅 torch.distributed.barrier。
- broadcast(tensor_list: List[Tensor], opts: object) Work [source]#
将张量广播到整个组。
有关更多详细信息,请参阅 torch.distributed.broadcast。
- configure(store_addr: str, rank: int, world_size: int) None [source]#
此函数将进程组重新配置为使用新的 store、rank 和 world_size。
每次调用此函数时,都必须提供一个唯一的带前缀的 store 地址。例如:localhost:1234/my/prefix/1
此函数将阻塞直到底层进程组被创建。如果发生错误,此函数将抛出异常。
- 参数
store_addr – 要使用的 store 的地址
rank – 此进程的 rank
world_size – 此进程组的 world_size
- recv(tensors: List[Tensor], src_rank: int, tag: int) Work [源]#
从 rank 处的进程接收张量列表。
有关更多详细信息,请参阅 torch.distributed.recv。
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: object) Work [源]#
对张量列表进行归约,然后将它们分散到组中的所有进程。
有关更多详细信息,请参阅 torch.distributed.reduce_scatter。
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work [源]#
对合并的张量执行reduce-scatter操作。
更多细节请参阅 torch.distributed.reduce_scatter_tensor。
- class torchft.process_group.ProcessGroupGloo(timeout: timedelta = datetime.timedelta(seconds=60), pg: Optional[ProcessGroup] = None)[源]#
Bases:
ProcessGroupWrapper
This is a reconfigurable version of ProcessGroupGloo.
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) None [源]#
此函数是 ProcessGroupGloo 类中 reduce_scatter 操作的占位符。然而,Gloo 后端不支持此操作,因此调用此函数将引发 RuntimeError。
- 引发
RuntimeError – 总是引发,因为 reduce_scatter 不
ProcessGroupGloo 支持。 –
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) None [源]#
This function is a placeholder for the reduce_scatter_tensor_coalesced operation in the ProcessGroupGloo class. However, this operation is not supported by the Gloo backend, and thus, calling this function will raise a RuntimeError.
- 引发
RuntimeError – 总是引发,因为 reduce_scatter 不
ProcessGroupGloo 支持。 –
- class torchft.process_group.ProcessGroupNCCL(timeout: timedelta = datetime.timedelta(seconds=60))[源]#
Bases:
ProcessGroupWrapper
This is a reconfigurable version of ProcessGroupNCCL.
If you are using a supported version of NCCL (NCCL >= 2.26, torch >= 2.7) this will attempt to use ncclCommAbort to recover from any timeouts.
This uses a Python user space event loop to asynchronously wait for the NCCL operations to complete. This should not be used with very long timeouts as the timeout entries are not cleaned up until the elapsed duration completes which may result in slowness or excess memory usage.
WARNING: this may result in deadlocks due to NCCL error handling and on old versions of torch/NCCL will result in deadlocks.
- 参数
timeout – the timeout to use for NCCL operations.
- class torchft.process_group.ProcessGroupWrapper(timeout: timedelta = datetime.timedelta(seconds=60), pg: Optional[ProcessGroup] = None)[源]#
Bases:
ProcessGroup
This is a wrapper around any ProcessGroup with a reconfiguration method.
- 参数
timeout – timeout for reconfiguration for TCPStore
pg – optional ProcessGroup to use, if None a new one will be created
- allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work [源]#
将整个组的张量收集到一个列表中。
有关更多详细信息,请参阅 torch.distributed.all_gather。
- allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work [源]#
对合并后的张量执行 allgather 操作。
有关更多详细信息,请参阅 torch.distributed.allgather_coalesced。
- allreduce(tensors: List[Tensor], opts: object) Work [源]#
跨所有机器的张量数据进行归约,以便所有机器都能获得最终结果。
有关更多详细信息,请参阅 torch.distributed.all_reduce。
- allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work [源]#
以合并的方式执行 all_reduce 操作。
有关更多详细信息,请参阅 torch.distributed.all_reduce_coalesced。
- alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work [源]#
执行 all_to_all 操作。
有关更多详细信息,请参阅 torch.distributed.all_to_all_single。
- barrier(opts: Optional[BarrierOptions] = None) Work [源]#
同步所有进程。
有关更多详细信息,请参阅 torch.distributed.barrier。
- broadcast(tensor_list: List[Tensor], opts: object) Work [源]#
将张量广播到整个组。
有关更多详细信息,请参阅 torch.distributed.broadcast。
- configure(store_addr: str, rank: int, world_size: int) None [源]#
此函数将进程组重新配置为使用新的 store、rank 和 world_size。
每次调用此函数时,都必须提供一个唯一的带前缀的 store 地址。例如:localhost:1234/my/prefix/1
此函数将阻塞直到底层进程组被创建。如果发生错误,此函数将抛出异常。
- 参数
store_addr – 要使用的 store 的地址
rank – 此进程的 rank
world_size – 此进程组的 world_size
- recv(tensors: List[Tensor], src_rank: int, tag: int) Work [源]#
从 rank 处的进程接收张量列表。
有关更多详细信息,请参阅 torch.distributed.recv。
- reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: object) Work [源]#
对张量列表进行归约,然后将它们分散到组中的所有进程。
有关更多详细信息,请参阅 torch.distributed.reduce_scatter。
- reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work [源]#
对合并的张量执行reduce-scatter操作。
更多细节请参阅 torch.distributed.reduce_scatter_tensor。