实验性面向对象分布式 API#
创建日期:2025 年 7 月 9 日 | 最后更新日期:2025 年 7 月 30 日
这是 PyTorch Distributed 的一个实验性新 API。它目前正在积极开发中,并且可能会发生更改或完全删除。
此 API 旨在作为更灵活、面向对象的分布式 API 的试验场。
- class torch.distributed._dist2.ProcessGroup#
Bases:
pybind11_object
ProcessGroup 是一个通信基元,允许跨进程组进行集体操作。
这是一个基类,提供了所有 ProcessGroups 的接口。它不打算直接使用,而是由子类扩展。
- class BackendType#
Bases:
pybind11_object
用于进程组的后端类型。
成员
UNDEFINED
GLOO
NCCL
XCCL
UCC
MPI
CUSTOM
- CUSTOM = <BackendType.CUSTOM: 6>#
- GLOO = <BackendType.GLOO: 1>#
- MPI = <BackendType.MPI: 4>#
- NCCL = <BackendType.NCCL: 2>#
- UCC = <BackendType.UCC: 3>#
- UNDEFINED = <BackendType.UNDEFINED: 0>#
- XCCL = <BackendType.XCCL: 5>#
- property name#
- property value#
- CUSTOM = <BackendType.CUSTOM: 6>#
- GLOO = <BackendType.GLOO: 1>#
- MPI = <BackendType.MPI: 4>#
- NCCL = <BackendType.NCCL: 2>#
- UCC = <BackendType.UCC: 3>#
- UNDEFINED = <BackendType.UNDEFINED: 0>#
- XCCL = <BackendType.XCCL: 5>#
- allgather(*args, **kwargs)#
重载函数。
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0efcc70>) -> c10d::Work
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.all_gather()
。allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensor: torch.Tensor, timeout: datetime.timedelta | None = None) -> c10d::Work
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.all_gather()
。
- allgather_coalesced(self: torch._C._distributed_c10d.ProcessGroup, output_lists: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_list: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0aa5eb0>) c10d::Work #
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.all_gather()
。
- allgather_into_tensor_coalesced(self: torch._C._distributed_c10d.ProcessGroup, outputs: collections.abc.Sequence[torch.Tensor], inputs: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0efae70>) c10d::Work #
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.all_gather()
。
- allreduce(*args, **kwargs)#
重载函数。
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f5ce0a9c2f0>) -> c10d::Work
在进程组中的所有进程上对提供的张量执行 allreduce。
有关更多详细信息,请参阅
torch.distributed.all_reduce()
。allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work
在进程组中的所有进程上对提供的张量执行 allreduce。
有关更多详细信息,请参阅
torch.distributed.all_reduce()
。allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work
在进程组中的所有进程上对提供的张量执行 allreduce。
有关更多详细信息,请参阅
torch.distributed.all_reduce()
。
- allreduce_coalesced(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceCoalescedOptions = <torch._C._distributed_c10d.AllreduceCoalescedOptions object at 0x7f5ce0efb9f0>) c10d::Work #
在进程组中的所有进程上对提供的张量执行 allreduce。
有关更多详细信息,请参阅
torch.distributed.all_reduce()
。
- alltoall(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllToAllOptions = <torch._C._distributed_c10d.AllToAllOptions object at 0x7f5ce0aa6a70>) c10d::Work #
在进程组中的所有进程之间 alltoall 输入张量。
有关更多详细信息,请参阅
torch.distributed.all_to_all()
。
- alltoall_base(*args, **kwargs)#
重载函数。
alltoall_base(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: torch.Tensor, output_split_sizes: collections.abc.Sequence[typing.SupportsInt], input_split_sizes: collections.abc.Sequence[typing.SupportsInt], opts: torch._C._distributed_c10d.AllToAllOptions = <torch._C._distributed_c10d.AllToAllOptions object at 0x7f5ce0f870b0>) -> c10d::Work
在进程组中的所有进程之间 alltoall 输入张量。
有关更多详细信息,请参阅
torch.distributed.all_to_all()
。alltoall_base(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: torch.Tensor, output_split_sizes: collections.abc.Sequence[typing.SupportsInt], input_split_sizes: collections.abc.Sequence[typing.SupportsInt], timeout: datetime.timedelta | None = None) -> c10d::Work
在进程组中的所有进程之间 alltoall 输入张量。
有关更多详细信息,请参阅
torch.distributed.all_to_all()
。
- barrier(*args, **kwargs)#
重载函数。
barrier(self: torch._C._distributed_c10d.ProcessGroup, opts: torch._C._distributed_c10d.BarrierOptions = <torch._C._distributed_c10d.BarrierOptions object at 0x7f5ce0aa6f30>) -> c10d::Work
- 阻塞,直到组中的所有进程都调用此方法,然后
一起退出调用。
有关更多详细信息,请参阅
torch.distributed.barrier()
。
barrier(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta | None = None) -> c10d::Work
- 阻塞,直到组中的所有进程都调用此方法,然后
一起退出调用。
有关更多详细信息,请参阅
torch.distributed.barrier()
。
- property bound_device_id#
- broadcast(*args, **kwargs)#
重载函数。
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = <torch._C._distributed_c10d.BroadcastOptions object at 0x7f5ce0ef0bb0>) -> c10d::Work
将张量广播到进程组中的所有进程。
有关更多详细信息,请参阅
torch.distributed.broadcast()
。broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work
将张量广播到进程组中的所有进程。
有关更多详细信息,请参阅
torch.distributed.broadcast()
。
- gather(*args, **kwargs)#
重载函数。
gather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.GatherOptions = <torch._C._distributed_c10d.GatherOptions object at 0x7f5ce12f27f0>) -> c10d::Work
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.gather()
。gather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensor: torch.Tensor, root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work
从进程组中的所有进程收集输入张量。
有关更多详细信息,请参阅
torch.distributed.gather()
。
- get_group_store(self: torch._C._distributed_c10d.ProcessGroup) torch._C._distributed_c10d.Store #
获取此进程组的存储。
- property group_desc#
获取此进程组描述
- property group_name#
(获取此进程组名称。它是集群唯一的)
- merge_remote_group(self: torch._C._distributed_c10d.ProcessGroup, store: torch._C._distributed_c10d.Store, size: SupportsInt, timeout: datetime.timedelta = datetime.timedelta(seconds=1800), group_name: str | None = None, group_desc: str | None = None) torch._C._distributed_c10d.ProcessGroup #
- monitored_barrier(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta | None = None, wait_all_ranks: bool = False) None #
- 阻塞,直到组中的所有进程都调用此方法,然后
一起退出调用。
有关更多详细信息,请参阅
torch.distributed.monitored_barrier()
。
- recv(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], srcRank: SupportsInt, tag: SupportsInt) c10d::Work #
从指定 rank 接收张量。
有关更多详细信息,请参阅
torch.distributed.recv()
。
- recv_anysource(self: torch._C._distributed_c10d.ProcessGroup, arg0: collections.abc.Sequence[torch.Tensor], arg1: SupportsInt) c10d::Work #
从任何源接收张量。
有关更多详细信息,请参阅
torch.distributed.recv()
。
- reduce(*args, **kwargs)#
重载函数。
reduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.ReduceOptions = <torch._C._distributed_c10d.ReduceOptions object at 0x7f5ce0eda330>) -> c10d::Work
在进程组中的所有进程上对提供的张量执行 reduce。
有关更多详细信息,请参阅
torch.distributed.reduce()
。reduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: typing.SupportsInt, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work
在进程组中的所有进程上对提供的张量执行 reduce。
有关更多详细信息,请参阅
torch.distributed.reduce()
。
- reduce_scatter(*args, **kwargs)#
重载函数。
reduce_scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], opts: torch._C._distributed_c10d.ReduceScatterOptions = <torch._C._distributed_c10d.ReduceScatterOptions object at 0x7f5ce0ef2e30>) -> c10d::Work
从进程组中的所有进程对输入张量进行 reduce 并 scatter。
有关更多详细信息,请参阅
torch.distributed.reduce_scatter()
。reduce_scatter(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: collections.abc.Sequence[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work
从进程组中的所有进程对输入张量进行 reduce 并 scatter。
有关更多详细信息,请参阅
torch.distributed.reduce_scatter()
。
- reduce_scatter_tensor_coalesced(self: torch._C._distributed_c10d.ProcessGroup, outputs: collections.abc.Sequence[torch.Tensor], inputs: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.ReduceScatterOptions = <torch._C._distributed_c10d.ReduceScatterOptions object at 0x7f5ce0efe830>) c10d::Work #
从进程组中的所有进程对输入张量进行 reduce 并 scatter。
有关更多详细信息,请参阅
torch.distributed.reduce_scatter()
。
- scatter(*args, **kwargs)#
重载函数。
scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], opts: torch._C._distributed_c10d.ScatterOptions = <torch._C._distributed_c10d.ScatterOptions object at 0x7f5ce0a9dff0>) -> c10d::Work
将输入张量从进程组中的所有进程 scatter。
有关更多详细信息,请参阅
torch.distributed.scatter()
。scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensor: torch.Tensor, input_tensors: collections.abc.Sequence[torch.Tensor], root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work
将输入张量从进程组中的所有进程 scatter。
有关更多详细信息,请参阅
torch.distributed.scatter()
。
- send(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], dstRank: SupportsInt, tag: SupportsInt) c10d::Work #
将张量发送到指定 rank。
有关更多详细信息,请参阅
torch.distributed.send()
。
- set_timeout(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta) None #
设置所有未来操作的默认超时时间。
- split_group(self: torch._C._distributed_c10d.ProcessGroup, ranks: collections.abc.Sequence[typing.SupportsInt], timeout: datetime.timedelta | None = None, opts: c10d::Backend::Options | None = None, group_name: str | None = None, group_desc: str | None = None) torch._C._distributed_c10d.ProcessGroup #
- class torch.distributed._dist2.ProcessGroupFactory(*args, **kwargs)[source]#
Bases:
Protocol
进程组工厂的协议。
- torch.distributed._dist2.new_group(backend, timeout, device, **kwargs)[source]#
使用给定的后端和选项创建一个新的进程组。此组是独立的,不会被全局注册,因此不能通过标准的 torch.distributed.* API 使用。
- torch.distributed._dist2.process_group(pg)[source]#
进程组的上下文管理器。线程本地方法。
- 参数
pg (ProcessGroup) – 要使用的进程组。
- 返回类型
Generator[None, None, None]
- torch.distributed._dist2.register_backend(name, func)[source]#
注册新的进程组后端。
- 参数
name (str) – 后端的名称。
func (ProcessGroupFactory) – 用于创建进程组的函数。