评价此页

使用 Join 上下文管理器进行不均匀输入的分布式训练#

创建日期:2021年8月4日 | 最后更新:2025年9月3日 | 最后验证:2024年11月5日

作者: Andrew Gu

注意

editgithub 上查看并编辑本教程。

注意

Join 作为原型功能在 PyTorch 1.10 中引入。此 API 可能会发生变化。

在本教程中,您将看到:

  • Join 上下文管理器的概述。

  • 如何在 DistributedDataParallel 中使用该上下文管理器的示例。

  • 如何在同时使用 DistributedDataParallelZeroRedundancyOptimizer 的情况下使用该上下文管理器的示例。

  • 向上下文管理器传递关键字参数的示例。

  • 深入探讨 Join 上下文管理器的工作原理。

  • 展示如何使自定义玩具类与该上下文管理器兼容的示例。

要求#

什么是 Join#

分布式数据并行入门 - 基本用例 中,您了解了使用 DistributedDataParallel 进行数据并行训练的基本框架。它在每次反向传播中隐式调度 all-reduce 操作,以在各进程(rank)之间同步梯度。这类 集合通信 需要进程组中所有进程的参与,因此如果某个进程的输入较少,其他进程就会挂起或报错(取决于后端)。更广泛地说,对于任何在每次迭代中执行同步集合通信的类,这个问题都存在。

Join 是一个上下文管理器,用于包裹各进程的训练循环,以促进不均匀输入下的训练。该上下文管理器允许已耗尽输入的进程(即“提前加入/join”)对尚未加入的进程执行的集合通信进行“遮蔽”(shadow)。通信被遮蔽的方式由钩子(hooks)指定。

DistributedDataParallel 中使用 Join#

PyTorch 的 DistributedDataParallel 开箱即用地支持 Join 上下文管理器。以下是一个使用示例:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP

BACKEND = "nccl"
WORLD_SIZE = 2
NUM_INPUTS = 5

def worker(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    # Rank 1 gets one more input than rank 0
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    num_inputs = 0
    with Join([model]):
        for input in inputs:
            num_inputs += 1
            loss = model(input).sum()
            loss.backward()

    print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

def main():
    mp.spawn(worker, nprocs=WORLD_SIZE, join=True)

if __name__ == "__main__":
    main()

这将产生以下输出(其中来自 rank 0 和 rank 1 的 print() 顺序可能是随机的):

Rank 0 has exhausted all 5 of its inputs!
Rank 1 has exhausted all 6 of its inputs!

注意

在引入此通用 Join 上下文管理器之前,DistributedDataParallel 提供了自己的 join() 上下文管理器。在上面的示例中,使用 with Join([model]): 等同于使用 with model.join():。现有 DistributedDataParallel.join() 的一个局限性是它不允许存在多个参与类,例如无法同时使用 DistributedDataParallelZeroRedundancyOptimizer

DistributedDataParallelZeroRedundancyOptimizer 中使用 Join#

Join 上下文管理器不仅适用于单个类,还适用于多个类共同协作。PyTorch 的 ZeroRedundancyOptimizer 也与该上下文管理器兼容。下面我们探讨如何修改前面的示例,以便同时使用 DistributedDataParallelZeroRedundancyOptimizer

from torch.distributed.optim import ZeroRedundancyOptimizer as ZeRO
from torch.optim import Adam

def worker(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    optim = ZeRO(model.parameters(), Adam, lr=0.01)
    # Rank 1 gets one more input than rank 0
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    num_inputs = 0
    # Pass both `model` and `optim` into `Join()`
    with Join([model, optim]):
        for input in inputs:
            num_inputs += 1
            loss = model(input).sum()
            loss.backward()
            optim.step()

    print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

这将产生与之前相同的输出。显著的变化是将 ZeroRedundancyOptimizer 实例也传入了 Join()

传递关键字参数#

类可以提供关键字参数,在运行时修改其在上下文管理器中的行为。例如,DistributedDataParallel 提供了一个参数 divide_by_initial_world_size,它决定梯度是除以初始进程数(world size)还是有效进程数(即未加入的进程数)。此类关键字参数可以直接传递给上下文管理器。

with Join([model, optim], divide_by_initial_world_size=False):
    for input in inputs:
        ...

警告

传递给上下文管理器的关键字参数在所有参与的类之间共享。由于我们不预期存在多个 Joinable 需要对同一参数进行不同设置的情况,这应该不会成为限制。尽管如此,仍需注意这一点。

Join 如何工作?#

在了解了如何使用 Join 上下文管理器的初步示例后,让我们深入探讨其工作原理。这将使您更深入地理解其提供的全部功能,并为您制作自定义兼容类做好准备。在此,我们将介绍 Join 类以及辅助类 JoinableJoinHook

Joinable#

首先,与 Join 上下文管理器兼容的类必须继承自抽象基类 Joinable。特别是,Joinable 必须实现:

  • join_hook(self, **kwargs) -> JoinHook

该方法返回 JoinableJoinHook 实例,决定已加入的进程应如何遮蔽 Joinable 执行的每轮迭代集合通信。

  • join_device(self) -> torch.device

该方法返回一个设备,供 Join 上下文管理器执行集合通信,例如 torch.device("cuda:0")torch.device("cpu")

  • join_process_group(self) -> ProcessGroup

该方法返回供 Join 上下文管理器用于执行集合通信的进程组。

特别是 join_devicejoin_process_group 是确保上下文管理器能够调度已加入和未加入进程之间集合通信的必要属性。一种用法是使用 all-reduce 在每次迭代中统计未加入进程的数量。另一种用法是实现 throw_on_early_termination=True 所需的机制,我们稍后会解释。

DistributedDataParallelZeroRedundancyOptimizer 已经继承了 Joinable 并实现了上述方法,这就是为什么我们可以在前面的示例中直接使用它们。

Joinable 类应确保调用 Joinable 构造函数,因为它会初始化一个 JoinConfig 实例,该实例被上下文管理器内部使用以确保正确性。它将作为 _join_config 字段保存在每个 Joinable 中。

JoinHook#

接下来,让我们分解 JoinHook 类。JoinHook 为上下文管理器提供了两个入口点:

  • main_hook(self) -> None

此钩子由每个已加入的进程重复调用,直到存在尚未加入的进程。它的目的是在每次训练迭代(例如在一次前向传播、反向传播和优化器步骤中)遮蔽 Joinable 执行的集合通信。

  • post_hook(self, is_last_joiner: bool) -> None

当所有进程都加入后,此钩子被调用一次。它接收一个额外的 bool 参数 is_last_joiner,指示该进程是否是最后加入的进程之一。该参数可能对同步很有用。

为了说明这些钩子的具体实现,ZeroRedundancyOptimizer 的 main hook 按常规执行优化器步骤,因为已加入的进程仍然负责更新和同步其参数分片;而 DistributedDataParallel 的 post-hook 会广播最终更新后的模型,以确保所有进程的模型保持一致。

Join#

最后,让我们研究一下它们是如何融入 Join 类本身的。

  • __init__(self, joinables: List[Joinable], enable: bool = True, throw_on_early_termination: bool = False)

如我们在前面的示例中看到的,构造函数接受一个参与训练循环的 Joinable 列表。这些应该是每个迭代中执行集合通信的类。

enable 是一个 bool 值,如果您确定不会有不均匀输入,可以将其设为 False,此时上下文管理器将变为空操作,类似于 contextlib.nullcontext()。这也可以禁用参与的 Joinable 类中与 join 相关的计算。

throw_on_early_termination 是一个 bool 值,可设为 True,以便在检测到不均匀输入时让每个进程立即抛出异常。这对于不符合上下文管理器要求的场景非常有用,最常见的情况是当多个类(如 DistributedDataParallel 与具有 SyncBatchNorm 层的模型一起使用)的集合通信可能随意交错时。在这种情况下,应将此参数设为 True,以便应用逻辑能够捕获异常并决定如何处理。

  • 核心逻辑发生在 __exit__() 方法中,它在存在尚未加入的进程时进行循环,调用每个 Joinable 的 main hook;当所有进程都加入后,调用它们的 post hooks。Main hooks 和 post-hooks 都会按照传入 Joinable 的顺序进行迭代。

  • 该上下文管理器需要来自未加入进程的心跳。因此,每个 Joinable 类应在每次迭代的集合通信之前调用 Join.notify_join_context()。上下文管理器将确保只有传入的第一个 Joinable 才会真正发送心跳。

警告

关于 throw_on_early_termination,如上所述,Join 上下文管理器不兼容某些类的组合。JoinableJoinHook 必须是可序列化的,因为每个钩子在继续下一个钩子之前都会完全执行。换句话说,两个钩子不能重叠。此外,目前 main hooks 和 post-hooks 都是按照相同的确定性顺序进行迭代的。如果这看起来是一个重大限制,我们可能会修改 API 以允许自定义排序。

使玩具类与 Join 兼容#

由于上一节介绍了多个概念,让我们通过一个玩具示例在实践中看看它们。这里,我们将实现一个类,用于计算在当前进程加入之前,所有进程观察到的输入总数。这应该能让您基本了解如何使自己的类与 Join 上下文管理器兼容。

具体来说,以下代码使每个进程打印出(1)在加入之前跨所有进程观察到的输入数量,以及(2)跨所有进程的总输入数量。

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.distributed.algorithms.join import Join, Joinable, JoinHook

BACKEND = "nccl"
WORLD_SIZE = 2
NUM_INPUTS = 5

class CounterJoinHook(JoinHook):
    r"""
    Join hook for :class:`Counter`.

    Arguments:
        counter (Counter): the :class:`Counter` object using this hook.
        sync_max_count (bool): whether to sync the max count once all ranks
            join.
    """
    def __init__(
        self,
        counter,
        sync_max_count
    ):
        self.counter = counter
        self.sync_max_count = sync_max_count

    def main_hook(self):
        r"""
        Shadows the counter's all-reduce by all-reducing a dim-1 zero tensor.
        """
        t = torch.zeros(1, device=self.counter.device)
        dist.all_reduce(t)

    def post_hook(self, is_last_joiner: bool):
        r"""
        Synchronizes the max count across all :class:`Counter` s if
        ``sync_max_count=True``.
        """
        if not self.sync_max_count:
            return
        rank = dist.get_rank(self.counter.process_group)
        common_rank = self.counter.find_common_rank(rank, is_last_joiner)
        if rank == common_rank:
            self.counter.max_count = self.counter.count.detach().clone()
        dist.broadcast(self.counter.max_count, src=common_rank)

class Counter(Joinable):
    r"""
    Example :class:`Joinable` that counts the number of training iterations
    that it participates in.
    """
    def __init__(self, device, process_group):
        super(Counter, self).__init__()
        self.device = device
        self.process_group = process_group
        self.count = torch.tensor([0], device=device).float()
        self.max_count = torch.tensor([0], device=device).float()

    def __call__(self):
        r"""
        Counts the number of inputs processed on this iteration by all ranks
        by all-reducing a dim-1 one tensor; increments its own internal count.
        """
        Join.notify_join_context(self)
        t = torch.ones(1, device=self.device).float()
        dist.all_reduce(t)
        self.count += t

    def join_hook(self, **kwargs) -> JoinHook:
        r"""
        Return a join hook that shadows the all-reduce in :meth:`__call__`.

        This join hook supports the following keyword arguments:
            sync_max_count (bool, optional): whether to synchronize the maximum
                count across all ranks once all ranks join; default is ``False``.
        """
        sync_max_count = kwargs.get("sync_max_count", False)
        return CounterJoinHook(self, sync_max_count)

    @property
    def join_device(self) -> torch.device:
        return self.device

    @property
    def join_process_group(self):
        return self.process_group

    def find_common_rank(self, rank, to_consider):
        r"""
        Returns the max rank of the ones to consider over the process group.
        """
        common_rank = torch.tensor([rank if to_consider else -1], device=self.device)
        dist.all_reduce(common_rank, op=dist.ReduceOp.MAX, group=self.process_group)
        common_rank = common_rank.item()
        return common_rank

def worker(rank):
    assert torch.cuda.device_count() >= WORLD_SIZE
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    counter = Counter(torch.device(f"cuda:{rank}"), dist.group.WORLD)
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    with Join([counter], sync_max_count=True):
        for _ in inputs:
            counter()

    print(f"{int(counter.count.item())} inputs processed before rank {rank} joined!")
    print(f"{int(counter.max_count.item())} inputs processed across all ranks!")

def main():
    mp.spawn(worker, nprocs=WORLD_SIZE, join=True)

if __name__ == "__main__":
    main()

由于 rank 0 看到 5 个输入,rank 1 看到 6 个输入,因此产生了如下输出:

10 inputs processed before rank 0 joined!
11 inputs processed across all ranks!
11 inputs processed before rank 1 joined!
11 inputs processed across all ranks!

需要强调的几个要点:

  • Counter 实例在每次迭代中执行一次 all-reduce,因此 main hook 也执行一次 all-reduce 来遮蔽它。

  • Counter 类在其 __call__() 方法的开头调用 Join.notify_join_context(),因为这是其每次迭代集合通信(即 all-reduce)之前的位置。

  • is_last_joiner 参数用于确定 post-hooks 中的广播源。

  • 我们将 sync_max_count 关键字参数传递给上下文管理器,然后转发给 Counter 的 join hook。