• 文档 >
  • TorchRL 训练器:一个 DQN 示例
快捷方式

TorchRL 训练器:DQN 示例

作者Vincent Moens

TorchRL 提供了一个通用的 Trainer 类来处理您的训练循环。训练器执行一个嵌套循环,外层循环是数据收集,内层循环消耗这些数据或从回放缓冲区检索的数据来训练模型。在此训练循环的各个点,可以附加钩子并在给定间隔执行。

在本教程中,我们将使用训练器类从头开始训练 DQN 算法来解决 CartPole 任务。

主要收获

  • 构建一个包含其基本组件的训练器:数据收集器、损失模块、回放缓冲区和优化器。

  • 向训练器添加钩子,例如记录器、目标网络更新器等。

训练器是完全可定制的,并提供了大量功能。本教程围绕其构建进行组织。我们将首先详细介绍如何构建库的每个组件,然后使用 Trainer 类将这些部件组合起来。

在此过程中,我们还将关注库的其他一些方面

  • 如何在 TorchRL 中构建环境,包括转换(例如数据归一化、帧堆叠、调整大小和灰度化)以及并行执行。与我们在 DDPG 教程 中所做的不同,我们将对像素进行归一化,而不是状态向量。

  • 如何设计一个 QValueActor 对象,即一个估计动作值的 Actor,并选择具有最高估计回报的动作;

  • 如何从环境中高效收集数据并将其存储在回放缓冲区中;

  • 如何使用多步(multi-step),这是离策略算法的一个简单预处理步骤;

  • 最后是如何评估您的模型。

先决条件:我们鼓励您首先通过 PPO 教程 熟悉 torchrl。

DQN

DQN(深度 Q-Learning)是深度强化学习的开创性工作。

从宏观上看,该算法非常简单:Q-Learning 包括学习一个状态-动作值表,这样,当遇到任何特定状态时,我们就可以通过查找值最高的动作来知道应该选择哪个动作。这种简单的设置要求动作和状态是离散的,否则无法构建查找表。

DQN 使用一个神经网络,该网络将状态-动作空间映射到一个值(标量)空间,从而分摊了存储和探索所有可能的状态-动作组合的成本:如果一个状态过去未曾见过,我们仍然可以将其与我们的神经网络中的各种可用动作配对,并为每个可用动作获得一个内插值。

我们将解决经典的 cart pole 控制问题。从 Gymnasium 文档中检索到此环境:

一个杆通过一个未驱动的关节连接到一个小车上,小车沿着一个
无摩擦的轨道移动。该摆以直立姿态放在小车上,目标是
通过对小车施加左右方向的力来平衡杆。
我们无意提供该算法的 SOTA 实现,而是旨在在 CartPole 算法的背景下提供 TorchRL 功能的高级说明。
Cart Pole

我们无意提供该算法的 SOTA 实现,而是旨在提供 TorchRL 功能在 CartPole 算法背景下的高层次说明。

import os
import uuid

import torch
from torch import nn
from torchrl.collectors import MultiaSyncDataCollector, SyncDataCollector
from torchrl.data import LazyMemmapStorage, MultiStep, TensorDictReplayBuffer
from torchrl.envs import (
    EnvCreator,
    ExplorationType,
    ParallelEnv,
    RewardScaling,
    StepCounter,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
    CatFrames,
    Compose,
    GrayScale,
    ObservationNorm,
    Resize,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.modules import DuelingCnnDQNet, EGreedyModule, QValueActor

from torchrl.objectives import DQNLoss, SoftUpdate
from torchrl.record.loggers.csv import CSVLogger
from torchrl.trainers import (
    LogScalar,
    LogValidationReward,
    ReplayBufferTrainer,
    Trainer,
    UpdateWeights,
)


def is_notebook() -> bool:
    try:
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return False  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False  # Probably standard Python interpreter

让我们开始准备我们的算法所需的各种组件

  • 一个环境;

  • 一个策略(以及我们归类为“模型”的关联模块);

  • 一个数据收集器,它使策略在环境中运行并提供训练数据;

  • 一个回放缓冲区来存储训练数据;

  • 一个损失模块,用于计算我们训练策略以最大化回报的目标函数;

  • 一个优化器,用于根据我们的损失执行参数更新。

附加模块包括一个记录器、一个记录器(在“eval”模式下执行策略)和一个目标网络更新器。有了所有这些组件,很容易看出在训练脚本中可能会错放或误用某个组件。训练器就是为您协调这一切!

构建环境

首先,让我们编写一个将输出环境的辅助函数。与往常一样,“原始”环境可能过于简单而无法在实践中使用,我们需要一些数据转换才能将其输出暴露给策略。

我们将使用五个转换:

  • StepCounter 来计算每个轨迹中的步数;

  • ToTensorImage[W, H, C] 的 uint8 张量转换为形状为 [C, W, H] 的浮点张量,范围在 [0, 1] 之间;

  • RewardScaling 用于减小回报的尺度;

  • GrayScale 将我们的图像转换为灰度图;

  • Resize 将图像大小调整为 64x64 格式;

  • CatFrames 将任意数量的连续帧(N=4)沿通道维度堆叠成一个张量。这很有用,因为单个图像不包含关于 cartpole 运动的信息。需要一些关于过去观察和动作的记忆,可以通过循环神经网络或帧堆叠来实现。

  • ObservationNorm 将根据一些自定义的摘要统计信息对我们的观测值进行归一化。

实际上,我们的环境构建器有两个参数:

  • parallel:决定是否需要并行运行多个环境。我们将转换堆叠在 ParallelEnv 之后,以利用设备上操作的矢量化,尽管从技术上讲,这可以与附加了自己转换集的每个单独环境一起工作。

  • obs_norm_sd 将包含 ObservationNorm 转换的归一化常数。

def make_env(
    parallel=False,
    obs_norm_sd=None,
    num_workers=1,
):
    if obs_norm_sd is None:
        obs_norm_sd = {"standard_normal": True}
    if parallel:

        def maker():
            return GymEnv(
                "CartPole-v1",
                from_pixels=True,
                pixels_only=True,
                device=device,
            )

        base_env = ParallelEnv(
            num_workers,
            EnvCreator(maker),
            # Don't create a sub-process if we have only one worker
            serial_for_single=True,
            mp_start_method=mp_context,
        )
    else:
        base_env = GymEnv(
            "CartPole-v1",
            from_pixels=True,
            pixels_only=True,
            device=device,
        )

    env = TransformedEnv(
        base_env,
        Compose(
            StepCounter(),  # to count the steps of each trajectory
            ToTensorImage(),
            RewardScaling(loc=0.0, scale=0.1),
            GrayScale(),
            Resize(64, 64),
            CatFrames(4, in_keys=["pixels"], dim=-3),
            ObservationNorm(in_keys=["pixels"], **obs_norm_sd),
        ),
    )
    return env

计算归一化常数

为了归一化图像,我们不想使用完整的 [C, W, H] 归一化掩码独立地归一化每个像素,而是使用更简单的 [C, 1, 1] 形状的归一化常数集(位置和尺度参数)。我们将使用 init_stats()reduce_dim 参数来指示必须减少哪些维度,并使用 keep_dims 参数来确保并非所有维度都在此过程中消失。

def get_norm_stats():
    test_env = make_env()
    test_env.transform[-1].init_stats(
        num_iter=1000, cat_dim=0, reduce_dim=[-1, -2, -4], keep_dims=(-1, -2)
    )
    obs_norm_sd = test_env.transform[-1].state_dict()
    # let's check that normalizing constants have a size of ``[C, 1, 1]`` where
    # ``C=4`` (because of :class:`~torchrl.envs.CatFrames`).
    print("state dict of the observation norm:", obs_norm_sd)
    test_env.close()
    del test_env
    return obs_norm_sd

构建模型(深度 Q 网络)

以下函数构建了一个 DuelingCnnDQNet 对象,它是一个简单的 CNN,后跟一个两层 MLP。这里使用的唯一技巧是,动作值(即左和右动作值)是通过以下方式计算的:

\[\mathbb{v} = b(obs) + v(obs) - \mathbb{E}[v(obs)]\]

其中 \(\mathbb{v}\) 是我们的动作值向量,\(b\) 是一个 \(\mathbb{R}^n \rightarrow 1\) 函数,\(v\) 是一个 \(\mathbb{R}^n \rightarrow \mathbb{R}^m\) 函数,对于 \(n = \# obs\)\(m = \# actions\)

我们的网络被包装在一个 QValueActor 中,它将读取状态-动作值,选择具有最大值的那个,并将所有这些结果写入输入的 tensordict.TensorDict

def make_model(dummy_env):
    cnn_kwargs = {
        "num_cells": [32, 64, 64],
        "kernel_sizes": [6, 4, 3],
        "strides": [2, 2, 1],
        "activation_class": nn.ELU,
        # This can be used to reduce the size of the last layer of the CNN
        # "squeeze_output": True,
        # "aggregator_class": nn.AdaptiveAvgPool2d,
        # "aggregator_kwargs": {"output_size": (1, 1)},
    }
    mlp_kwargs = {
        "depth": 2,
        "num_cells": [
            64,
            64,
        ],
        "activation_class": nn.ELU,
    }
    net = DuelingCnnDQNet(
        dummy_env.action_spec.shape[-1], 1, cnn_kwargs, mlp_kwargs
    ).to(device)
    net.value[-1].bias.data.fill_(init_bias)

    actor = QValueActor(net, in_keys=["pixels"], spec=dummy_env.action_spec).to(device)
    # init actor: because the model is composed of lazy conv/linear layers,
    # we must pass a fake batch of data through it to instantiate them.
    tensordict = dummy_env.fake_tensordict()
    actor(tensordict)

    # we join our actor with an EGreedyModule for data collection
    exploration_module = EGreedyModule(
        spec=dummy_env.action_spec,
        annealing_num_steps=total_frames,
        eps_init=eps_greedy_val,
        eps_end=eps_greedy_val_env,
    )
    actor_explore = TensorDictSequential(actor, exploration_module)

    return actor, actor_explore

收集和存储数据

回放缓冲区

回放缓冲区在离策略强化学习算法(如 DQN)中起着核心作用。它们构成了我们在训练期间要从中采样的数据集。

在这里,我们将使用常规的采样策略,尽管优先回放缓冲区(prioritized RB)可以显着提高性能。

我们使用 LazyMemmapStorage 类将存储放在磁盘上。这种存储是惰性创建的:它将在第一个数据批次传递给它之后才会被实例化。

此存储的唯一要求是,在写入时传递给它的数据必须始终具有相同的形状。

buffer_scratch_dir = tempfile.TemporaryDirectory().name


def get_replay_buffer(buffer_size, n_optim, batch_size, device):
    replay_buffer = TensorDictReplayBuffer(
        batch_size=batch_size,
        storage=LazyMemmapStorage(buffer_size, scratch_dir=buffer_scratch_dir),
        prefetch=n_optim,
        transform=lambda td: td.to(device),
    )
    return replay_buffer

数据收集器 (Data collector)

与 PPO 和 DDPG 中一样,我们将使用数据收集器作为外层循环中的数据加载器。

我们选择以下配置:我们将在一系列并行环境中同步运行,这些环境在不同的收集器中并行运行,而收集器本身并行但异步运行。

注意

此功能仅在 Python 多进程库的“spawn”启动方法中运行代码时可用。如果此教程直接作为脚本运行(从而使用“fork”方法),我们将使用常规的 SyncDataCollector

此配置的优点是我们可以平衡批量执行的计算量与我们希望异步执行的计算量。我们鼓励读者尝试通过修改收集器数量(即传递给收集器的环境构造函数数量)和每个收集器中并行运行的环境数量(由 num_workers 超参数控制)来影响收集速度。

收集器的设备可以完全通过 device(通用)、policy_deviceenv_devicestoring_device 参数进行参数化。storing_device 参数将修改收集数据的设备:如果正在收集的批次具有可观的大小,我们可能希望将它们存储在不同于计算正在发生的设备的位置。对于异步数据收集器(如我们的),不同的存储设备意味着我们收集的数据不会每次都位于同一设备上,这是我们的训练循环必须考虑到的。为简单起见,我们将所有子收集器的设备设置为相同的值。

def get_collector(
    stats,
    num_collectors,
    actor_explore,
    frames_per_batch,
    total_frames,
    device,
):
    # We can't use nested child processes with mp_start_method="fork"
    if is_fork:
        cls = SyncDataCollector
        env_arg = make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
    else:
        cls = MultiaSyncDataCollector
        env_arg = [
            make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
        ] * num_collectors
    data_collector = cls(
        env_arg,
        policy=actor_explore,
        frames_per_batch=frames_per_batch,
        total_frames=total_frames,
        # this is the default behavior: the collector runs in ``"random"`` (or explorative) mode
        exploration_type=ExplorationType.RANDOM,
        # We set the all the devices to be identical. Below is an example of
        # heterogeneous devices
        device=device,
        storing_device=device,
        split_trajs=False,
        postproc=MultiStep(gamma=gamma, n_steps=5),
    )
    return data_collector

损失函数 (Loss function)

构建我们的损失函数很简单:我们只需要为 DQNLoss 类提供模型和一些超参数。

目标参数

许多离策略强化学习算法在估计下一个状态或状态-动作对的值时使用“目标参数”的概念。目标参数是模型参数的滞后副本。由于它们的预测与当前模型配置的预测不匹配,它们通过对正在估计的值施加一个悲观的界限来帮助学习。这是一个强大的技巧(称为“双 Q-Learning”),在类似算法中无处不在。

def get_loss_module(actor, gamma):
    loss_module = DQNLoss(actor, delay_value=True)
    loss_module.make_value_estimator(gamma=gamma)
    target_updater = SoftUpdate(loss_module, eps=0.995)
    return loss_module, target_updater

超参数

让我们从我们的超参数开始。以下设置在实践中应该效果良好,并且算法的性能应该对这些参数的轻微变化不太敏感。

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

优化器

# the learning rate of the optimizer
lr = 2e-3
# weight decay
wd = 1e-5
# the beta parameters of Adam
betas = (0.9, 0.999)
# Optimization steps per batch collected (aka UPD or updates per data)
n_optim = 8

DQN 参数

gamma 衰减因子

gamma = 0.99

平滑目标网络更新的衰减参数。这大致对应于带有硬目标网络更新的 1/tau 间隔

tau = 0.02

数据收集和回放缓冲区

注意

用于正确训练的值已注释掉。

在环境中收集的总帧数。在其他实现中,用户定义了最大集数。由于我们的数据收集器返回 N 帧的批次,其中 N 是一个常数,因此这更难做到。但是,可以通过在收集了特定数量的集后中断训练循环来轻松实现对集数量的相同限制。

total_frames = 5_000  # 500000

用于初始化回放缓冲区的随机帧。

init_random_frames = 100  # 1000

每次收集的批次中的帧数。

frames_per_batch = 32  # 128

每次优化步骤从回放缓冲区中采样的帧数

batch_size = 32  # 256

回放缓冲区的尺寸(以帧为单位)

buffer_size = min(total_frames, 100000)

每个数据收集器中并行运行的环境数量

num_workers = 2  # 8
num_collectors = 2  # 4

环境和探索

我们设置了 Epsilon-greedy 探索中 epsilon 因子(epsilon factor)的初始值和最终值。由于我们的策略是确定性的,探索至关重要:没有它,唯一的随机性将来自环境重置。

eps_greedy_val = 0.1
eps_greedy_val_env = 0.005

为了加速学习,我们将值网络的最后一层的偏置设置为一个预定义的值(这不是强制性的)

init_bias = 2.0

注意

为了快速渲染教程,total_frames 超参数被设置为一个非常低的值。为了获得合理的性能,请使用更大的值,例如 500000。

构建训练器

TorchRL 的 Trainer 类构造函数接受以下仅关键字参数:

  • collector

  • loss_module

  • optimizer

  • logger:一个记录器可以是

  • total_frames:此参数定义了训练器的生命周期。

  • frame_skip:当使用帧跳跃时,收集器必须知道它才能准确计算收集的帧数等。让训练器知道此参数不是必需的,但有助于在总帧数(预算)固定但帧跳跃可变的情况下进行更公平的比较。

stats = get_norm_stats()
test_env = make_env(parallel=False, obs_norm_sd=stats)
# Get model
actor, actor_explore = make_model(test_env)
loss_module, target_net_updater = get_loss_module(actor, gamma)

collector = get_collector(
    stats=stats,
    num_collectors=num_collectors,
    actor_explore=actor_explore,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
    device=device,
)
optimizer = torch.optim.Adam(
    loss_module.parameters(), lr=lr, weight_decay=wd, betas=betas
)
exp_name = f"dqn_exp_{uuid.uuid1()}"
tmpdir = tempfile.TemporaryDirectory()
logger = CSVLogger(exp_name=exp_name, log_dir=tmpdir.name)
warnings.warn(f"log dir: {logger.experiment.log_dir}")

我们可以控制标量记录的频率。在这里,我们将它设置为一个较低的值,因为我们的训练循环很短。

log_interval = 500

trainer = Trainer(
    collector=collector,
    total_frames=total_frames,
    frame_skip=1,
    loss_module=loss_module,
    optimizer=optimizer,
    logger=logger,
    optim_steps_per_batch=n_optim,
    log_interval=log_interval,
)

注册钩子

可以通过两种方式注册钩子:

  • 如果钩子具有该功能,则 register() 方法是首选。只需提供训练器作为输入,钩子就会以默认名称在默认位置注册。对于某些钩子,注册可能非常复杂:ReplayBufferTrainer 需要 3 个钩子(extendsampleupdate_priority),这可能很难实现。

buffer_hook = ReplayBufferTrainer(
    get_replay_buffer(buffer_size, n_optim, batch_size=batch_size, device=device),
    flatten_tensordicts=True,
)
buffer_hook.register(trainer)
weight_updater = UpdateWeights(collector, update_weights_interval=1)
weight_updater.register(trainer)
recorder = LogValidationReward(
    record_interval=100,  # log every 100 optimization steps
    record_frames=1000,  # maximum number of frames in the record
    frame_skip=1,
    policy_exploration=actor_explore,
    environment=test_env,
    exploration_type=ExplorationType.DETERMINISTIC,
    log_keys=[("next", "reward")],
    out_keys={("next", "reward"): "rewards"},
    log_pbar=True,
)
recorder.register(trainer)

探索模块的 epsilon 因子也会被退火。

trainer.register_op("post_steps", actor_explore[1].step, frames=frames_per_batch)
  • 可以使用 register_op() 注册任何可调用对象(包括 TrainerHookBase 的子类)。在这种情况下,必须显式传递一个位置()。此方法为钩子的位置提供了更多控制,但也需要对训练器机制有更多的了解。请查看 训练器文档 以详细了解训练器钩子。

trainer.register_op("post_optim", target_net_updater.step)

我们还可以记录训练奖励。请注意,这对于 CartPole 的意义有限,因为奖励始终为 1。折扣回报总和最大化的方法不是获得更高的奖励,而是让 cart-pole 存活更长时间。这将在进度条中显示的 total_rewards 值中反映出来。

log_reward = LogScalar(log_pbar=True)
log_reward.register(trainer)

注意

如果需要,可以将多个优化器链接到训练器。在这种情况下,每个优化器将绑定到损失字典中的一个字段。要了解更多信息,请查看 OptimizerHook

我们现在准备训练我们的算法!只需调用 trainer.train(),我们就会将结果记录下来。

trainer.train()

现在我们可以快速查看带有结果的 CSV 文件。

def print_csv_files_in_folder(folder_path):
    """
    Find all CSV files in a folder and prints the first 10 lines of each file.

    Args:
        folder_path (str): The relative path to the folder.

    """
    csv_files = []
    output_str = ""
    for dirpath, _, filenames in os.walk(folder_path):
        for file in filenames:
            if file.endswith(".csv"):
                csv_files.append(os.path.join(dirpath, file))
    for csv_file in csv_files:
        output_str += f"File: {csv_file}\n"
        with open(csv_file) as f:
            for i, line in enumerate(f):
                if i == 10:
                    break
                output_str += line.strip() + "\n"
        output_str += "\n"
    print(output_str)


print_csv_files_in_folder(logger.experiment.log_dir)

trainer.shutdown()
del trainer

结论和可能的改进

在本教程中,我们学习了:

  • 如何编写训练器,包括构建其组件并将它们注册到训练器中;

  • 如何编写 DQN 算法,包括如何使用 QValueNetwork 创建选择具有最高值的动作的策略;

  • 如何构建多进程数据收集器;

本教程可能的改进包括:

  • 还可以使用优先回放缓冲区。这将为价值准确性较差的样本提供更高的优先级。在文档的 回放缓冲区部分 中了解更多信息。

  • 分布损失(有关更多信息,请参阅 DistributionalDQNLoss)。

  • 更花哨的探索技术,例如 NoisyLinear 层等。

由 Sphinx-Gallery 生成的画廊

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源