Pipeline 并行#
创建于: 2025 年 6 月 16 日 | 最后更新于: 2025 年 8 月 13 日
注意
torch.distributed.pipelining
目前处于 alpha 状态,正在开发中。API 可能会发生更改。它从 PiPPy 项目迁移而来。
为何选择 Pipeline 并行?#
Pipeline 并行是深度学习中一种 **基础** 的并行方式。它允许模型 **执行** 被分割,使得多个 **微批次 (micro-batches)** 可以并发执行模型的不同部分。Pipeline 并行可以是有效的技术,适用于
大规模训练
带宽受限的集群
大型模型推理
上述场景的共同点是,每个设备的计算量无法掩盖传统并行方式的通信开销,例如 FSDP 的权重 all-gather。
什么是 torch.distributed.pipelining
?#
虽然 Pipelining 在扩展性方面前景广阔,但实现起来通常很困难,因为它除了需要分割模型权重之外,还需要 **分割模型的执行**。执行的分割通常需要对模型进行侵入式代码更改。另一个复杂性来自 **在分布式环境中调度微批次**,并考虑 **数据流依赖**。
名为 pipelining
的包提供了一个工具包,可以 **自动** 完成上述任务,从而能够轻松地在 **通用** 模型上实现 Pipeline 并行。
它由两部分组成:一个 **分割前端** 和一个 **分布式运行时**。分割前端会原封不动地接收你的模型代码,将其分割成“模型分区 (model partitions)”,并捕获数据流关系。分布式运行时会在不同的设备上并行执行 Pipeline 阶段,处理微批次分割、调度、通信和梯度传播等事宜。
总而言之,pipelining
包提供了以下功能:
基于简单规范对模型代码进行分割。
丰富支持 Pipeline 调度,包括 GPipe、1F1B、Interleaved 1F1B 和 Looped BFS,并提供编写自定义调度的基础设施。
一流的跨主机 Pipeline 并行支持,因为 PP 通常在此场景下使用(在较慢的互连上)。
与其他 PyTorch 并行技术(如数据并行 (DDP, FSDP) 或张量并行)的可组合性。 TorchTitan 项目演示了 Llama 模型上的“3D 并行”应用。
步骤 1:构建 PipelineStage
#
在我们可以使用 PipelineSchedule
之前,我们需要创建 PipelineStage
对象,这些对象包装了在该阶段运行的模型部分。 PipelineStage
负责分配通信缓冲区并创建发送/接收操作与对等方通信。它管理中间缓冲区,例如尚未被消耗的forward输出,并提供一个实用程序来运行阶段模型的反向传播。
PipelineStage
需要知道阶段模型的输入和输出形状,以便正确分配通信缓冲区。形状必须是静态的,例如在运行时,形状不能从一步到另一步发生变化。如果运行时形状与预期形状不匹配,将引发 PipeliningShapeError
类。当与其他并行技术组合或应用混合精度时,必须考虑这些技术,以便 PipelineStage
知道运行时阶段模块输出的正确形状(和数据类型)。
用户可以直接构造 PipelineStage
实例,通过传入一个代表应在该阶段运行的模型部分的 nn.Module
。这可能需要更改原始模型代码。请参见 选项 1:手动分割模型 中的示例。
或者,分割前端可以使用图分割技术,自动将您的模型分割成一系列 nn.Module
。此技术要求模型可使用 torch.Export
进行跟踪。结果 nn.Module
与其他并行技术的组合是实验性的,可能需要一些变通方法。如果用户无法轻松更改模型代码,使用此前端可能更有吸引力。有关更多信息,请参见 选项 2:自动分割模型。
步骤 2:使用 PipelineSchedule
进行执行#
现在我们可以将 PipelineStage
附加到 pipeline 调度,并使用输入数据运行调度。这是一个 GPipe 示例
from torch.distributed.pipelining import ScheduleGPipe
# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)
# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)
# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
schedule.step(x)
else:
output = schedule.step()
请注意,上述代码需要为每个 worker 启动,因此我们使用启动服务来启动多个进程。
torchrun --nproc_per_node=2 example.py
模型分割选项#
选项 1:手动分割模型#
要直接构造 PipelineStage
,用户需要提供一个 nn.Module
实例,该实例拥有相关的 nn.Parameters
和 nn.Buffers
,并定义一个 forward()
方法来执行与该阶段相关的操作。例如,Torchtitan 中 Transformer 类的精简版本显示了一种构建易于分割模型的模式。
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(...)
# Using a ModuleDict lets us delete layers without affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = TransformerBlock(...)
self.output = nn.Linear(...)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, self.freqs_cis)
h = self.norm(h) if self.norm else h
output = self.output(h).float() if self.output else h
return output
以这种方式定义的模型可以通过以下方式轻松地按阶段配置:首先初始化整个模型(使用 meta-device 避免 OOM 错误),删除该阶段不需要的层,然后创建一个包装模型的 PipelineStage。例如:
with torch.device("meta"):
assert num_stages == 2, "This is a simple 2-stage example"
# we construct the entire model, then delete the parts we do not need for this stage
# in practice, this can be done using a helper function that automatically divides up layers across stages.
model = Transformer()
if stage_index == 0:
# prepare the first stage model
del model.layers["1"]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
model.tok_embeddings = None
del model.layers["0"]
from torch.distributed.pipelining import PipelineStage
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
当与其他数据或模型并行技术组合时,如果模型块的输出形状/数据类型会受到影响,可能还需要 output_args
。
选项 2:自动分割模型#
如果您拥有完整的模型,并且不想花时间将其修改为一系列“模型分区”,那么 pipeline
API 将为您提供帮助。这是一个简短的示例:
class Model(torch.nn.Module):
def __init__(self) -> None:
super().__init__()
self.emb = torch.nn.Embedding(10, 3)
self.layers = torch.nn.ModuleList(
Layer() for _ in range(2)
)
self.lm = LMHead()
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.emb(x)
for layer in self.layers:
x = layer(x)
x = self.lm(x)
return x
如果我们打印模型,可以看到多个层级结构,这使得手动分割变得困难。
Model(
(emb): Embedding(10, 3)
(layers): ModuleList(
(0-1): 2 x Layer(
(lin): Linear(in_features=3, out_features=3, bias=True)
)
)
(lm): LMHead(
(proj): Linear(in_features=3, out_features=3, bias=True)
)
)
让我们看看 pipeline
API 是如何工作的。
from torch.distributed.pipelining import pipeline, SplitPoint
# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])
pipe = pipeline(
module=mod,
mb_args=(x,),
split_spec={
"layers.1": SplitPoint.BEGINNING,
}
)
pipeline
API 根据 split_spec
来分割您的模型,其中 SplitPoint.BEGINNING
表示在 forward
函数中某个子模块执行 **之前** 添加一个分割点,同理,SplitPoint.END
表示在 **之后** 添加。
如果我们 print(pipe)
,我们可以看到:
GraphModule(
(submod_0): GraphModule(
(emb): InterpreterModule()
(layers): Module(
(0): InterpreterModule(
(lin): InterpreterModule()
)
)
)
(submod_1): GraphModule(
(layers): Module(
(1): InterpreterModule(
(lin): InterpreterModule()
)
)
(lm): InterpreterModule(
(proj): InterpreterModule()
)
)
)
def forward(self, x):
submod_0 = self.submod_0(x); x = None
submod_1 = self.submod_1(submod_0); submod_0 = None
return (submod_1,)
“模型分区”由子模块 (submod_0
, submod_1
) 表示,每个子模块都使用原始模型操作、权重和层级结构重建。此外,一个“根级别”的 forward
函数被重建,以捕获这些分区之间的数据流。这些数据流稍后将由 pipeline 运行时以分布式方式重放。
Pipe
对象提供了一个检索“模型分区”的方法。
stage_mod : nn.Module = pipe.get_stage_module(stage_idx)
返回的 stage_mod
是一个 nn.Module
,您可以使用它来创建优化器、保存或加载检查点,或应用其他并行技术。
Pipe
还允许您在给定 ProcessGroup
的设备上创建分布式阶段运行时。
stage = pipe.build_stage(stage_idx, device, group)
或者,如果您想在对 stage_mod
进行一些修改后稍后构建阶段运行时,可以使用函数式版本的 build_stage
API。例如:
from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel
dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)
注意
pipeline
前端使用一个跟踪器 (torch.export
) 将您的模型捕获到单个图中。如果您的模型不是完全可捕获的,您可以使用下面的手动前端。
Hugging Face 示例#
在该包最初创建的 PiPPy 仓库中,我们保留了基于未修改的 Hugging Face 模型的示例。请参见 examples/huggingface 目录。
示例包括:
技术深入分析#
pipeline
API 如何分割模型?#
首先,pipeline
API 通过跟踪模型将其转换为有向无环图 (DAG)。它使用 torch.export
— 一个 PyTorch 2 的全图捕获工具 — 来跟踪模型。
然后,它将阶段所需的 **操作和参数** 组合到一个重建的子模块中:submod_0
, submod_1
, ...
与 Module.children()
等传统的子模块访问方法不同,pipeline
API 不仅切割模型的模块结构,还切割模型的 **forward** 函数。
这是必需的,因为像 Module.children()
这样的模块结构仅在 Module.__init__()
期间捕获信息,而不会捕获关于 Module.forward()
的任何信息。换句话说,Module.children()
缺乏关于以下对 Pipelining 至关重要的方面的信息:
forward
中子模块的执行顺序子模块之间的激活流
子模块之间是否存在任何函数式运算符(例如,
relu
或add
操作不会被Module.children()
捕获)。
pipeline
API 相反,它确保了 forward
的行为得到了真正保留。它还捕获了分区之间的激活流,帮助分布式运行时在无需人工干预的情况下进行正确的发送/接收调用。
pipeline
API 的另一个灵活性是,分割点可以在模型层级结构内的任意级别。在分割分区中,与该分区相关的原始模型层级结构将免费重建。因此,指向子模块或参数的完全限定名称 (FQNs) 仍然有效,并且依赖于 FQNs 的服务(如 FSDP、TP 或 checkpointing)几乎无需更改代码即可与您的分区模块一起运行。
实现您自己的调度#
您可以通过扩展以下两个类之一来实现自己的 pipeline 调度:
PipelineScheduleSingle
PipelineScheduleMulti
PipelineScheduleSingle
适用于 **每个 rank 只分配一个** 阶段的调度。 PipelineScheduleMulti
适用于每个 rank 分配多个阶段的调度。
例如,ScheduleGPipe
和 Schedule1F1B
是 PipelineScheduleSingle
的子类。而 ScheduleInterleaved1F1B
、ScheduleLoopedBFS
、ScheduleInterleavedZeroBubble
和 ScheduleZBVZeroBubble
是 PipelineScheduleMulti
的子类。
日志记录#
您可以使用 torch._logging 中的 TORCH_LOGS
环境变量启用额外的日志记录。
TORCH_LOGS=+pp
将显示logging.DEBUG
消息及其以上所有级别。TORCH_LOGS=pp
将显示logging.INFO
消息及其以上。TORCH_LOGS=-pp
将显示logging.WARNING
消息及其以上。
API 参考#
模型分割 API#
以下 API 集将您的模型转换为 pipeline 表示。
- class torch.distributed.pipelining.SplitPoint(value)[source]#
枚举,表示在子模块执行中可以发生分割的点。:ivar BEGINNING: 表示在 forward 函数中某个子模块执行 **之前** 添加一个分割点。:ivar END: 表示在 forward 函数中某个子模块执行 **之后** 添加一个分割点。
- torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source]#
根据规范分割模块。
更多详情请参见 Pipe。
- 参数
module (Module) – 要分割的模块。
mb_kwargs (Optional[dict[str, Any]]) – 示例关键字输入,以微批次形式。(默认为:None)
split_spec (Optional[dict[str, torch.distributed.pipelining._IR.SplitPoint]]) – 使用子模块名称作为分割标记的字典。(默认为:None)
split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用于分割模块的策略。(默认为:None)
- 返回类型
类 Pipe 的 pipeline 表示。
微批次工具#
- torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[source]#
给定一系列 args 和 kwargs,根据它们各自的分块规范将它们分割成多个块。
- 参数
chunks (int) – 要将 args 和 kwargs 分割成的块数。
args_chunk_spec (Optional[tuple[torch.distributed.pipelining.microbatch.TensorChunkSpec, ...]]) – args 的分块规范,形状与 args 相同。
kwargs_chunk_spec (Optional[dict[str, torch.distributed.pipelining.microbatch.TensorChunkSpec]]) – kwargs 的分块规范,形状与 kwargs 相同。
- 返回
分片 args 和 kwargs 的列表:分片 kwargs 的列表。
- 返回类型
args_split
Pipeline 阶段#
- class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[source]#
表示 Pipeline 并行设置中 pipeline 阶段的类。
PipelineStage 假设模型的顺序分割,即模型被分割成块,一个块的输出馈入下一个块的输入,没有跳过连接。
PipelineStage 通过按线性顺序将 stage0 的输出传播到 stage1 等,自动执行运行时形状/数据类型推断。要绕过形状推断,请将 input_args 和 output_args 传递给每个 PipelineStage 实例。
- 参数
submodule (nn.Module) – 由此阶段包装的 PyTorch 模块。
stage_index (int) – 此阶段的 ID。
num_stages (int) – 阶段总数。
device (torch.device) – 此阶段所在设备。
input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模块的输入参数。
output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模块的输出参数。
group (dist.ProcessGroup, optional) – 分布式训练的进程组。如果为 None,则使用默认组。
dw_builder (Optional[Callable[[], Callable[..., None]]) – 如果提供,dw_builder 将构建一个新的 dw_runner 函数,该函数将用于 F, I, W(正向、输入、权重)零气泡调度的 W 操作(输入权重)。
- torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source]#
给定一个要由此阶段包装的 stage_module 和 pipeline 信息,创建一个 pipeline 阶段。
- 参数
stage_module (torch.nn.Module) – 要由此阶段包装的模块。
stage_index (int) – 此阶段在 pipeline 中的索引。
pipe_info (PipeInfo) – 关于 pipeline 的信息,可以通过 pipe.info() 检索。
device (torch.device) – 此阶段要使用的设备。
group (Optional[dist.ProcessGroup]) – 此阶段要使用的进程组。
- 返回
一个可以与 PipelineSchedules 一起运行的 pipeline 阶段。
- 返回类型
_PipelineStage
Pipeline 调度#
- class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
GPipe 调度。将以填充-排空 (fill-drain) 的方式遍历所有微批次。
- class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
1F1B 调度。在稳定状态下,将对微批次执行一次前向和一次后向。
- class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
交错 1F1B 调度。请参阅 https://arxiv.org/pdf/2104.04473 以获取详细信息。在稳定状态下,将对微批次执行一次前向和一次后向,并支持每个 rank 多个阶段。当微批次准备好处理多个本地阶段时,Interleaved 1F1B 会优先处理较早的微批次(也称为“深度优先”)。
此调度与原始论文非常相似。它的不同之处在于放宽了 num_microbatch % pp_size == 0 的要求。使用 flex_pp 调度,我们将得到 num_rounds = max(1, n_microbatches // pp_group_size),并且只要 n_microbatches % num_rounds == 0 即可工作。例如,支持:
pp_group_size = 4, n_microbatches = 10。我们将得到 num_rounds = 2,且 n_microbatches % 2 == 0。
pp_group_size = 4, n_microbatches = 3。我们将得到 num_rounds = 1,且 n_microbatches % 1 == 0。
- class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None, scale_grads=True)[source]#
广度优先 Pipeline 并行。请参阅 https://arxiv.org/abs/2211.05953 以获取详细信息。与 Interleaved 1F1B 类似,Looped BFS 支持每个 rank 多个阶段。不同之处在于,当微批次准备好处理多个本地阶段时,Looped BFS 会优先处理较早的阶段,一次性运行所有可用的微批次。
- class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
交错零气泡调度。请参阅 https://arxiv.org/pdf/2401.10241 以获取详细信息。在稳定状态下,将对微批次的输入执行一次前向和一次后向,并支持每个 rank 多个阶段。使用反向传播权重来填充 pipeline 气泡。
特别地,这实现了论文中的 ZB1P 调度。
- class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
零气泡调度(ZBV 变体)。请参阅 https://arxiv.org/pdf/2401.10241 第 6 节以获取详细信息。
此调度要求每个 rank 必须有两个阶段。
此调度在稳定状态下将对微批次的输入执行一次前向和一次后向,并支持每个 rank 多个阶段。使用反向传播权重来填充 pipeline 气泡。
这个 ZB-V 调度只有在 time forward == time backward input == time backward weights 时才具有“零气泡”属性。实际上,对于真实的模型,这不太可能发生,因此可以改用贪婪调度器来实现不等/不平衡的时间。
- class torch.distributed.pipelining.schedules.ScheduleDualPipeV(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
DualPipeV 调度。基于 DeepSeek 在 https://arxiv.org/pdf/2412.19437 中提出的 DualPipe 调度的更高效的调度变体。
基于 deepseek-ai/DualPipe 的开源代码。
- class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
单阶段调度的基类。实现了 step 方法。派生类应实现 _step_microbatches。
根据 scale_grads 参数(默认为 True),梯度会按 num_microbatches 进行缩放。此设置应与您的 loss_fn 的配置匹配,loss_fn 可以平均损失(scale_grads=True)或求和损失(scale_grads=False)。
- class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, use_full_backward=None, scale_grads=True)[source]#
多阶段调度的基类。实现了 step 方法。
根据 scale_grads 参数(默认为 True),梯度会按 num_microbatches 进行缩放。此设置应与您的 loss_fn 的配置匹配,loss_fn 可以平均损失(scale_grads=True)或求和损失(scale_grads=False)。