评价此页

使用张量并行 (TP) 进行大规模 Transformer 模型训练#

创建日期:2024 年 4 月 19 日 | 最后更新:2025 年 7 月 18 日 | 最后验证:2024 年 11 月 5 日

作者: 梁万超, 刘天宇

注意

编辑github 上查看和编辑本教程。

本教程演示了如何使用张量并行和全分片数据并行在数百到数千个 GPU 上训练大型 Transformer 类模型。

先决条件

张量并行如何工作?#

张量并行 (TP) 最初在 Megatron-LM 论文中提出,它是一种高效的模型并行技术,用于训练大规模 Transformer 模型。本教程中提及的 序列并行 (SP) 是张量并行的一种变体,它在序列维度上进行分片,用于 nn.LayerNormRMSNorm,以进一步节省训练期间的激活内存。随着模型变大,激活内存成为瓶颈,因此在张量并行训练中,通常将序列并行应用于 LayerNormRMSNorm 层。

Megatron-LM TP

图 1. 表示 Transformer 模型的 MLP 和自注意力层中的张量并行分片,其中注意力/MLP 中的矩阵乘法通过分片计算发生 (图片来源)#

从高层次来看,PyTorch 张量并行的工作方式如下:

分片初始化

  • 确定要应用于每个层的 ParallelStyle,并通过调用 parallelize_module 来分片初始化的模块。

  • 并行化的模块的型号参数将被交换为 DTensor,DTensor 将负责使用分片计算运行并行化的模块。

运行时前向/后向

  • 根据用户为每个 ParallelStyle 指定的输入/输出 DTensor 布局,它将运行适当的通信操作来转换输入/输出的 DTensor 布局(例如 allreduceallgatherreduce_scatter)。

  • 对并行化的层运行分片计算以节省计算/内存(例如,nn.Linearnn.Embedding)。

何时以及为何应应用张量并行#

PyTorch 全分片数据并行 (FSDP) 已经能够将模型训练扩展到特定数量的 GPU。然而,当涉及到进一步扩展模型训练(就模型大小和 GPU 数量而言)时,出现了许多额外的挑战,可能需要将张量并行与 FSDP 结合使用。

  1. 当世界大小(GPU 数量)变得过大(超过 128/256 个 GPU)时,FSDP 集合操作(如 allgather)被环形延迟主导。通过在 FSDP 之上实现 TP/SP,FSDP 世界大小可以通过将 FSDP 应用于仅主机间通信来减少 8 倍,从而以相同的量降低延迟成本。

  2. 达到数据并行限制,由于收敛和 GPU 内存限制,您无法将全局批处理大小提高到超过 GPU 数量的水平,张量/序列并行是“估算”全局批处理大小并继续使用更多 GPU 进行扩展的唯一已知方法。这意味着模型大小和 GPU 数量都可以继续扩展。

  3. 对于某些类型的模型,当局部批处理大小变小,TP/SP 可以产生更优化的浮点运算(FLOPS)的矩阵乘法形状。

那么,在预训练时,触及这些限制有多容易?截至目前,预训练一个拥有数十亿或数万亿个 token 的大型语言模型 (LLM) 可能需要数月时间,即使使用数千个 GPU 也是如此。

  • 在大规模训练 LLM 时,总会遇到限制 1。例如,Llama 2 70B 使用 2k GPU 训练了 35 天,在 2k 规模下需要多维并行。

  • 当 Transformer 模型变大(例如 Llama2 70B)时,它也会很快达到限制 2。由于内存和收敛限制,即使局部 batch_size=1,也无法单独使用 FSDP。例如,Llama 2 的全局批处理大小为 1K,因此在 2K GPU 上无法单独使用数据并行。

如何应用张量并行#

PyTorch 张量并行 API 提供了一组模块级别的原语(ParallelStyle)来配置模型每个独立层的分片,包括:

  • ColwiseParallelRowwiseParallel:按列或按行方式分片 nn.Linearnn.Embedding

  • SequenceParallel:对 nn.LayerNormnn.DropoutRMSNormPython 等执行分片计算。

  • PrepareModuleInputPrepareModuleOutput:配置模块输入/输出分片布局,并进行适当的通信操作。

为了演示如何使用 PyTorch 原生张量并行 API,让我们看一个常见的 Transformer 模型。在本教程中,我们使用最新的 Llama2 模型作为参考 Transformer 模型实现,因为它在社区中也被广泛使用。

由于张量并行将单个张量分片到一组设备上,我们需要首先设置分布式环境(例如 NCCL 通信器)。张量并行是一种单程序多数据 (SPMD) 分片算法,类似于 PyTorch DDP/FSDP,它在底层利用 PyTorch DTensor 执行分片。它还利用 DeviceMesh 抽象(在底层管理 ProcessGroups)进行设备管理和分片。要了解如何利用 DeviceMesh 设置多维并行,请参阅 此教程。张量并行通常在每个主机内工作,所以让我们首先初始化一个连接主机内 8 个 GPU 的 DeviceMesh。

from torch.distributed.device_mesh import init_device_mesh

tp_mesh = init_device_mesh("cuda", (8,))

现在我们已经初始化了 DeviceMesh,让我们详细了解 Llama 2 模型架构,看看我们应该如何执行张量并行分片。这里我们关注核心的 TransformerBlock,其中 Transformer 模型堆叠相同的 TransformerBlock 以扩展模型。

核心 TransformerBlock 由一个 Attention 层和一个 FeedForward 层组成。让我们首先看一个更简单的 FeedForward 层。对于 FeedForward 层,它由三个线性层组成,其中它执行 SwiGLU 样式 MLP,查看其前向函数

# forward in the FeedForward layer
def forward(self, x):
    return self.w2(F.silu(self.w1(x)) * self.w3(x))

它同时执行 w1w3 矩阵乘法,然后对合并的 w1/w3 线性投影结果进行 w2 矩阵乘法。这意味着我们可以使用张量并行论文中的思想,以列式方式分片 w1/w3 线性层,并以行式方式分片 w2 线性层,这样在所有三个层之后只发生一次 allreduce 通信。通过 PyTorch 原生张量并行,我们可以简单地为 FeedForward 层创建一个 parallelize_plan,如下所示:

from torch.distributed.tensor.parallel import ColwiseParallel, RowwiseParallel, parallelize_module

layer_tp_plan = {
    # by default ColwiseParallel input layouts is replicated
    # and RowwiseParallel output layouts is replicated
    "feed_foward.w1": ColwiseParallel(),
    "feed_forward.w2": RowwiseParallel(),
    "feed_forward.w3": ColwiseParallel(),
}

这正是我们使用 PyTorch 张量并行 API 配置 FeedForward 层分片的方式。请注意,用户只需指定如何分片单个层,通信(例如 allreduce)将在底层自动发生。

接下来是 Attention 层。它由 wqwkwv 线性层组成,用于将输入投影到 q/k/v,然后执行注意力并将输出投影到 wo 线性层。这里的张量并行旨在对 q/k/v 投影执行列式分片,并对 wo 线性投影执行行式分片。因此,我们可以将注意力计划添加到我们刚刚起草的 tp_plan 中:

layer_tp_plan = {
    # by default ColwiseParallel input layouts is replicated
    # and RowwiseParallel output layouts is replicated
    "attention.wq": ColwiseParallel(use_local_output=False),
    "attention.wk": ColwiseParallel(use_local_output=False),
    "attention.wv": ColwiseParallel(use_local_output=False),
    "attention.wo": RowwiseParallel(),
    "feed_forward.w1": ColwiseParallel(),
    "feed_forward.w2": RowwiseParallel(),
    "feed_forward.w3": ColwiseParallel(),
}

这几乎是我们对 TransformerBlock 应用张量并行所需的 layer_tp_plan。然而,我们应该注意一点:当对线性层进行列式分片时,线性层的输出将在最后一个张量维度上分片,而行式分片线性层直接接受在最后一个维度上分片的输入。如果在列式线性层和行式线性层之间有任何额外的张量操作(例如视图操作),我们需要调整相关的形状相关操作以适应分片形状。

对于 Llama 模型,在注意力层中,有几个与形状相关的视图操作。具体来说,对于 wq/wk/wv 线性层的列式并行,激活张量在 num_heads 维度上分片。为了管理全局和局部 num_heads 之间的差异,我们应该设置 use_local_output=False 以确保输出是 DTensor。与常规张量不同,DTensor 了解并行计划,并将自动处理 num_heads 维度的变化。

最后,我们需要调用 parallelize_module API,使每个 TransformerBlock 的计划生效。在底层,它将 AttentionFeedForward 层内的模型参数分发给 DTensor,并根据需要为模型输入和输出(在每个模块之前和之后)注册通信钩子。

for layer_id, transformer_block in enumerate(model.layers):
    layer_tp_plan = {...}  # i.e. the plan we just generated

    parallelize_module(
        module=transformer_block,
        device_mesh=tp_mesh,
        parallelize_plan=layer_tp_plan,
    )

现在我们已经详细阐述了每个 TransformerBlock 的分片计划,通常在第一层有一个 nn.Embedding 和一个最终的 nn.Linear 投影层,用户可以选择对第一个 nn.Embedding 进行行式或列式分片,并对最后一个 nn.Linear 投影层进行列式分片,并指定适当的输入和输出布局。以下是一个示例:

model = parallelize_module(
    model,
    tp_mesh,
    {
        "tok_embeddings": RowwiseParallel(
            input_layouts=Replicate(),
        ),
        "output": ColwiseParallel(
            output_layouts=Replicate(),
        ),
    }
)

注意

如果需要分区的模型太大而无法适应 CPU 内存,可以使用 meta 设备初始化(例如,首先在 meta 设备上初始化模型,分片层,然后实例化模型),或者在 Transformer 模型初始化期间逐层并行化 TransformerBlock 层。

将序列并行应用于 LayerNorm/RMSNorm#

序列并行在上面演示的张量并行的基础上工作。与基本张量并行相比,基本张量并行只在 Attention 模块和 FeedForward 模块内部对张量进行分片,并保持其模块输入和输出(即前向传播中的激活和后向传播中的梯度)的复制,而序列并行则保持它们在序列维度上的分片。

在典型的 TransformerBlock 中,前向函数结合了规范化层(LayerNormRMSNorm)、注意力层、前馈层和残差连接。例如:

# forward in a TransformerBlock
def forward(self, x):
    h = x + self.attention(self.attention_norm(x))
    out = h + self.feed_forward(self.ffn_norm(h))
    return out

在大多数用例中,AttentionFeedForward 模块外部的激活(和梯度)的形状为 [batch size, sequence length, hidden dimension]。在 DTensor 的语言中,序列并行使用 Shard(1) 布局执行模块的前向/后向的激活计算。根据前面提供的代码示例,以下代码演示了如何将序列并行应用于 TransformerBlock 中的规范化层:

首先,让我们导入序列并行所需的依赖项

from torch.distributed.tensor.parallel import (
    PrepareModuleInput,
    SequenceParallel,
)

接下来,让我们调整 layer_tp_plan 以在 RMSNorm 层上启用序列并行

layer_tp_plan = {
    # Now the input and output of SequenceParallel has Shard(1) layouts,
    # to represent the input/output tensors sharded on the sequence dimension
    "attention_norm": SequenceParallel(),
    "attention": PrepareModuleInput(
        input_layouts=(Shard(1), Replicate()),
        desired_input_layouts=(Replicate(), Replicate()),
    ),
    "attention.wq": ColwiseParallel(use_local_output=False),
    "attention.wk": ColwiseParallel(use_local_output=False),
    "attention.wv": ColwiseParallel(use_local_output=False),
    "attention.wo": RowwiseParallel(output_layouts=Shard(1)),
    "ffn_norm": SequenceParallel(),
    "feed_forward": PrepareModuleInput(
        input_layouts=(Shard(1),),
        desired_input_layouts=(Replicate(),),
    ),
    "feed_forward.w1": ColwiseParallel(),
    "feed_forward.w2": RowwiseParallel(output_layouts=Shard(1)),
    "feed_forward.w3": ColwiseParallel(),
}

可以看到,我们现在使用 PrepareModuleInput 将 Attention 和 FeedForward 层的模块输入布局从 Shard(1) 修改为 Replicate(),并将其输出布局标记为 Shard(1)。就像张量并行发生的那样,只需指定输入和输出的张量分片布局,层之间的通信将自动进行。

请注意,使用序列并行时,我们假设 TransformerBlock 的输入和输出始终在序列维度上分片,以便多个 TransformerBlock 可以无缝连接。这可以通过明确指定起始 nn.Embedding 层的输出和最终 nn.Linear 投影层的输入为 Shard(1) 来实现:

model = parallelize_module(
    model,
    tp_mesh,
    {
        "tok_embeddings": RowwiseParallel(
            input_layouts=Replicate(),
            output_layouts=Shard(1),
        ),
        "norm": SequenceParallel(),
        "output": ColwiseParallel(
            input_layouts=Shard(1),
            output_layouts=Replicate()
        ),
    }
)

应用损失并行#

损失并行是一种相关的技术,用于在计算损失函数时节省内存和通信,因为模型输出通常非常大。在损失并行中,当模型输出在(通常巨大的)词汇维度上分片时,可以高效地计算交叉熵损失,而无需将所有模型输出收集到每个 GPU。这不仅显著减少了内存消耗,而且通过减少通信开销和并行进行分片计算来提高训练速度。下图简要说明了损失并行如何通过分片计算避免将所有模型输出收集到每个 GPU。

loss parallel

图 2. 单个 GPU 上带损失并行的交叉熵损失前向计算。蓝色表示分片张量;绿色表示复制张量;黄色表示部分值张量(待全部归约)。黑色箭头是局部计算;红色箭头是 GPU 之间的功能集合操作。#

在 PyTorch 张量并行 API 中,可以通过上下文管理器 loss_parallel 启用损失并行,通过它可以直接使用 torch.nn.functional.cross_entropytorch.nn.CrossEntropyLoss,而无需修改代码的其他部分。

要应用损失并行,模型预测(通常形状为 [batch size, sequence length, vocabulary size])应在词汇维度上分片。这可以通过标记最后一个线性投影层输出的输出布局轻松完成:

model = parallelize_module(
    model,
    tp_mesh,
    {
        "tok_embeddings": RowwiseParallel(
            input_layouts=Replicate(),
            output_layouts=Shard(1),
        ),
        "norm": SequenceParallel(),
        "output": ColwiseParallel(
            input_layouts=Shard(1),
            # use DTensor as the output
            use_local_output=False,
        ),
    },
)

在上面的代码中,我们还在输出前对规范化层应用了序列并行。我们应用了 use_local_output=False 以使输出保持为 DTensor,以便与 loss_parallel 上下文管理器一起工作。之后,可以直接调用 cross_entropy 损失函数,如下所示。请注意,反向计算也需要在该上下文内发生。

import torch.nn.functional as F
from torch.distributed.tensor.parallel import loss_parallel

pred = model(input_ids)
with loss_parallel():
    # assuming pred and labels are of the shape [batch, seq, vocab]
    loss = F.cross_entropy(pred.flatten(0, 1), labels.flatten(0, 1))
    loss.backward()

将张量并行与全分片数据并行相结合#

现在我们已经展示了如何将张量/序列并行应用于模型,让我们也看看张量并行和全分片数据并行如何协同工作。由于张量并行会引起阻塞计算的通信,我们希望确保它在快速通信通道(例如 NVLink)内运行。在实践中,我们通常在每个主机内应用张量并行,并在主机之间应用全分片数据并行。

fsdp + tp

图 3. FSDP 和 TP 在单独的设备维度上工作,FSDP 通信发生在主机间,TP 通信发生在主机内。#

这种 2D 并行模式可以轻松地通过 2D DeviceMesh 表达,我们只需要将每个“子”DeviceMesh 传递给每个独立的并行 API 即可:

from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.tensor.parallel import ColwiseParallel, RowwiseParallel, parallelize_module
from torch.distributed.fsdp import fully_shard

# i.e. 2-D mesh is [dp, tp], training on 64 GPUs that performs 8 way DP and 8 way TP
mesh_2d = init_device_mesh("cuda", (8, 8))
tp_mesh = mesh_2d["tp"] # a submesh that connects intra-host devices
dp_mesh = mesh_2d["dp"] # a submesh that connects inter-host devices

model = Model(...)

tp_plan = {...}

# apply Tensor Parallel intra-host on tp_mesh
model_tp = parallelize_module(model, tp_mesh, tp_plan)
# apply FSDP inter-host on dp_mesh
model_2d = fully_shard(model_tp, mesh=dp_mesh, ...)

这将使我们能够轻松地在每个主机内(主机内)应用张量并行,并在主机之间(主机间)应用 FSDP,对 Llama 模型实现**零代码更改**。张量(模型)并行和数据并行技术的结合提供了继续增加模型大小并使用大量 GPU 进行高效训练的能力。

结论#

本教程演示了如何将张量并行与全分片数据并行相结合,在数百到数千个 GPU 上训练大型 Transformer 类模型。它解释了如何将张量并行应用于模型的不同部分,而无需对模型本身进行**任何代码更改**。张量并行是一种用于大规模训练的高效模型并行技术。

要查看本教程中解释的完整端到端代码示例,请参阅 pytorch/examples 仓库中的 张量并行示例