评价此页

TorchRL 目标:编写 DDPG 损失函数#

创建日期:2023年8月14日 | 最后更新:2025年3月20日 | 最后验证:未验证

作者Vincent Moens

概述#

TorchRL 将强化学习算法的训练拆分为多个部分,并在你的训练脚本中进行组装:环境、数据收集与存储、模型以及最终的损失函数。

TorchRL 的损失函数(或称“目标函数”)是有状态对象,其中包含可训练参数(策略模型和价值模型)。本教程将指导你如何从零开始使用 TorchRL 编写损失函数。

为此,我们将重点介绍 DDPG,这是一种相对简单的算法。 深度确定性策略梯度 (DDPG) 是一种简单的连续控制算法。它包含学习动作-观测对的参数化价值函数,然后学习一个策略,该策略输出能在给定观测下使该价值函数最大化的动作。

您将学到什么

  • 如何编写损失模块并自定义其价值估计器;

  • 如何在 TorchRL 中构建环境,包括转换(例如,数据归一化)和并行执行;

  • 如何设计策略网络和价值网络;

  • 如何从环境中高效收集数据并将其存储在回放缓冲区中;

  • 如何在重放缓冲区中存储轨迹(而非单步转换);

  • 如何评估模型。

先决条件#

本教程假设你已完成 PPO 教程,该教程概述了 TorchRL 的组件和依赖项,例如 tensordict.TensorDicttensordict.nn.TensorDictModules,尽管即使没有深入了解这些类,教程内容也应当足够清晰易懂。

注意

我们的目的不是提供该算法的 SOTA(最先进)实现,而是为了提供 TorchRL 损失函数实现的高层说明,以及在该算法上下文中需要使用的库特性。

导入与设置#

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果 CUDA 可用,我们将在 CUDA 上执行策略

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule#

TorchRL 提供了一系列可在训练脚本中使用的损失。目的是让损失易于重用/互换,并且具有简单的签名。

TorchRL 损失的主要特点是

  • 它们是有状态对象:包含可训练参数的副本,因此 loss_module.parameters() 可以提供训练算法所需的一切。

  • 它们遵循 TensorDict 约定:torch.nn.Module.forward() 方法将接收一个 TensorDict 作为输入,其中包含返回损失值所需的所有信息。

    >>> data = replay_buffer.sample()
    >>> loss_dict = loss_module(data)
    
  • 它们输出一个 tensordict.TensorDict 实例,损失值写入在 "loss_<smth>" 键下,其中 smth 是描述损失的字符串。TensorDict 中的额外键可能是训练期间记录的有用指标。

    注意

    我们返回独立损失的原因是让用户可以为不同的参数集使用不同的优化器。损失的求和可以通过以下方式简单完成

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

__init__ 方法#

所有损失函数的父类都是 LossModule。与库中的许多其他组件一样,其 forward() 方法期望的输入是采样自经验回放缓冲区或类似数据结构的 tensordict.TensorDict 实例。使用这种格式使得该模块可以在不同模式下重用,或者在模型需要读取多个条目等复杂设置中重用。换句话说,它允许我们编写一个不依赖于输入数据类型、只关注运行损失函数基本步骤的损失模块。

为了保持教程的教学性,我们将独立展示该类的每个方法,并在后续阶段填充该类。

让我们从 __init__() 方法开始。DDPG 旨在通过一个简单的策略解决控制任务:训练策略输出能够使价值网络预测值最大化的动作。因此,我们的损失模块需要在构造函数中接收两个网络:演员网络(Actor)和价值网络(Value)。我们期望两者都是 TensorDict 兼容对象,例如 tensordict.nn.TensorDictModule。我们的损失函数将需要计算目标值并以此拟合价值网络,同时生成动作并拟合策略,使其价值估计最大化。

LossModule.__init__() 方法的关键步骤是对 convert_to_functional() 的调用。此方法将从模块中提取参数并将其转换为函数式模块。严格来说,这不是必须的,人们完全可以在不使用它的情况下编写所有损失函数。然而,我们鼓励使用它,原因如下。

TorchRL 这样做的原因是,强化学习算法经常使用不同的参数集(即“可训练”参数和“目标”参数)来执行同一个模型。“可训练”参数是优化器需要拟合的参数。“目标”参数通常是前者的副本,并带有一定的时间滞后(绝对时间或通过移动平均延迟)。这些目标参数用于计算与下一个观测相关的价值。为价值模型使用与当前配置不完全匹配的一组目标参数的优点之一是,它们为正在计算的价值函数提供了一个悲观边界。请注意下方的 create_target_params 关键字参数:该参数告诉 convert_to_functional() 方法在损失模块中创建一组目标参数,用于计算目标值。如果将其设置为 False(例如演员网络),则仍可以访问 target_actor_network_params 属性,但它只会返回演员参数的分离(detached)版本。

稍后,我们将看到如何在 TorchRL 中更新目标参数。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

价值估计器损失方法#

在许多强化学习算法中,价值网络(或 Q 值网络)是基于经验价值估计进行训练的。这可以是自举的(TD(0),低方差,高偏差),这意味着目标值仅使用下一个奖励获得,也可以是蒙特卡洛估计(TD(1)),在这种情况下,将使用所有后续奖励序列(高方差,低偏差)。也可以使用中间估计器(TD(\(\lambda\)))来折中偏差和方差。TorchRL 通过 ValueEstimators 枚举类使使用其中一种估计器变得容易,该类包含了所有已实现价值估计器的指针。让我们在这里定义默认价值函数。我们将使用最简单的版本(TD(0)),并稍后展示如何更改它。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我们还需要根据用户查询,向 DDPG 提供有关如何构建价值估计器的说明。根据提供的估计器,我们将构建相应的模块以在训练时使用。

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

make_value_estimator 方法可以被调用,但并非必须:如果不调用,LossModule 将使用其默认估计器查询此方法。

演员损失方法#

强化学习算法的核心是演员的训练损失。对于 DDPG,此函数非常简单:我们只需要计算使用策略得出的动作所对应的价值,并优化演员权重以最大化该价值。

计算此价值时,必须确保将价值参数从图中分离出来,否则演员和价值损失会混在一起。为此,可以使用 hold_out_params() 函数。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

价值损失方法#

现在我们需要优化价值网络参数。为此,我们将依赖类中的价值估计器。

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

在 forward 调用中整合各部分#

唯一缺少的是 forward 方法,它将整合价值和演员损失,收集成本值并将它们写入交付给用户的 TensorDict 中。

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

既然有了损失函数,我们就可以用它来训练策略以解决控制任务。

环境#

在大多数算法中,首先需要处理的是环境的构建,因为它决定了其余训练脚本的逻辑。

在本例中,我们将使用 "cheetah" 任务。目标是让“猎豹”机器人尽可能跑得快。

在 TorchRL 中,可以通过依赖 dm_controlgym 来创建此类任务。

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

默认情况下,这些环境禁用渲染。从状态进行训练通常比从图像进行训练更容易。为简单起见,我们仅专注于从状态学习。要将像素传递给由 env.step() 收集的 tensordicts,只需将 from_pixels=True 参数传递给构造函数即可。

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我们编写一个 make_env() 辅助函数,它将使用上述两种后端(dm-controlgym)之一创建环境。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

转换#

现在有了基本环境,我们可能希望修改其表示以使其对策略更友好。在 TorchRL 中,转换(transforms)被添加到专门的 torchr.envs.TransformedEnv 类中。

  • 在 DDPG 中,使用某种启发式数值来缩放奖励是很常见的。在本例中,我们将奖励乘以 5。

  • 如果使用 dm_control,建立仿真器(使用双精度数)与我们的脚本(通常使用单精度数)之间的接口也很重要。这种转换是双向的:调用 env.step() 时,我们的动作需要用双精度表示,而输出则需要转换为单精度。DoubleToFloat 转换正是完成了这一工作:in_keys 列表指向需要从双精度转换为浮点数的键,而 in_keys_inv 指向需要在传递给环境之前转换为双精度的键。

  • 我们使用 CatTensors 转换将状态键连接在一起。

  • 最后,我们也留下了归一化状态的可能性:我们稍后会处理归一化常数的计算。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

并行执行#

以下辅助函数允许我们并行运行环境。并行运行环境可以显著加快收集吞吐量。使用转换环境时,我们需要选择是为每个环境单独执行转换,还是集中数据并批量转换。两种方法都很容易编写。

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

为了利用 PyTorch 的向量化能力,我们采用第一种方法。

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 将多个步骤与单个动作合并在一起。如果 > 1,则需要调整其他帧计数(例如,每批帧数、总帧数)以在不同实验中获得一致的总收集帧数。这一点很重要,因为在保持总帧数不变的情况下增加帧跳跃可能看起来像是在“作弊”:比较一下,数据集大小为 10M 元素(帧跳跃为 2)与数据集大小为 10M 元素(帧跳跃为 1),实际上与环境的交互比率为 2:1!简而言之,在处理帧跳跃时应谨慎对待训练脚本的帧计数,因为这可能导致训练策略之间的不公平比较。

缩放奖励有助于我们控制信号幅度,从而实现更有效的学习。

reward_scaling = 5.0

我们还定义了何时截断轨迹。对于 cheetah 任务,一千步(如果帧跳跃 = 2,则为 500)是一个很好的选择。

max_frames_per_traj = 500

观测值的归一化#

为了计算归一化统计量,我们在环境中运行任意数量的随机步数,并计算收集到的观测值的平均值和标准差。ObservationNorm.init_stats() 方法可用于此目的。为了获得汇总统计量,我们创建一个虚拟环境并运行给定的步数,收集这些步数的数据并计算其汇总统计量。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

归一化统计量#

作为使用 ObservationNorm 计算统计量的随机步数

init_env_steps = 5000

transform_state_dict = get_env_stats()
Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.org.cn/introduction/migration_guide/ for additional information.

每个数据收集器中的环境数量

env_per_collector = 4

我们将之前计算出的统计量传递给环境,以归一化其输出

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import CompositeSpec

构建模型#

现在我们转向模型的设置。如前所述,DDPG 需要一个价值网络(训练以估计状态-动作对的价值)和一个参数化演员(学习如何选择能最大化该价值的动作)。

回想一下,构建 TorchRL 模块需要两个步骤:

在更复杂的场景中,还可以使用 tensordict.nn.TensorDictSequential

Q 值网络被包装在 ValueOperator 中,它会自动将 out_keys 设置为 Q 值网络的 "state_action_value" 以及其他价值网络的 "state_value"

TorchRL 提供了论文中提出的 DDPG 网络内置版本。可以在 DdpgMlpActorDdpgMlpQNet 下找到。

由于我们使用惰性(lazy)模块,有必要在将策略从设备移动到设备并进行其他操作之前,使这些惰性模块具象化(materialize)。因此,用一小部分数据运行模块是一个很好的做法。为此,我们从环境规范中生成假数据。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=CompositeSpec(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)
/usr/local/lib/python3.10/dist-packages/torchrl/data/tensor_specs.py:7085: DeprecationWarning: The CompositeSpec has been deprecated and will be removed in v0.8. Please use Composite instead.
  warnings.warn(

探索#

策略被传递给 OrnsteinUhlenbeckProcessModule 探索模块,正如原论文所建议的那样。让我们定义 OU 噪声达到其最小值之前的帧数。

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

数据收集器#

TorchRL 提供了专门的类来帮助你通过在环境中执行策略来收集数据。这些“数据收集器”会迭代计算要在给定时间执行的动作,然后在环境中执行一步并根据需要重置它。数据收集器的设计旨在帮助开发者严格控制每批数据的帧数、收集的(异步/同步)性质以及分配给数据收集的资源(例如 GPU、工作进程数量等)。

在这里,我们将使用 SyncDataCollector,这是一个简单的、单进程数据收集器。TorchRL 提供了其他收集器,例如 MultiaSyncDataCollector,它以异步方式执行回放(例如,当策略被优化时收集数据,从而解耦训练和数据收集)。

需要指定的参数有:

  • 环境工厂或环境;

  • 策略;

  • 收集器被认为已清空之前的总帧数;

  • 每条轨迹的最大帧数(对于非终止环境,如 dm_control 环境很有用)。

    注意

    传递给收集器的 max_frames_per_traj 将会为用于推理的环境注册一个新的 StepCounter 转换。正如我们在本脚本中所做的那样,我们也可以手动达到同样的结果。

还应该传递:

  • 每个收集批次中的帧数;

  • 独立于策略执行的随机步数;

  • 用于策略执行的设备;

  • 数据传递到主进程之前用于存储数据的设备。

训练期间使用的总帧数应在 100 万左右。

total_frames = 10_000  # 1_000_000

外部循环每次迭代中收集器返回的帧数等于每条子轨迹的长度乘以每个收集器中并行运行的环境数量。

换句话说,我们预计收集器的批次形状为 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

评估器:构建你的记录器对象#

由于训练数据是使用某种探索策略获得的,因此我们算法的真实性能需要在确定性模式下进行评估。我们使用一个专门的类 Recorder 来完成此操作,它以特定频率在环境中执行策略,并返回从这些模拟中获得的统计数据。

以下辅助函数构建了此对象。

from torchrl.trainers import Recorder


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = Recorder(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我们将每收集 10 个批次就记录一次性能。

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

回放缓冲区#

回放缓冲区有两种类型:优先重放(其中使用某种误差信号赋予某些项目比其他项目更高的采样概率)和常规的循环经验回放。

TorchRL 回放缓冲区是可组合的:可以选择存储、采样和写入策略。还可以使用内存映射数组将张量存储在物理内存上。以下函数负责使用所需的超参数创建回放缓冲区。

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我们将把回放缓冲区存储在磁盘上的临时目录中。

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

回放缓冲区存储与批次大小#

TorchRL 回放缓冲区计算第一个维度上的元素数量。由于我们将向缓冲区提供轨迹,因此需要通过将缓冲区大小除以数据收集器产生的子轨迹长度来调整缓冲区大小。关于批次大小,我们的采样策略包括:在选择要计算损失的子轨迹(长度 random_crop_len=25)之前,采样长度为 traj_len=200 的轨迹。这种策略平衡了“存储一定长度的完整轨迹”与“提供足够异质性的样本以供损失计算”的需求。下图展示了数据流:一个收集器在每个批次中获取 8 帧,并行运行 2 个环境,将它们馈送到包含 1000 条轨迹的回放缓冲区中,并分别采样长度为 2 个时间步的子轨迹。

Storing trajectories in the replay buffer

让我们从存储在缓冲区中的帧数开始。

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

优先回放缓冲区默认禁用。

prb = False

我们还需要定义每个收集的数据批次进行多少次更新。这被称为更新与数据(update-to-data, UTD)比率。

update_to_data = 64

我们将向损失函数提供长度为 25 的轨迹。

random_crop_len = 25

在原论文中,作者为每个收集的帧执行一次包含 64 个元素的批次更新。在这里,我们在每个批次收集时执行多次更新以重现相同的比率。我们调整批次大小以实现相同的“每帧更新次数”比率。

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

损失模块构建#

我们用刚才创建的演员和 qnet 构建损失模块。由于有目标参数需要更新,我们*必须*创建一个目标网络更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

让我们使用 TD(lambda) 估计器!

注意

离线策略(Off-policy)通常使用 TD(0) 估计器。在这里,我们使用 TD(\(\lambda\)) 估计器,这将引入一些偏差,因为遵循特定状态的轨迹是使用过时的策略收集的。这种技巧,以及可以在数据收集过程中使用的多步(multi-step)技巧,都是我们通常发现虽然在返回估计中引入了一些偏差,但在实践中却能很好工作的替代性“黑客手段”。

目标网络更新器#

目标网络是离线策略强化学习算法的关键部分。多亏了 HardUpdateSoftUpdate 类,更新目标网络参数变得非常容易。它们以损失模块为参数构建,更新通过在训练循环的适当位置调用 updater.step() 来实现。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

优化器#

最后,我们将为策略和价值网络使用 Adam 优化器。

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

训练策略#

既然已经构建了所需的所有模块,训练循环就非常直观了。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector
  0%|          | 0/10000 [00:00<?, ?it/s]
  8%|▊         | 800/10000 [00:00<00:06, 1421.47it/s]
 16%|█▌        | 1600/10000 [00:02<00:15, 525.12it/s]
 24%|██▍       | 2400/10000 [00:03<00:09, 791.46it/s]
 32%|███▏      | 3200/10000 [00:03<00:06, 1038.76it/s]
 40%|████      | 4000/10000 [00:03<00:04, 1258.53it/s]
 48%|████▊     | 4800/10000 [00:04<00:03, 1453.63it/s]
 56%|█████▌    | 5600/10000 [00:04<00:02, 1579.12it/s]
reward: -2.78 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.35/5.82, grad norm= 61.72, loss_value= 255.36, loss_actor= 14.16, target value: -14.21:  56%|█████▌    | 5600/10000 [00:06<00:02, 1579.12it/s]
reward: -2.78 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.35/5.82, grad norm= 61.72, loss_value= 255.36, loss_actor= 14.16, target value: -14.21:  64%|██████▍   | 6400/10000 [00:07<00:05, 710.76it/s]
reward: -1.59 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.07/5.38, grad norm= 75.30, loss_value= 247.78, loss_actor= 12.61, target value: -13.23:  64%|██████▍   | 6400/10000 [00:08<00:05, 710.76it/s]
reward: -1.59 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.07/5.38, grad norm= 75.30, loss_value= 247.78, loss_actor= 12.61, target value: -13.23:  72%|███████▏  | 7200/10000 [00:09<00:05, 526.16it/s]
reward: -5.20 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-3.07/5.31, grad norm= 171.31, loss_value= 284.52, loss_actor= 18.27, target value: -20.35:  72%|███████▏  | 7200/10000 [00:11<00:05, 526.16it/s]
reward: -5.20 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-3.07/5.31, grad norm= 171.31, loss_value= 284.52, loss_actor= 18.27, target value: -20.35:  80%|████████  | 8000/10000 [00:11<00:04, 447.50it/s]
reward: -4.54 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.90/5.97, grad norm= 148.22, loss_value= 329.17, loss_actor= 13.70, target value: -18.46:  80%|████████  | 8000/10000 [00:13<00:04, 447.50it/s]
reward: -4.54 (r0 = -2.60), reward eval: reward: -0.00, reward normalized=-2.90/5.97, grad norm= 148.22, loss_value= 329.17, loss_actor= 13.70, target value: -18.46:  88%|████████▊ | 8800/10000 [00:14<00:02, 406.44it/s]
reward: -5.18 (r0 = -2.60), reward eval: reward: -5.95, reward normalized=-3.60/5.13, grad norm= 297.89, loss_value= 357.67, loss_actor= 20.02, target value: -24.73:  88%|████████▊ | 8800/10000 [00:17<00:02, 406.44it/s]
reward: -5.18 (r0 = -2.60), reward eval: reward: -5.95, reward normalized=-3.60/5.13, grad norm= 297.89, loss_value= 357.67, loss_actor= 20.02, target value: -24.73:  96%|█████████▌| 9600/10000 [00:18<00:01, 305.63it/s]
reward: -5.40 (r0 = -2.60), reward eval: reward: -5.95, reward normalized=-2.90/4.78, grad norm= 72.82, loss_value= 165.31, loss_actor= 20.59, target value: -20.20:  96%|█████████▌| 9600/10000 [00:20<00:01, 305.63it/s]
reward: -5.40 (r0 = -2.60), reward eval: reward: -5.95, reward normalized=-2.90/4.78, grad norm= 72.82, loss_value= 165.31, loss_actor= 20.59, target value: -20.20: : 10400it [00:21, 303.12it/s]
reward: -5.28 (r0 = -2.60), reward eval: reward: -5.95, reward normalized=-2.78/4.90, grad norm= 98.21, loss_value= 200.46, loss_actor= 20.34, target value: -20.74: : 10400it [00:23, 303.12it/s]

实验结果#

我们对训练期间的平均奖励绘制了一个简单的图表。我们可以观察到,我们的策略很好地学习了如何解决该任务。

注意

如上所述,为了获得更合理的性能,请增加 total_frames 的值,例如 100 万。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()
coding ddpg

结论#

在本教程中,我们学习了如何以 DDPG 为具体示例在 TorchRL 中编写损失模块。

主要结论是:

  • 如何使用 LossModule 类编写一个新的损失组件;

  • 如何使用(或不使用)目标网络,以及如何更新其参数;

  • 如何创建与损失模块相关联的优化器。

下一步#

要在此损失模块上进一步迭代,我们可能会考虑:

脚本总运行时间:(0 分 29.584 秒)