快捷方式

规划器

TorchRec 规划器负责确定分布式训练和推理的最优性能、均衡的分片方案。

生成分片方案的主要 API 是 EmbeddingShardingPlanner.plan

class torchrec.distributed.types.ShardingPlan(plan: Dict[str, ModuleShardingPlan])

分片方案表示。它使用更大的包装模型的 FQN(即使用 DistributedModelParallel 包装的模型)。如果希望 TorchRec 具有可组合性,应使用 EmbeddingModuleShardingPlan。

plan

以模块路径为键的字典,其值是以参数名称为键的参数分片规范的字典。

类型:

Dict[str, EmbeddingModuleShardingPlan]

get_plan_for_module(module_path: str) Optional[ModuleShardingPlan]
参数:

module_path (str) –

返回:

以模块路径为键的参数分片规范字典,按参数名称索引。如果给定 module_path 没有分片规范,则为 None。

返回类型:

Optional[ModuleShardingPlan]

class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True, callbacks: Optional[List[Callable[[List[ShardingOption]], List[ShardingOption]]]] = None, timeout_seconds: Optional[int] = None)

为给定的模块和可分片参数提供一个优化的分片方案,根据提供的分片器、拓扑和约束。

参数:
  • topology (Optional[Topology]) – 当前进程组的拓扑结构。

  • batch_size (Optional[int]) – 模型的批次大小。

  • enumerator (Optional[Enumerator]) – 要使用的枚举器

  • storage_reservation (Optional[StorageReservation]) – 要使用的存储预留

  • proposer (Optional[Union[Proposer, List[Proposer]]]) – 要使用的提议者

  • partitioner (Optional[Partitioner]) – 要使用的分区器

  • performance_model (Optional[PerfModel]) – 要使用的性能模型

  • stats (Optional[Union[Stats, List[Stats]]]) – 要使用的统计数据

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 每个表的参数约束。

  • debug (bool) – 是否打印调试信息。

示例

ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta"))
planner = EmbeddingShardingPlanner()
plan = planner.plan(
    module=ebc,
    sharders=[EmbeddingBagCollectionSharder()],
)
collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan

在 rank 0 上调用 self.plan(...) 并进行广播

参数:
  • module (nn.Module) – 要分片的模块。

  • sharders (Optional[List[ModuleSharder[nn.Module]]]) – 用于分片的切分器

  • pg (Optional[dist.ProcessGroup]) – 用于集体操作的进程组

返回:

模块的分片方案。

返回类型:

ShardingPlan

hash_planner_context_inputs() int

为除 partitioner、proposer、performance model 和 stats 之外的所有规划器输入生成哈希。这些是验证先前生成的 sharding plan 在新上下文中是否仍然有效所需的所有输入。

返回:

生成捕获拓扑、批次大小、枚举器、存储预留、统计数据和约束的哈希。

plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan

为给定的模块和可分片参数提供一个优化的分片方案,根据提供的分片器、拓扑和约束。

参数:
  • module (nn.Module) – 要分片的模块。

  • sharders (List[ModuleSharder[nn.Module]]) – 用于分片的切分器。

返回:

模块的分片方案。

返回类型:

ShardingPlan

class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)

根据用户提供的约束,为给定的 nn.Module 生成 embedding 分片选项。

参数:
  • topology (Topology) – 设备拓扑。

  • batch_size (int) – 批次大小。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 参数名称到提供的 ParameterConstraints 的字典。

  • estimator (Optional[Union[ShardEstimator, List[ShardEstimator]]]) – 分片性能估计器。

  • use_exact_enumerate_order (bool) – 是否按照确切的 name_children 枚举顺序来枚举可分片参数

enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption]

生成与给定模块和分片器相关的分片选项。

参数:
  • module (nn.Module) – 要分片的模块。

  • sharders (List[ModuleSharder[nn.Module]]) – 为模块提供的分片器。

返回:

填充了值的有效分片选项。

返回类型:

List[ShardingOption]

populate_estimates(sharding_options: List[ShardingOption]) None

见类描述。

class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)

贪婪分区器。

参数:
  • sort_by (SortBy) – 按存储或性能降序对分片选项进行排序(即,先放置较大的表)。

  • balance_modules (bool) – 是否首先按模块排序,将较小的模块排在前面。实际上,这将以平衡的方式将每个模块中的表进行排序。

partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption]

根据每个分片选项的 partition_by 属性将分片选项放置在拓扑上。在放置结束时会更新拓扑、存储和性能。

参数:
  • proposal (List[ShardingOption]) – 已填充的分片选项列表。

  • storage_constraint (Topology) – 设备拓扑。

返回:

所选计划的分片选项列表。

返回类型:

List[ShardingOption]

示例

sharding_options = [
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=1, perf=1),
                    Shards(storage=1, perf=1),
                ]),
        ShardingOption(partition_by="uniform",
                shards=[
                    Shards(storage=2, perf=2),
                    Shards(storage=2, perf=2),
                ]),
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=3, perf=3),
                    Shards(storage=3, perf=3),
                ])
        ShardingOption(partition_by="device",
                shards=[
                    Shards(storage=4, perf=4),
                    Shards(storage=4, perf=4),
                ]),
    ]
topology = Topology(world_size=2)

# First [sharding_options[0] and sharding_options[1]] will be placed on the
# topology with the uniform strategy, resulting in

topology.devices[0].perf.total = (1,2)
topology.devices[1].perf.total = (1,2)

# Finally sharding_options[2] and sharding_options[3]] will be placed on the
# topology with the device strategy (see docstring of `partition_by_device` for
# more details).

topology.devices[0].perf.total = (1,2) + (3,4)
topology.devices[1].perf.total = (1,2) + (3,4)

# The topology updates are done after the end of all the placements (the other
# in the example is just for clarity).
class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)

使用启发式计算预留模型以进行分片。存储预留包括密集张量存储、KJT 存储以及额外百分比的总存储。

参数:
  • percentage (float) – 额外的存储百分比,作为启发式存储计算之外的误差裕度。

  • parameter_multiplier (float) – 总参数存储的启发式乘数。

  • dense_tensor_estimate (Optional[int]) – 密集张量的存储估计,如果未提供,则使用默认的启发式估计。

property last_reserved_topology: Optional[Topology]

上一个 reserve() 方法输出的缓存值。

class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)

以贪婪方式提出分片方案。

按性能对每个可分片参数的分片选项进行排序。在每次迭代中,找到当前存储使用量最大的参数,并尝试其下一个分片选项。

参数:
  • use_depth (bool) – 启用时,fqn 的 sharding_options 根据 max(shard.perf.total) 排序,否则 sharding_options 根据 sum(shard.perf.total) 排序。

  • threshold (Optional[int]) – 提前停止的阈值。当指定时,提议者会在提议的连续性能评级差于最佳性能评级时停止提议。

feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None

向提议者提供反馈。

参数:
  • partitionable (bool) – 计划是否可分区。

  • plan (Optional[List[ShardingOption]]) – 要提供反馈的计划。

  • perf_rating (Optional[float]) – 计划的性能评级。

  • storage_constraint (Optional[Topology]) – 计划的存储约束。

load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None

将搜索空间加载到提议者中。

参数:
  • search_space (List[ShardingOption]) – 要加载的搜索空间。

  • enumerator (Enumerator) – 用于生成搜索空间的枚举器。

propose() Optional[List[ShardingOption]]

提出一个分片方案。

返回:

建议的分片方案。

返回类型:

Optional[List[ShardingOption]]

class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)

Embedding 墙时性能估计器。此估计器估计给定分片选项的墙时。

参数:
  • topology (Topology) – 设备拓扑。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 参数约束。

  • is_inference (bool) – 估计器是用于推理还是其他。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估计给定分片选项的墙时。

参数:
  • sharding_options (List[ShardingOption]) – 分片选项列表。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – 从模块类型到分片器的映射。

classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, comms_bandwidths: GeneralizedCommsBandwidth, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf]

尝试将性能建模为相对墙时的时间函数。

参数:
  • shard_sizes (List[List[int]]) – 每个分片的(local_rows, local_cols)列表。

  • compute_kernel (str) – 计算内核。

  • compute_device (str) – 计算设备。

  • sharding_type (str) – tw, rw, cw, twrw, dp。

  • batch_sizes (List[int]) – 每个输入特征的批次大小。

  • world_size (int) – 所有主机的设备数量。

  • local_world_size (int) – 每个主机的设备数量。

  • input_lengths (List[float]) – 每个输入查询特征的平均查找次数列表。

  • input_data_type_size (float) – 分布式数据并行输入的类型大小。

  • table_data_type_size (float) – 表的类型大小。

  • output_data_type_size (float) – 输出嵌入的类型大小。

  • fwd_comm_data_type_size (float) – 前向通信期间分布式数据并行输入的类型大小。

  • bwd_comm_data_type_size (float) – 后向通信期间分布式数据并行输入的类型大小。

  • num_poolings (List[float]) – 每个样本的池化数量,通常为 1.0。

  • hbm_mem_bw (float) – 设备 HBM 的带宽。

  • ddr_mem_bw (float) – 系统 DDR 内存的带宽。

  • hbm_to_ddr_bw (float) – 设备 HBM 和系统 DDR 之间的带宽。

  • intra_host_bw (float) – 单个主机内的带宽,例如多个线程。

  • inter_host_bw (float) – 两个主机之间的带宽,例如多台机器。

  • is_pooled (bool) – 如果 embedding 输出被池化(例如 EmbeddingBag),则为 True;如果未池化/顺序(例如 Embedding),则为 False。

  • is_weighted (bool = False) – 如果模块是 EBC 且带权重,通常表示 id 分数列表特征。

  • is_inference (bool = False) – 如果是推理模型。

  • caching_ratio (Optional[float] = None) – 缓存比率,用于确定设备的带宽。

  • prefetch_pipeline (bool = False) – 是否启用了预取流水线。

  • expected_cache_fetches (float) – 全局批次中预期的缓存获取次数

  • uneven_sharding_perf_multiplier (float = 1.0) – 用于考虑不均匀分片性能的乘数

返回:

每个分片的性能列表。

返回类型:

List[float]

class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, run_embedding_at_peak_memory: bool = False, is_inference: bool = False)

Embedding 存储使用量估计器

参数:
  • topology (Topology) – 设备拓扑。

  • constraints (Optional[Dict[str, ParameterConstraints]]) – 参数约束。

  • pipeline_type (PipelineType) – 流水线类型(如果有)。它将决定内存估计期间的输入复制因子。

  • run_embedding_at_peak_memory (bool) –

    如果 embedding 的前向/后向计算将在 HBM 使用量达到峰值时执行。当设置为 True 时,任何在 embedding 前向/后向计算期间的临时内存分配(只要输出大小在 output_dist 之前)都将被计入 HBM 存储成本。否则,它们将不会被计入,因为它们将被“隐藏”在实际内存峰值中。

    仅当 pipeline_type 设置用于向后兼容时才有效(不影响使用旧的与流水线无关的公式的模型)

    默认为 False,因为对于 RecSys 来说这通常是 False,因为内存峰值发生在密集前向计算结束时/密集后向计算开始时。

  • is_inference (bool) – 模型是否是推理模型。默认为 False。

estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None

估计每个分片选项的存储成本。

参数:
  • sharding_options (List[ShardingOption]) – 分片选项列表。

  • sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – 从模块类型到分片器的映射。

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源