• 文档 >
  • TorchRL 训练器:一个 DQN 示例
快捷方式

TorchRL 训练器:DQN 示例

作者Vincent Moens

TorchRL 提供了一个通用的 Trainer 类来处理您的训练循环。训练器执行一个嵌套循环,其中外层循环是数据收集,内层循环消耗这些数据或从回放缓冲区检索的数据来训练模型。在训练循环的各个点,可以附加钩子并在给定时间间隔执行。

在本教程中,我们将使用训练器类从头开始训练一个 DQN 算法来解决 CartPole 任务。

主要收获

  • 构建一个包含其基本组件的训练器:数据收集器、损失模块、回放缓冲区和优化器。

  • 向训练器添加钩子,例如日志记录器、目标网络更新器等。

训练器是完全可定制的,并提供大量功能。本教程围绕其构建进行组织。我们将首先详细介绍如何构建库的每个组件,然后使用 Trainer 类将这些组件组合在一起。

在此过程中,我们还将关注该库的一些其他方面

  • 如何在 TorchRL 中构建环境,包括变换(例如,数据归一化、帧堆叠、调整大小和灰度化)以及并行执行。与我们在 DDPG 教程 中所做的不同,我们将归一化像素而不是状态向量。

  • 如何设计一个 QValueActor 对象,即一个估计动作值并选择估计回报最高的动作的 actor;

  • 如何从环境中高效收集数据并将其存储在回放缓冲区中;

  • 如何使用多步,这是用于离策略算法的一个简单预处理步骤;

  • 最后,如何评估您的模型。

先决条件:我们鼓励您首先通过 PPO 教程 来熟悉 torchrl。

DQN

DQN(深度 Q-Learning)是深度强化学习的开创性工作。

从高层次来看,该算法非常简单:Q-Learning 包括学习一个状态-动作值表,以便在遇到任何特定状态时,我们只需查找值最高的动作即可知道应选择哪个动作。这种简单的设置要求动作和状态是离散的,否则无法构建查找表。

DQN 使用一个神经网络,该网络将状态-动作空间映射到值(标量)空间,从而分摊存储和探索所有可能的状态-动作组合的成本:如果过去未见过某个状态,我们仍然可以将其与通过神经网络的各种可用动作结合起来,并获得每个可用动作的插值。值。

我们将解决经典的 CartPole 控制问题。摘自该环境检索的 Gymnasium 文档

一根杆子通过一个非驱动关节连接到一个在
无摩擦轨道上移动的推车上。摆锤垂直放置在推车上,目标
通过在推车上施加左右方向的力来平衡杆子。
Cart Pole

我们不旨在提供该算法的 SOTA 实现,而是为了在算法的上下文中提供 TorchRL 功能的高级说明。

import os
import uuid

import torch
from torch import nn
from torchrl.collectors import MultiaSyncDataCollector, SyncDataCollector
from torchrl.data import LazyMemmapStorage, MultiStep, TensorDictReplayBuffer
from torchrl.envs import (
    EnvCreator,
    ExplorationType,
    ParallelEnv,
    RewardScaling,
    StepCounter,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
    CatFrames,
    Compose,
    GrayScale,
    ObservationNorm,
    Resize,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.modules import DuelingCnnDQNet, EGreedyModule, QValueActor

from torchrl.objectives import DQNLoss, SoftUpdate
from torchrl.record.loggers.csv import CSVLogger
from torchrl.trainers import (
    LogScalar,
    LogValidationReward,
    ReplayBufferTrainer,
    Trainer,
    UpdateWeights,
)


def is_notebook() -> bool:
    try:
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return False  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False  # Probably standard Python interpreter

让我们开始处理我们算法所需的各种组件

  • 一个环境;

  • 一个策略(以及我们将其分组在“模型”下的相关模块);

  • 一个数据收集器,它使策略在环境中运行并提供训练数据;

  • 一个回放缓冲区来存储训练数据;

  • 一个损失模块,用于计算策略最大化回报的目标函数;

  • 一个优化器,它根据我们的损失执行参数更新。

附加模块包括日志记录器、记录器(以“eval”模式执行策略)和目标网络更新器。有了所有这些组件,很容易看出在训练脚本中可能会放错位置或误用某个组件。训练器就是为您协调一切!

构建环境

首先,让我们编写一个将输出环境的辅助函数。照常,“原始”环境可能过于简单,无法在实践中使用,我们需要一些数据转换来将其输出暴露给策略。

我们将使用五个变换

  • StepCounter 用于计算每个轨迹中的步数;

  • ToTensorImage 将把 [W, H, C] 的 uint8 张量转换为 [0, 1] 空间中的浮点张量,形状为 [C, W, H]

  • RewardScaling 用于减小回报的尺度;

  • GrayScale 将把我们的图像转换为灰度;

  • Resize 将把图像调整为 64x64 格式;

  • CatFrames 将把任意数量的连续帧(N=4)沿通道维度连接到单个张量中。这很有用,因为单个图像不包含有关 CartPole 运动的信息。需要一些关于过去观察和动作的记忆,通过循环神经网络或使用帧堆叠。

  • ObservationNorm,它将根据一些自定义摘要统计信息对我们的观察进行归一化。

实际上,我们的环境构建器有两个参数

  • parallel:确定是否需要并行运行多个环境。我们将变换堆叠在 ParallelEnv 之后,以利用设备上操作的向量化,尽管这在技术上也可以与附加到其自身变换集中的每个单独环境一起工作。

  • obs_norm_sd 将包含 ObservationNorm 变换的归一化常数。

def make_env(
    parallel=False,
    obs_norm_sd=None,
    num_workers=1,
):
    if obs_norm_sd is None:
        obs_norm_sd = {"standard_normal": True}
    if parallel:

        def maker():
            return GymEnv(
                "CartPole-v1",
                from_pixels=True,
                pixels_only=True,
                device=device,
            )

        base_env = ParallelEnv(
            num_workers,
            EnvCreator(maker),
            # Don't create a sub-process if we have only one worker
            serial_for_single=True,
            mp_start_method=mp_context,
        )
    else:
        base_env = GymEnv(
            "CartPole-v1",
            from_pixels=True,
            pixels_only=True,
            device=device,
        )

    env = TransformedEnv(
        base_env,
        Compose(
            StepCounter(),  # to count the steps of each trajectory
            ToTensorImage(),
            RewardScaling(loc=0.0, scale=0.1),
            GrayScale(),
            Resize(64, 64),
            CatFrames(4, in_keys=["pixels"], dim=-3),
            ObservationNorm(in_keys=["pixels"], **obs_norm_sd),
        ),
    )
    return env

计算归一化常数

要归一化图像,我们不希望使用完整的 [C, W, H] 归一化掩码独立地归一化每个像素,而是使用更简单的 [C, 1, 1] 形状的归一化常数(位置和尺度参数)集。我们将使用 init_stats()reduce_dim 参数来指示必须约简的维度,以及 keep_dims 参数以确保并非所有维度都在过程中消失

def get_norm_stats():
    test_env = make_env()
    test_env.transform[-1].init_stats(
        num_iter=1000, cat_dim=0, reduce_dim=[-1, -2, -4], keep_dims=(-1, -2)
    )
    obs_norm_sd = test_env.transform[-1].state_dict()
    # let's check that normalizing constants have a size of ``[C, 1, 1]`` where
    # ``C=4`` (because of :class:`~torchrl.envs.CatFrames`).
    print("state dict of the observation norm:", obs_norm_sd)
    test_env.close()
    del test_env
    return obs_norm_sd

构建模型(深度 Q 网络)

以下函数构建了一个 DuelingCnnDQNet 对象,它是一个简单的 CNN,后跟一个两层 MLP。这里使用的唯一技巧是,动作值(即左和右动作值)是使用以下公式计算的

\[\mathbb{v} = b(obs) + v(obs) - \mathbb{E}[v(obs)]\]

其中 \(\mathbb{v}\) 是我们的动作值向量,\(b\) 是一个 \(\mathbb{R}^n \rightarrow 1\) 函数,\(v\) 是一个 \(\mathbb{R}^n \rightarrow \mathbb{R}^m\) 函数,对于 \(n = \# obs\)\(m = \# actions\)

我们的网络被包装在一个 QValueActor 中,它将读取状态-动作值,选择具有最大值的那个,并将所有这些结果写入输入的 tensordict.TensorDict

def make_model(dummy_env):
    cnn_kwargs = {
        "num_cells": [32, 64, 64],
        "kernel_sizes": [6, 4, 3],
        "strides": [2, 2, 1],
        "activation_class": nn.ELU,
        # This can be used to reduce the size of the last layer of the CNN
        # "squeeze_output": True,
        # "aggregator_class": nn.AdaptiveAvgPool2d,
        # "aggregator_kwargs": {"output_size": (1, 1)},
    }
    mlp_kwargs = {
        "depth": 2,
        "num_cells": [
            64,
            64,
        ],
        "activation_class": nn.ELU,
    }
    net = DuelingCnnDQNet(
        dummy_env.action_spec.shape[-1], 1, cnn_kwargs, mlp_kwargs
    ).to(device)
    net.value[-1].bias.data.fill_(init_bias)

    actor = QValueActor(net, in_keys=["pixels"], spec=dummy_env.action_spec).to(device)
    # init actor: because the model is composed of lazy conv/linear layers,
    # we must pass a fake batch of data through it to instantiate them.
    tensordict = dummy_env.fake_tensordict()
    actor(tensordict)

    # we join our actor with an EGreedyModule for data collection
    exploration_module = EGreedyModule(
        spec=dummy_env.action_spec,
        annealing_num_steps=total_frames,
        eps_init=eps_greedy_val,
        eps_end=eps_greedy_val_env,
    )
    actor_explore = TensorDictSequential(actor, exploration_module)

    return actor, actor_explore

收集和存储数据

回放缓冲区

回放缓冲区在 DQN 等离策略 RL 算法中起着核心作用。它们构成了训练期间我们将从中采样的数据集。

在这里,我们将使用常规采样策略,尽管优先回放缓冲区(prioritized RB)可以显著提高性能。

我们使用 LazyMemmapStorage 类将存储放在磁盘上。此存储以惰性方式创建:它将在第一个数据批次传递给它后才会被实例化。

此存储的唯一要求是,在写入时传递给它的数据必须始终具有相同的形状。

buffer_scratch_dir = tempfile.TemporaryDirectory().name


def get_replay_buffer(buffer_size, n_optim, batch_size, device):
    replay_buffer = TensorDictReplayBuffer(
        batch_size=batch_size,
        storage=LazyMemmapStorage(buffer_size, scratch_dir=buffer_scratch_dir),
        prefetch=n_optim,
        transform=lambda td: td.to(device),
    )
    return replay_buffer

数据收集器 (Data collector)

与 PPO 和 DDPG 一样,我们将使用数据收集器作为外层循环中的数据加载器。

我们选择以下配置:我们将在一系列并行环境中同步并行运行,这些并行环境位于不同的收集器中,而这些收集器本身并行但异步运行。

注意

此功能仅在 Python 多进程库的“spawn”启动方法中运行代码时可用。如果此教程直接作为脚本运行(从而使用“fork”方法),我们将使用常规的 SyncDataCollector

此配置的优点是我们可以在批量执行的计算量与我们希望异步执行的计算量之间取得平衡。我们鼓励读者通过修改收集器数量(即传递给收集器的环境构造函数数量)以及每个收集器中并行执行的环境数量(由 num_workers 超参数控制)来实验收集速度受到的影响。

收集器的设备可以通过 device(通用)、policy_deviceenv_devicestoring_device 参数完全参数化。storing_device 参数将修改正在收集的数据的位置:如果正在收集的批次大小相当可观,我们可能希望将它们存储在与计算发生位置不同的位置。对于我们的异步数据收集器,不同的存储设备意味着我们收集的数据不会每次都位于同一设备上,这是我们的训练循环必须考虑到的。为简单起见,我们将所有子收集器的设备设置为相同的值。

def get_collector(
    stats,
    num_collectors,
    actor_explore,
    frames_per_batch,
    total_frames,
    device,
):
    # We can't use nested child processes with mp_start_method="fork"
    if is_fork:
        cls = SyncDataCollector
        env_arg = make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
    else:
        cls = MultiaSyncDataCollector
        env_arg = [
            make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
        ] * num_collectors
    data_collector = cls(
        env_arg,
        policy=actor_explore,
        frames_per_batch=frames_per_batch,
        total_frames=total_frames,
        # this is the default behavior: the collector runs in ``"random"`` (or explorative) mode
        exploration_type=ExplorationType.RANDOM,
        # We set the all the devices to be identical. Below is an example of
        # heterogeneous devices
        device=device,
        storing_device=device,
        split_trajs=False,
        postproc=MultiStep(gamma=gamma, n_steps=5),
    )
    return data_collector

损失函数 (Loss function)

构建我们的损失函数很简单:我们只需要向 DQNLoss 类提供模型和一些超参数。

目标参数

许多离策略 RL 算法在使用“目标参数”来估计下一个状态或状态-动作对的值时会使用该概念。目标参数是模型参数的滞后副本。因为它们的预测与当前模型配置不匹配,它们通过对正在估计的值设置悲观界限来帮助学习。这是一个强大的技巧(称为“双 Q-Learning”),在类似算法中无处不在。

def get_loss_module(actor, gamma):
    loss_module = DQNLoss(actor, delay_value=True)
    loss_module.make_value_estimator(gamma=gamma)
    target_updater = SoftUpdate(loss_module, eps=0.995)
    return loss_module, target_updater

超参数

让我们从超参数开始。以下设置在实践中应该效果很好,并且算法的性能应该不太对这些参数的微小变化敏感。

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

优化器

# the learning rate of the optimizer
lr = 2e-3
# weight decay
wd = 1e-5
# the beta parameters of Adam
betas = (0.9, 0.999)
# Optimization steps per batch collected (aka UPD or updates per data)
n_optim = 8

DQN 参数

gamma 衰减因子

gamma = 0.99

平滑目标网络更新衰减参数。这大致相当于具有硬目标网络更新的 1/tau 间隔

tau = 0.02

数据收集和回放缓冲区

注意

已注释掉用于正确训练的值。

在环境中收集的总帧数。在其他实现中,用户定义了最大回合数。这在使用返回 N 帧批次的 Our data collectors 时更难做到,其中 N 是一个常数。但是,可以通过在收集了一定数量的回合后中断训练循环来轻松获得相同数量的回合限制。

total_frames = 5_000  # 500000

用于初始化回放缓冲区的随机帧。

init_random_frames = 100  # 1000

每个收集批次中的帧。

frames_per_batch = 32  # 128

每次优化步骤从回放缓冲区采样

batch_size = 32  # 256

回放缓冲区的规模(以帧为单位)

buffer_size = min(total_frames, 100000)

每个数据收集器中并行运行的环境数量

num_workers = 2  # 8
num_collectors = 2  # 4

环境和探索

我们设置了 epsilon 贪婪探索中 epsilon 因子的初始值和最终值。由于我们的策略是确定性的,探索至关重要:没有它,随机性的唯一来源将是环境重置。

eps_greedy_val = 0.1
eps_greedy_val_env = 0.005

为了加速学习,我们将值网络的最后一层的偏差设置为预定义值(这不是强制性的)

init_bias = 2.0

注意

为了快速渲染本教程,total_frames 超参数被设置为非常低的值。要获得合理的性能,请使用更大的值,例如 500000。

构建训练器

TorchRL 的 Trainer 类构造函数接受以下仅关键字参数

  • collector

  • loss_module

  • optimizer

  • logger:Logger 可以是

  • total_frames:此参数定义了训练器的生命周期。

  • frame_skip:当使用帧跳时,收集器必须知道它,以便准确计算收集的帧数等。让训练器知道此参数不是强制性的,但有助于在总帧数(预算)固定但帧跳可变的情况下进行更公平的比较。

stats = get_norm_stats()
test_env = make_env(parallel=False, obs_norm_sd=stats)
# Get model
actor, actor_explore = make_model(test_env)
loss_module, target_net_updater = get_loss_module(actor, gamma)

collector = get_collector(
    stats=stats,
    num_collectors=num_collectors,
    actor_explore=actor_explore,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
    device=device,
)
optimizer = torch.optim.Adam(
    loss_module.parameters(), lr=lr, weight_decay=wd, betas=betas
)
exp_name = f"dqn_exp_{uuid.uuid1()}"
tmpdir = tempfile.TemporaryDirectory()
logger = CSVLogger(exp_name=exp_name, log_dir=tmpdir.name)
warnings.warn(f"log dir: {logger.experiment.log_dir}")

我们可以控制标量多久记录一次。这里我们将其设置为一个较低的值,因为我们的训练循环很短

log_interval = 500

trainer = Trainer(
    collector=collector,
    total_frames=total_frames,
    frame_skip=1,
    loss_module=loss_module,
    optimizer=optimizer,
    logger=logger,
    optim_steps_per_batch=n_optim,
    log_interval=log_interval,
)

注册钩子

可以通过两种独立的方式注册钩子

  • 如果钩子有它,register() 方法是首选。只需提供训练器作为输入,钩子将以默认名称在默认位置注册。对于某些钩子,注册可能非常复杂:ReplayBufferTrainer 需要 3 个钩子(extendsampleupdate_priority),这可能很麻烦。

buffer_hook = ReplayBufferTrainer(
    get_replay_buffer(buffer_size, n_optim, batch_size=batch_size, device=device),
    flatten_tensordicts=True,
)
buffer_hook.register(trainer)
weight_updater = UpdateWeights(collector, update_weights_interval=1)
weight_updater.register(trainer)
recorder = LogValidationReward(
    record_interval=100,  # log every 100 optimization steps
    record_frames=1000,  # maximum number of frames in the record
    frame_skip=1,
    policy_exploration=actor_explore,
    environment=test_env,
    exploration_type=ExplorationType.DETERMINISTIC,
    log_keys=[("next", "reward")],
    out_keys={("next", "reward"): "rewards"},
    log_pbar=True,
)
recorder.register(trainer)

探索模块 epsilon 因子也会衰减

trainer.register_op("post_steps", actor_explore[1].step, frames=frames_per_batch)
  • 任何可调用对象(包括 TrainerHookBase 子类)都可以使用 register_op() 进行注册。在这种情况下,必须显式传递位置()。此方法提供了对钩子位置的更多控制,但也需要对训练器机制有更多的了解。请参阅 训练器文档 以获取训练器钩子的详细说明。

trainer.register_op("post_optim", target_net_updater.step)

我们也可以记录训练奖励。请注意,对于 CartPole 来说,这兴趣有限,因为奖励总是 1。折扣奖励总和的优化不是通过获得更高的奖励,而是通过让 CartPole 存活更长时间来实现的。这将在进度条中显示的 total_rewards 值中得到反映。

log_reward = LogScalar(log_pbar=True)
log_reward.register(trainer)

注意

如果需要,可以将多个优化器链接到训练器。在这种情况下,每个优化器都将绑定到损失字典中的一个字段。有关更多信息,请参阅 OptimizerHook

我们准备好训练我们的算法了!只需调用 trainer.train(),我们就会在日志中看到我们的结果。

trainer.train()

我们现在可以快速查看包含结果的 CSV 文件。

def print_csv_files_in_folder(folder_path):
    """
    Find all CSV files in a folder and prints the first 10 lines of each file.

    Args:
        folder_path (str): The relative path to the folder.

    """
    csv_files = []
    output_str = ""
    for dirpath, _, filenames in os.walk(folder_path):
        for file in filenames:
            if file.endswith(".csv"):
                csv_files.append(os.path.join(dirpath, file))
    for csv_file in csv_files:
        output_str += f"File: {csv_file}\n"
        with open(csv_file) as f:
            for i, line in enumerate(f):
                if i == 10:
                    break
                output_str += line.strip() + "\n"
        output_str += "\n"
    print(output_str)


print_csv_files_in_folder(logger.experiment.log_dir)

trainer.shutdown()
del trainer

结论和可能的改进

在本教程中,我们学习了

  • 如何编写一个训练器,包括构建其组件并将它们注册到训练器中;

  • 如何编写一个 DQN 算法,包括如何创建具有 QValueNetwork 的策略来选择具有最高值的动作;

  • 如何构建一个多进程数据收集器;

本教程的可能改进包括

  • 也可以使用优先回放缓冲区。这将为具有最差值精度的样本提供更高的优先级。在文档的 回放缓冲区部分 中了解更多信息。

  • 分布损失(有关更多信息,请参阅 DistributionalDQNLoss)。

  • 更高级的探索技术,例如 NoisyLinear 层等。

由 Sphinx-Gallery 生成的画廊

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源