注意
转到底部 下载完整的示例代码。
TorchRL 目标:编写 DDPG 损失#
创建时间:2023 年 8 月 14 日 | 最后更新:2025 年 3 月 20 日 | 最后验证:未验证
概述#
TorchRL 将强化学习算法的训练分解为几个部分,这些部分将在您的训练脚本中进行组装:环境、数据收集和存储、模型以及最终的损失函数。
TorchRL 损失(或“目标”)是包含可训练参数(策略和价值模型)的状态化对象。本教程将指导您完成使用 TorchRL 从头开始编写损失的步骤。
为此,我们将重点关注 DDPG,这是一个相对容易编写的算法。深度确定性策略梯度 (DDPG) 是一个简单的连续控制算法。它包括学习一个参数化价值函数,用于表示动作-观测对的价值,然后学习一个策略,该策略在给定特定观测的情况下输出最大化该价值函数的动作。
您将学到什么
如何编写损失模块并自定义其价值估计器;
如何在 TorchRL 中构建环境,包括转换(例如,数据归一化)和并行执行;
如何设计策略和价值网络;
如何从环境中高效收集数据并将其存储在回放缓冲区中;
如何在回放缓冲区中存储轨迹(而不是单个转换);
如何评估您的模型。
先决条件#
本教程假设您已完成 PPO 教程,该教程概述了 TorchRL 的组件和依赖项,例如 tensordict.TensorDict 和 tensordict.nn.TensorDictModules,尽管它应该足够透明,无需深入了解这些类即可理解。
注意
我们的目标不是提供 SOTA 算法的实现,而是提供 TorchRL 损失实现以及在算法中使用到的库功能的高级说明。
导入和设置#
%%bash pip3 install torchrl mujoco glfw
import torch
import tqdm
如果可用,我们将策略在 CUDA 上执行
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
collector_device = torch.device("cpu") # Change the device to ``cuda`` to use CUDA
TorchRL LossModule#
TorchRL 提供了一系列可在训练脚本中使用的损失。目的是让损失易于重用/互换,并且具有简单的签名。
TorchRL 损失的主要特点是
它们是状态化对象:它们包含可训练参数的副本,因此
loss_module.parameters()返回训练算法所需的一切。它们遵循
TensorDict约定:torch.nn.Module.forward()方法将接收一个 TensorDict 作为输入,其中包含返回损失值的所有必要信息。>>> data = replay_buffer.sample() >>> loss_dict = loss_module(data)
它们输出一个
tensordict.TensorDict实例,其中损失值以"loss_<smth>"的形式写入,其中smth是描述损失的字符串。TensorDict中的其他键可能是有用的指标,可以在训练时记录。注意
我们返回独立损失的原因是让用户可以为不同的参数集使用不同的优化器。损失的求和可以通过以下方式简单完成
>>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
__init__ 方法#
所有损失的父类是 LossModule。与库中的许多其他组件一样,它的 forward() 方法期望的输入是来自经验回放缓冲区或其他类似数据结构的 tensordict.TensorDict 实例。使用这种格式可以跨模态或在模型需要读取多个条目的复杂场景中重用模块。换句话说,它允许我们编写一个对输入数据类型不敏感的损失模块,并专注于运行损失函数的各个基本步骤。
为了使教程尽可能具有启发性,我们将独立展示类的每个方法,并在稍后填充类。
让我们从 __init__() 方法开始。DDPG 旨在通过一个简单的策略解决控制任务:训练一个策略来输出最大化价值网络预测的动作。因此,我们的损失模块需要在其构造函数中接收两个网络:一个 Actor 和一个价值网络。我们期望两者都是 TensorDict 兼容的对象,例如 tensordict.nn.TensorDictModule。我们的损失函数需要计算一个目标值并拟合价值网络,生成一个动作并拟合策略,以便最大化其价值估计。
LossModule.__init__() 方法的关键步骤是调用 convert_to_functional()。此方法将从模块中提取参数并将其转换为函数式模块。严格来说,这并非必需,并且可以完全不使用它编写所有损失。但是,我们鼓励使用它,原因如下。
TorchRL 这样做是因为 RL 算法经常使用不同的参数集执行相同的模型,称为“可训练”和“目标”参数。“可训练”参数是优化器需要拟合的参数。“目标”参数通常是前者的副本,带有时间滞后(绝对的或通过移动平均稀释的)。这些目标参数用于计算与下一个观测值相关联的值。使用一组不完全匹配当前配置的目标参数的一个优点是,它们为正在计算的价值函数提供了悲观界限。请注意下面的 create_target_params 关键字参数:此参数告诉 convert_to_functional() 方法在损失模块中创建一个目标参数集,用于目标值计算。如果将其设置为 False(例如,参见 Actor 网络),则 target_actor_network_params 属性仍然可访问,但这只会返回 Actor 参数的 **分离** 版本。
稍后,我们将看到如何在 TorchRL 中更新目标参数。
from tensordict.nn import TensorDictModule, TensorDictSequential
def _init(
self,
actor_network: TensorDictModule,
value_network: TensorDictModule,
) -> None:
super(type(self), self).__init__()
self.convert_to_functional(
actor_network,
"actor_network",
create_target_params=True,
)
self.convert_to_functional(
value_network,
"value_network",
create_target_params=True,
compare_against=list(actor_network.parameters()),
)
self.actor_in_keys = actor_network.in_keys
# Since the value we'll be using is based on the actor and value network,
# we put them together in a single actor-critic container.
actor_critic = ActorCriticWrapper(actor_network, value_network)
self.actor_critic = actor_critic
self.loss_function = "l2"
价值估计器损失方法#
在许多 RL 算法中,价值网络(或 Q 值网络)是根据经验价值估计进行训练的。它可以是自举(TD(0),低方差,高偏差),这意味着目标值仅使用下一个奖励获得,或者可以获得蒙特卡洛估计(TD(1)),在这种情况下,将使用所有即将到来的奖励序列(高方差,低偏差)。中间估计器(TD(\(\lambda\)))也可以用于在偏差和方差之间进行折衷。TorchRL 通过 ValueEstimators 枚举类,其中包含指向所有已实现价值估计器的指针,使使用一个或另一个估计器变得容易。让我们定义这里的默认价值函数。我们将采用最简单的版本(TD(0)),并稍后展示如何更改它。
from torchrl.objectives.utils import ValueEstimators
default_value_estimator = ValueEstimators.TD0
我们还需要根据用户查询,向 DDPG 提供如何构建价值估计器的说明。根据提供的估计器,我们将构建相应的模块以在训练时使用
from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator
def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
hp = dict(default_value_kwargs(value_type))
if hasattr(self, "gamma"):
hp["gamma"] = self.gamma
hp.update(hyperparams)
value_key = "state_action_value"
if value_type == ValueEstimators.TD1:
self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.TD0:
self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.GAE:
raise NotImplementedError(
f"Value type {value_type} it not implemented for loss {type(self)}."
)
elif value_type == ValueEstimators.TDLambda:
self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
else:
raise NotImplementedError(f"Unknown value type {value_type}")
self._value_estimator.set_keys(value=value_key)
可以调用 make_value_estimator 方法,但不是必需的:如果未调用,LossModule 将使用其默认估计器查询此方法。
Actor 损失方法#
RL 算法的核心是 Actor 的训练损失。在 DDPG 中,此函数非常简单:我们只需要计算使用策略计算的动作的价值,并优化 Actor 的权重以最大化该价值。
计算此价值时,必须确保从计算图中取出价值参数,否则 Actor 和价值损失将混合在一起。为此,可以使用 hold_out_params() 函数。
def _loss_actor(
self,
tensordict,
) -> torch.Tensor:
td_copy = tensordict.select(*self.actor_in_keys)
# Get an action from the actor network: since we made it functional, we need to pass the params
with self.actor_network_params.to_module(self.actor_network):
td_copy = self.actor_network(td_copy)
# get the value associated with that action
with self.value_network_params.detach().to_module(self.value_network):
td_copy = self.value_network(td_copy)
return -td_copy.get("state_action_value")
价值损失方法#
我们现在需要优化价值网络参数。为此,我们将依赖于类的价值估计器
from torchrl.objectives.utils import distance_loss
def _loss_value(
self,
tensordict,
):
td_copy = tensordict.clone()
# V(s, a)
with self.value_network_params.to_module(self.value_network):
self.value_network(td_copy)
pred_val = td_copy.get("state_action_value").squeeze(-1)
# we manually reconstruct the parameters of the actor-critic, where the first
# set of parameters belongs to the actor and the second to the value function.
target_params = TensorDict(
{
"module": {
"0": self.target_actor_network_params,
"1": self.target_value_network_params,
}
},
batch_size=self.target_actor_network_params.batch_size,
device=self.target_actor_network_params.device,
)
with target_params.to_module(self.actor_critic):
target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)
# Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
td_error = (pred_val - target_value).pow(2)
return loss_value, td_error, pred_val, target_value
将它们放在 forward 调用中#
唯一缺少的部分是 forward 方法,它将连接价值损失和 Actor 损失,收集成本值,并将它们写入提供给用户的 TensorDict。
from tensordict import TensorDict, TensorDictBase
def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
loss_value, td_error, pred_val, target_value = self.loss_value(
input_tensordict,
)
td_error = td_error.detach()
td_error = td_error.unsqueeze(input_tensordict.ndimension())
if input_tensordict.device is not None:
td_error = td_error.to(input_tensordict.device)
input_tensordict.set(
"td_error",
td_error,
inplace=True,
)
loss_actor = self.loss_actor(input_tensordict)
return TensorDict(
source={
"loss_actor": loss_actor.mean(),
"loss_value": loss_value.mean(),
"pred_value": pred_val.mean().detach(),
"target_value": target_value.mean().detach(),
"pred_value_max": pred_val.max().detach(),
"target_value_max": target_value.max().detach(),
},
batch_size=[],
)
from torchrl.objectives import LossModule
class DDPGLoss(LossModule):
default_value_estimator = default_value_estimator
make_value_estimator = make_value_estimator
__init__ = _init
forward = _forward
loss_value = _loss_value
loss_actor = _loss_actor
现在我们有了损失,我们可以使用它来训练策略来解决控制任务。
环境#
在大多数算法中,首先需要处理的是环境的构建,因为它会影响训练脚本的其余部分。
在本例中,我们将使用 "cheetah" 任务。目标是让半猎豹尽可能快地奔跑。
在 TorchRL 中,可以通过依赖 dm_control 或 gym 来创建此类任务
env = GymEnv("HalfCheetah-v4")
或
env = DMControlEnv("cheetah", "run")
默认情况下,这些环境禁用渲染。从状态训练通常比从图像训练更容易。为简单起见,我们只关注从状态学习。要将像素传递给 tensordicts,这些 tensordicts 由 env.step() 收集,只需将 from_pixels=True 参数传递给构造函数
env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)
我们编写一个 make_env() 辅助函数,该函数将创建一个环境,该环境使用上述两种后端之一(dm-control 或 gym)。
from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv
env_library = None
env_name = None
def make_env(from_pixels=False):
"""Create a base ``env``."""
global env_library
global env_name
if backend == "dm_control":
env_name = "cheetah"
env_task = "run"
env_args = (env_name, env_task)
env_library = DMControlEnv
elif backend == "gym":
env_name = "HalfCheetah-v4"
env_args = (env_name,)
env_library = GymEnv
else:
raise NotImplementedError
env_kwargs = {
"device": device,
"from_pixels": from_pixels,
"pixels_only": from_pixels,
"frame_skip": 2,
}
env = env_library(*env_args, **env_kwargs)
return env
转换#
现在我们有了一个基本环境,我们可能希望修改其表示形式,使其更适合策略。在 TorchRL 中,转换会附加到特殊类 torchr.envs.TransformedEnv 的基本环境中。
在 DDPG 中,使用启发式值对奖励进行重缩放是很常见的。在此示例中,我们将奖励乘以 5。
如果我们使用的是
dm_control,那么建立模拟器(使用双精度数)和我们的脚本(可能使用单精度数)之间的接口也很重要。此转换是双向的:调用env.step()时,我们的动作需要表示为双精度,输出需要转换为单精度。DoubleToFloat转换正是这样做的:in_keys列表指的是需要从双精度转换为单精度的键,而in_keys_inv指的是在传递给环境之前需要转换为双精度的键。我们使用
CatTensors转换将状态键连接在一起。最后,我们还保留了归一化状态的可能性:我们将负责稍后计算归一化常数。
from torchrl.envs import (
CatTensors,
DoubleToFloat,
EnvCreator,
InitTracker,
ObservationNorm,
ParallelEnv,
RewardScaling,
StepCounter,
TransformedEnv,
)
def make_transformed_env(
env,
):
"""Apply transforms to the ``env`` (such as reward scaling and state normalization)."""
env = TransformedEnv(env)
# we append transforms one by one, although we might as well create the
# transformed environment using the `env = TransformedEnv(base_env, transforms)`
# syntax.
env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))
# We concatenate all states into a single "observation_vector"
# even if there is a single tensor, it'll be renamed in "observation_vector".
# This facilitates the downstream operations as we know the name of the
# output tensor.
# In some environments (not half-cheetah), there may be more than one
# observation vector: in this case this code snippet will concatenate them
# all.
selected_keys = list(env.observation_spec.keys())
out_key = "observation_vector"
env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))
# we normalize the states, but for now let's just instantiate a stateless
# version of the transform
env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))
env.append_transform(DoubleToFloat())
env.append_transform(StepCounter(max_frames_per_traj))
# We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
# exploration:
env.append_transform(InitTracker())
return env
并行执行#
以下辅助函数允许我们在并行环境中运行。并行运行环境可以显著加快收集吞吐量。使用转换后的环境时,我们需要选择是单独为每个环境执行转换,还是集中数据并批量转换。这两种方法都易于编码
env = ParallelEnv(
lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
num_workers=4
)
env = TransformedEnv(
ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
transforms
)
为了利用 PyTorch 的矢量化能力,我们采用第一种方法
def parallel_env_constructor(
env_per_collector,
transform_state_dict,
):
if env_per_collector == 1:
def make_t_env():
env = make_transformed_env(make_env())
env.transform[2].init_stats(3)
env.transform[2].loc.copy_(transform_state_dict["loc"])
env.transform[2].scale.copy_(transform_state_dict["scale"])
return env
env_creator = EnvCreator(make_t_env)
return env_creator
parallel_env = ParallelEnv(
num_workers=env_per_collector,
create_env_fn=EnvCreator(lambda: make_env()),
create_env_kwargs=None,
pin_memory=False,
)
env = make_transformed_env(parallel_env)
# we call `init_stats` for a limited number of steps, just to instantiate
# the lazy buffers.
env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
env.transform[2].load_state_dict(transform_state_dict)
return env
# The backend can be ``gym`` or ``dm_control``
backend = "gym"
注意
frame_skip 将多个步长与单个动作一起批量处理。如果 > 1,则需要调整其他帧计数(例如,frames_per_batch、total_frames)以在实验中保持一致的总帧数。这很重要,因为提高 frame_skip 但保持总帧数不变可能显得不公平:所有其他条件相同,使用 frame_skip 为 2 收集的数据集和使用 frame_skip 为 1 收集的数据集实际上具有 2:1 的环境交互比例!简而言之,在处理 frame skipping 时,应谨慎对待训练脚本的帧计数,因为这可能导致训练策略之间进行有偏倚的比较。
缩放奖励有助于我们控制信号幅度,以实现更有效的学习。
reward_scaling = 5.0
我们还定义了轨迹何时将被截断。一千步(如果 frame_skip = 2,则为 500 步)是 cheetah 任务的好选择
max_frames_per_traj = 500
观测值归一化#
为了计算归一化统计数据,我们运行任意数量的随机步骤,并在环境中计算收集到的观测值的均值和标准差。ObservationNorm.init_stats() 方法可用于此目的。为了获得汇总统计数据,我们创建了一个虚拟环境并运行了给定的步数,收集了给定步数的数据并计算了其汇总统计数据。
def get_env_stats():
"""Gets the stats of an environment."""
proof_env = make_transformed_env(make_env())
t = proof_env.transform[2]
t.init_stats(init_env_steps)
transform_state_dict = t.state_dict()
proof_env.close()
return transform_state_dict
归一化统计数据#
用于通过 ObservationNorm 进行统计计算的随机步数
init_env_steps = 5000
transform_state_dict = get_env_stats()
Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.org.cn/introduction/migration_guide/ for additional information.
每个数据收集器中的环境数量
env_per_collector = 4
我们将之前计算的统计数据传递给我们的环境输出进行归一化
parallel_env = parallel_env_constructor(
env_per_collector=env_per_collector,
transform_state_dict=transform_state_dict,
)
from torchrl.data import CompositeSpec
构建模型#
现在我们转向模型的设置。如前所述,DDPG 需要一个价值网络,该网络用于估计状态-动作对的价值,以及一个参数化 Actor,它学习如何选择最大化该价值的动作。
回想一下,构建 TorchRL 模块需要两个步骤
编写将用作网络的
torch.nn.Module,将网络包装在
tensordict.nn.TensorDictModule中,其中通过指定输入和输出键来处理数据流。
在更复杂的场景中,也可以使用 tensordict.nn.TensorDictSequential。
Q 值网络被包装在 ValueOperator 中,该类自动将 out_keys 设置为 "state_action_value"(对于 q 值网络)和 "state_value"(对于其他价值网络)。
TorchRL 提供了原始论文中介绍的 DDPG 网络的内置版本。这些可以在 DdpgMlpActor 和 DdpgMlpQNet 中找到。
由于我们使用懒惰模块,因此在将策略移动到设备之间以及执行其他操作之前,有必要具体化懒惰模块。因此,最好使用少量数据运行模块。为此,我们生成了来自环境规范的伪数据。
from torchrl.modules import (
ActorCriticWrapper,
DdpgMlpActor,
DdpgMlpQNet,
OrnsteinUhlenbeckProcessModule,
ProbabilisticActor,
TanhDelta,
ValueOperator,
)
def make_ddpg_actor(
transform_state_dict,
device="cpu",
):
proof_environment = make_transformed_env(make_env())
proof_environment.transform[2].init_stats(3)
proof_environment.transform[2].load_state_dict(transform_state_dict)
out_features = proof_environment.action_spec.shape[-1]
actor_net = DdpgMlpActor(
action_dim=out_features,
)
in_keys = ["observation_vector"]
out_keys = ["param"]
actor = TensorDictModule(
actor_net,
in_keys=in_keys,
out_keys=out_keys,
)
actor = ProbabilisticActor(
actor,
distribution_class=TanhDelta,
in_keys=["param"],
spec=CompositeSpec(action=proof_environment.action_spec),
).to(device)
q_net = DdpgMlpQNet()
in_keys = in_keys + ["action"]
qnet = ValueOperator(
in_keys=in_keys,
module=q_net,
).to(device)
# initialize lazy modules
qnet(actor(proof_environment.reset().to(device)))
return actor, qnet
actor, qnet = make_ddpg_actor(
transform_state_dict=transform_state_dict,
device=device,
)
/usr/local/lib/python3.10/dist-packages/torchrl/data/tensor_specs.py:6911: DeprecationWarning:
The CompositeSpec has been deprecated and will be removed in v0.8. Please use Composite instead.
探索#
如原始论文所示,策略被放入 OrnsteinUhlenbeckProcessModule 探索模块。让我们定义 OU 噪声达到最小值之前的帧数
annealing_frames = 1_000_000
actor_model_explore = TensorDictSequential(
actor,
OrnsteinUhlenbeckProcessModule(
spec=actor.spec.clone(),
annealing_num_steps=annealing_frames,
).to(device),
)
if device == torch.device("cpu"):
actor_model_explore.share_memory()
数据收集器#
TorchRL 提供了专门的类来帮助您通过在环境中执行策略来收集数据。这些“数据收集器”会迭代地计算在给定时间要执行的动作,然后执行环境中的一个步长,并在需要时重置它。数据收集器的设计旨在帮助开发人员严格控制每批数据的帧数、收集的(异步)性质以及分配给数据收集的资源(例如 GPU、工作线程数量等)。
在这里,我们将使用 SyncDataCollector,这是一个简单的单进程数据收集器。TorchRL 还提供其他收集器,例如 MultiaSyncDataCollector,它以异步方式执行滚动(例如,在优化策略时收集数据,从而将训练和数据收集解耦)。
要指定的参数是
一个环境工厂或一个环境;
策略;
在收集器被认为为空之前的总帧数;
每个轨迹的最大帧数(对于非终止环境,如
dm_control环境,这很有用)。注意
传递给收集器的
max_frames_per_traj将导致在用于推理的环境中注册一个新的StepCounter转换。我们可以在脚本中手动实现相同的结果。
还应传递
每个收集批次的帧数;
独立于策略执行的随机步数;
用于策略执行的设备;
用于在将数据传递给主进程之前存储数据的设备。
我们将在训练期间使用的总帧数应约为 1M。
total_frames = 10_000 # 1_000_000
外层循环的每次迭代中收集器返回的帧数等于每个子轨迹的长度乘以每个收集器中并行运行的环境数量。
换句话说,我们期望来自收集器的批次形状为 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector
traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2
from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType
collector = SyncDataCollector(
parallel_env,
policy=actor_model_explore,
total_frames=total_frames,
frames_per_batch=frames_per_batch,
init_random_frames=init_random_frames,
reset_at_each_iter=False,
split_trajs=False,
device=collector_device,
exploration_type=ExplorationType.RANDOM,
)
评估器:构建记录器对象#
由于训练数据是通过某种探索策略获得的,因此需要以确定性模式评估我们算法的真实性能。我们使用一个专用类 Recorder 来实现这一点,该类以给定的频率在环境中执行策略,并返回从这些模拟中获得的一些统计数据。
以下辅助函数构建此对象
from torchrl.trainers import Recorder
def make_recorder(actor_model_explore, transform_state_dict, record_interval):
base_env = make_env()
environment = make_transformed_env(base_env)
environment.transform[2].init_stats(
3
) # must be instantiated to load the state dict
environment.transform[2].load_state_dict(transform_state_dict)
recorder_obj = Recorder(
record_frames=1000,
policy_exploration=actor_model_explore,
environment=environment,
exploration_type=ExplorationType.DETERMINISTIC,
record_interval=record_interval,
)
return recorder_obj
我们将每收集 10 个批次记录一次性能
record_interval = 10
recorder = make_recorder(
actor_model_explore, transform_state_dict, record_interval=record_interval
)
from torchrl.data.replay_buffers import (
LazyMemmapStorage,
PrioritizedSampler,
RandomSampler,
TensorDictReplayBuffer,
)
回放缓冲区#
回放缓冲区有两种类型:优先(使用一些误差信号,使某些项的采样概率高于其他项)和常规的循环经验回放。
TorchRL 回放缓冲区是可组合的:可以选择存储、采样和写入策略。也可以使用内存映射数组将张量存储在物理内存中。以下函数负责使用所需的超参数创建回放缓冲区
from torchrl.envs import RandomCropTensorDict
def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
if prb:
sampler = PrioritizedSampler(
max_capacity=buffer_size,
alpha=0.7,
beta=0.5,
)
else:
sampler = RandomSampler()
replay_buffer = TensorDictReplayBuffer(
storage=LazyMemmapStorage(
buffer_size,
scratch_dir=buffer_scratch_dir,
),
batch_size=batch_size,
sampler=sampler,
pin_memory=False,
prefetch=prefetch,
transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
)
return replay_buffer
我们将回放缓冲区存储在磁盘上的临时目录中
import tempfile
tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name
回放缓冲区存储和批次大小#
TorchRL 回放缓冲区沿第一个维度计算元素数量。由于我们将把轨迹馈送到缓冲区,因此我们需要通过除以数据收集器生成的子轨迹的长度来调整缓冲区大小。关于批次大小,我们的采样策略将包括对长度为 traj_len=200 的轨迹进行采样,然后选择长度为 random_crop_len=25 的子轨迹,在这些子轨迹上计算损失。此策略平衡了存储具有特定长度的完整轨迹的选择以及向损失提供足够异质性样本的需求。下图显示了从一个收集器到回放缓冲区的 8 帧数据流,该收集器并行运行 2 个环境,然后对其进行采样,最后从中提取长度为 2 个时间步的子轨迹。
让我们从缓冲区中存储的帧数开始
def ceil_div(x, y):
return -x // (-y)
buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)
优先回放缓冲区默认禁用
prb = False
我们还需要定义每个收集批次将进行多少次更新。这称为更新与数据比率或 UTD 比率
update_to_data = 64
我们将使用长度为 25 的轨迹来馈送损失
random_crop_len = 25
在原始论文中,作者每收集一帧数据执行一次批次大小为 64 的更新。在这里,我们复制相同的比率,但在每次批次收集时进行多次更新。我们调整批次大小以实现相同的每帧更新比率
batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)
replay_buffer = make_replay_buffer(
buffer_size=buffer_size,
batch_size=batch_size,
random_crop_len=random_crop_len,
prefetch=3,
prb=prb,
)
损失模块构建#
我们使用刚刚创建的 Actor 和 qnet 来构建损失模块。因为有目标参数需要更新,所以我们 _必须_ 创建一个目标网络更新器。
让我们使用 TD(lambda) 估计器!
loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda, device=device)
注意
离轨策略通常指示使用 TD(0) 估计器。在这里,我们使用 TD(\(\lambda\)) 估计器,这会引入一些偏差,因为在特定状态之后遵循的轨迹是用过时的策略收集的。此技巧,就像在数据收集期间可以使用多步技巧一样,是“技巧”的替代版本,尽管它们在返回值估计中引入了一些偏差,但它们通常在实践中效果很好。
目标网络更新器#
目标网络是离轨 RL 算法的关键部分。通过 HardUpdate 和 SoftUpdate 类,可以轻松更新目标网络参数。它们使用损失模块作为参数进行构建,并通过在训练循环的适当位置调用 updater.step() 来实现更新。
from torchrl.objectives.utils import SoftUpdate
target_net_updater = SoftUpdate(loss_module, eps=1 - tau)
优化器#
最后,我们将使用 Adam 优化器来优化策略和价值网络
from torch import optim
optimizer_actor = optim.Adam(
loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch
训练策略的时候到了#
现在我们已经构建了所有需要的模块,训练循环就相当直接了。
rewards = []
rewards_eval = []
# Main loop
collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):
# update weights of the inference policy
collector.update_policy_weights_()
if r0 is None:
r0 = tensordict["next", "reward"].mean().item()
pbar.update(tensordict.numel())
# extend the replay buffer with the new data
current_frames = tensordict.numel()
collected_frames += current_frames
replay_buffer.extend(tensordict.cpu())
# optimization steps
if collected_frames >= init_random_frames:
for _ in range(update_to_data):
# sample from replay buffer
sampled_tensordict = replay_buffer.sample().to(device)
# Compute loss
loss_dict = loss_module(sampled_tensordict)
# optimize
loss_dict["loss_actor"].backward()
gn1 = torch.nn.utils.clip_grad_norm_(
loss_module.actor_network_params.values(True, True), 10.0
)
optimizer_actor.step()
optimizer_actor.zero_grad()
loss_dict["loss_value"].backward()
gn2 = torch.nn.utils.clip_grad_norm_(
loss_module.value_network_params.values(True, True), 10.0
)
optimizer_value.step()
optimizer_value.zero_grad()
gn = (gn1**2 + gn2**2) ** 0.5
# update priority
if prb:
replay_buffer.update_tensordict_priority(sampled_tensordict)
# update target network
target_net_updater.step()
rewards.append(
(
i,
tensordict["next", "reward"].mean().item(),
)
)
td_record = recorder(None)
if td_record is not None:
rewards_eval.append((i, td_record["r_evaluation"].item()))
if len(rewards_eval) and collected_frames >= init_random_frames:
target_value = loss_dict["target_value"].item()
loss_value = loss_dict["loss_value"].item()
loss_actor = loss_dict["loss_actor"].item()
rn = sampled_tensordict["next", "reward"].mean().item()
rs = sampled_tensordict["next", "reward"].std().item()
pbar.set_description(
f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
f"reward normalized={rn :4.2f}/{rs :4.2f}, "
f"grad norm={gn: 4.2f}, "
f"loss_value={loss_value: 4.2f}, "
f"loss_actor={loss_actor: 4.2f}, "
f"target value: {target_value: 4.2f}"
)
# update the exploration strategy
actor_model_explore[1].step(current_frames)
collector.shutdown()
del collector
0%| | 0/10000 [00:00<?, ?it/s]
8%|▊ | 800/10000 [00:00<00:06, 1355.90it/s]
16%|█▌ | 1600/10000 [00:02<00:15, 526.86it/s]
24%|██▍ | 2400/10000 [00:03<00:09, 773.78it/s]
32%|███▏ | 3200/10000 [00:03<00:06, 994.86it/s]
40%|████ | 4000/10000 [00:04<00:05, 1177.97it/s]
48%|████▊ | 4800/10000 [00:04<00:03, 1329.23it/s]
56%|█████▌ | 5600/10000 [00:05<00:03, 1446.60it/s]
reward: -2.18 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.29/6.36, grad norm= 129.21, loss_value= 444.52, loss_actor= 13.29, target value: -13.62: 56%|█████▌ | 5600/10000 [00:06<00:03, 1446.60it/s]
reward: -2.18 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.29/6.36, grad norm= 129.21, loss_value= 444.52, loss_actor= 13.29, target value: -13.62: 64%|██████▍ | 6400/10000 [00:07<00:05, 695.30it/s]
reward: -1.34 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.00/6.06, grad norm= 290.06, loss_value= 397.83, loss_actor= 14.71, target value: -19.55: 64%|██████▍ | 6400/10000 [00:09<00:05, 695.30it/s]
reward: -1.34 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.00/6.06, grad norm= 290.06, loss_value= 397.83, loss_actor= 14.71, target value: -19.55: 72%|███████▏ | 7200/10000 [00:09<00:05, 518.12it/s]
reward: -4.97 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.03/5.49, grad norm= 171.41, loss_value= 315.45, loss_actor= 19.29, target value: -19.99: 72%|███████▏ | 7200/10000 [00:11<00:05, 518.12it/s]
reward: -4.97 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.03/5.49, grad norm= 171.41, loss_value= 315.45, loss_actor= 19.29, target value: -19.99: 80%|████████ | 8000/10000 [00:12<00:04, 442.37it/s]
reward: -3.94 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.23/5.10, grad norm= 61.54, loss_value= 238.42, loss_actor= 13.63, target value: -14.66: 80%|████████ | 8000/10000 [00:14<00:04, 442.37it/s]
reward: -3.94 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.23/5.10, grad norm= 61.54, loss_value= 238.42, loss_actor= 13.63, target value: -14.66: 88%|████████▊ | 8800/10000 [00:14<00:02, 402.09it/s]
reward: -3.27 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.25/5.09, grad norm= 84.03, loss_value= 191.63, loss_actor= 13.87, target value: -15.35: 88%|████████▊ | 8800/10000 [00:18<00:02, 402.09it/s]
reward: -3.27 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.25/5.09, grad norm= 84.03, loss_value= 191.63, loss_actor= 13.87, target value: -15.35: 96%|█████████▌| 9600/10000 [00:18<00:01, 304.56it/s]
reward: -0.69 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.73/4.87, grad norm= 94.23, loss_value= 202.54, loss_actor= 15.48, target value: -19.00: 96%|█████████▌| 9600/10000 [00:20<00:01, 304.56it/s]
reward: -0.69 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.73/4.87, grad norm= 94.23, loss_value= 202.54, loss_actor= 15.48, target value: -19.00: : 10400it [00:21, 304.02it/s]
reward: -4.88 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.77/4.17, grad norm= 75.76, loss_value= 172.23, loss_actor= 20.02, target value: -19.61: : 10400it [00:23, 304.02it/s]
实验结果#
我们绘制了训练期间平均奖励的简单图。我们可以看到我们的策略学会了很好地解决任务。
注意
如上所述,为了获得更合理的结果,请使用更大的 total_frames 值,例如 1M。
from matplotlib import pyplot as plt
plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()

结论#
在本教程中,我们通过 DDPG 的具体示例学习了如何在 TorchRL 中编写损失模块。
要点总结
如何使用
LossModule类来编写新的损失组件;如何使用(或不使用)目标网络,以及如何更新其参数;
如何创建与损失模块关联的优化器。
下一步#
为了在此损失模块上进行进一步迭代,我们可以考虑
使用 @dispatch(参见 [Feature] Distpatch IQL loss module)。
允许灵活的 TensorDict 键。
脚本总运行时间: (0 分 29.811 秒)