快捷方式

torchrl.data 包

回放缓冲区

回放缓冲区是离策略 RL 算法的核心部分。TorchRL 提供了对几种广泛使用的回放缓冲区的有效实现。

ReplayBuffer(*[, storage, sampler, writer, ...])

一个通用的、可组合的回放缓冲区类。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

优先回放缓冲区。

TensorDictReplayBuffer(*[, priority_key])

TensorDict 特定的 ReplayBuffer 类的包装器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

TensorDict 特定的 PrioritizedReplayBuffer 类的包装器。

RayReplayBuffer(*args, replay_buffer_cls, ...)

Replay Buffer 的 Ray 实现,可以远程扩展和采样。

RemoteTensorDictReplayBuffer(*args, **kwargs)

一个对远程调用友好的 ReplayBuffer 类。

可组合的回放缓冲区

我们还为用户提供了组合回放缓冲区的能力。我们提供了广泛的回放缓冲区使用解决方案,包括支持几乎任何数据类型;内存、设备或物理内存存储;多种采样策略;变换的使用等。

支持的数据类型和存储选择

理论上,回放缓冲区支持任何数据类型,但我们不能保证每个组件都支持任何数据类型。最基本的回放缓冲区实现由一个带有 ListStorageReplayBuffer 基类构成。这效率非常低,但它允许您存储包含非张量数据的复杂数据结构。连续内存中的存储包括 TensorStorageLazyTensorStorageLazyMemmapStorage。这些类将 TensorDict 数据作为一等公民支持,但也支持任何 PyTree 数据结构(例如,元组、列表、字典和它们的嵌套版本)。TensorStorage 存储要求您在构造时提供存储,而 TensorStorage(RAM、CUDA)和 LazyMemmapStorage(物理内存)将在第一次扩展后为您预先分配存储。

以下是一些示例,首先是通用的 ListStorage

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

写入缓冲区的入口点是 add()extend()。还可以使用 __setitem__(),在这种情况下,数据将在指定位置写入,而不更新缓冲区的长度或光标。这在使用从缓冲区采样项目并随后原地更新其值时可能很有用。

使用 TensorStorage,我们告诉 RB 我们希望存储是连续的,这效率更高,但也更受限制。

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下来,我们可以避免创建容器,并让存储自动创建。这在使用 PyTrees 和 tensordicts 时非常有用!对于 PyTrees 和其他数据结构,add() 将传递给它的样本视为该类型的一个实例。extend() 则会认为数据是可迭代的。对于张量、tensordicts 和列表(如下所述),可迭代对象在根级别查找。对于 PyTrees,我们假设树中所有叶子(张量)的前导维度匹配。如果不匹配,extend 将抛出异常。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

extend() 在处理值列表时可能具有歧义的签名,这些值列表应该被解释为 PyTree(在这种情况下,列表中的所有元素将被放入存储中存储的 PyTree 的一个切片)或一次要添加的值列表。为了解决这个问题,TorchRL 明确区分了列表和元组:元组将被视为 PyTree,列表(在根级别)将被解释为要一次添加到缓冲区的值堆栈。

采样和索引

回放缓冲区可以被索引和采样。索引和采样在存储中的给定索引处收集数据,然后通过一系列变换和 collate_fn 进行处理,这些变换和 collate_fn 可以传递给回放缓冲区的 __init__ 函数。collate_fn 带有默认值,在大多数情况下应符合用户的期望,因此您通常不必担心它。变换通常是 Transform 的实例,尽管普通函数也可以工作(后一种情况下,inv() 方法显然将被忽略,而在前一种情况下,它可以用于在数据传递给缓冲区之前对其进行预处理)。最后,可以通过 prefetch 关键字参数将线程数传递给构造函数,从而通过多线程实现采样。我们建议用户在实际环境中对这种技术进行基准测试,然后再采用它,因为不能保证它在实践中会带来更快的吞吐量,具体取决于使用它的机器和设置。

采样时,batch_size 可以在构造时(例如,如果它在训练期间是恒定的)或在 sample() 方法中传递。

为了进一步优化采样策略,我们建议您查看我们的采样器!

以下是一些从回放缓冲区中获取数据的示例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下组件

FlatStorageCheckpointer([done_keys, reward_keys])

以紧凑的形式保存存储,在 TED 格式上节省空间。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以紧凑的形式保存存储,在 TED 格式上节省空间,并使用 H5 格式保存数据。

ImmutableDatasetWriter([compilable])

不可变数据集的阻塞写入器。

LazyMemmapStorage(max_size, *[, ...])

用于张量和 tensordicts 的内存映射存储。

LazyTensorStorage(max_size, *[, device, ...])

用于张量和 tensordicts 的预分配张量存储。

ListStorage([max_size, compilable, device])

存储在列表中的存储。

LazyStackStorage([max_size, compilable, ...])

返回 LazyStackTensorDict 实例的 ListStorage。

ListStorageCheckpointer()

ListStoage 的存储检查点。

NestedStorageCheckpointer([done_keys, ...])

以紧凑的形式保存存储,在 TED 格式上节省空间,并使用内存映射的嵌套张量。

PrioritizedSampler(max_capacity, alpha, beta)

回放缓冲区的优先采样器。

PrioritizedSliceSampler(max_capacity, alpha, ...)

使用优先采样,根据开始和停止信号,沿着第一个维度对数据切片进行采样。

RandomSampler()

可组合回放缓冲区的均匀随机采样器。

RoundRobinWriter([compilable])

可组合回放缓冲区的 RoundRobin Writer 类。

Sampler()

可组合回放缓冲区的通用采样器基类。

SamplerWithoutReplacement([drop_last, shuffle])

一个数据消耗型采样器,可确保相同的样本不会出现在连续的批次中。

SliceSampler(*[, num_slices, slice_len, ...])

根据开始和停止信号,沿着第一个维度对数据切片进行采样。

SliceSamplerWithoutReplacement(*[, ...])

根据开始和停止信号,沿着第一个维度对数据切片进行采样,不放回。

Storage(max_size[, checkpointer, compilable])

Storage 是回放缓冲区的容器。

StorageCheckpointerBase()

存储检查点程序的公共基类。

StorageEnsembleCheckpointer()

集成存储的检查点程序。

TensorDictMaxValueWriter([rank_key, reduction])

可组合回放缓冲区的 Writer 类,它根据某个排名键保留顶部元素。

TensorDictRoundRobinWriter([compilable])

可组合、基于 tensordict 的回放缓冲区的 RoundRobin Writer 类。

TensorStorage(storage[, max_size, device, ...])

用于张量和 tensordicts 的存储。

TensorStorageCheckpointer()

TensorStorages 的存储检查点。

Writer([compilable])

ReplayBuffer 的 Writer 基类。

存储选择对回放缓冲区的采样延迟影响很大,尤其是在数据量更大的分布式强化学习设置中。LazyMemmapStorage 在分布式设置中与共享存储一起被强烈推荐,因为 MemoryMappedTensors 的序列化成本较低,并且能够指定文件存储位置以改进节点故障恢复。从 https://github.com/pytorch/rl/tree/main/benchmarks/storage 的粗略基准测试中发现,与使用 ListStorage 相比,以下是采样延迟的平均改进。

存储类型

加速

ListStorage

1x

LazyTensorStorage

1.83x

LazyMemmapStorage

3.44x

跨进程共享回放缓冲区

只要其组件是可共享的,回放缓冲区就可以在进程之间共享。此功能允许多个进程收集数据并协同填充共享回放缓冲区,而不是将数据集中在主进程上,这可能会产生一些数据传输开销。

可共享的存储包括 LazyMemmapStorageTensorStorage 的任何子类,只要它们被实例化并且其内容以内存映射张量的形式存储。当前,像 TensorDictRoundRobinWriter 这样的有状态写入器,以及像 PrioritizedSampler 这样的有状态采样器,都是不可共享的。

共享回放缓冲区可以在任何能够访问它的进程上进行读取和扩展,如下例所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

存储轨迹

将轨迹存储在回放缓冲区中并不难。需要注意的一点是,回放缓冲区的默认大小是存储的前导维的大小:换句话说,创建一个大小为 1M 的存储的回放缓冲区,并不意味着存储 1M 帧,而是存储 1M 条轨迹。但是,如果轨迹(或回合/滚动)在存储之前被展平,容量仍然是 1M 步。

有一种方法可以解决这个问题,那就是告诉存储在保存数据时应该考虑多少个维度。这可以通过 ndim 关键字参数来实现,该参数被所有连续存储(如 TensorStorage 等)接受。当将多维存储传递给缓冲区时,缓冲区将自动将最后一个维度视为“时间”维度,这在 TorchRL 中是惯例。这可以通过 ReplayBuffer 中的 dim_extend 关键字参数来覆盖。正如我们下面将看到的,这是存储通过 ParallelEnv 或其串行对应物获得的轨迹的推荐方法。

在采样轨迹时,为了使学习多样化或提高采样效率,可能希望采样子轨迹。TorchRL 提供了两种不同的方法来实现这一点

  • SliceSampler 允许在 TensorStorage 的前导维度上按顺序存储的轨迹采样给定数量的切片。这是 TorchRL 中采样子轨迹的推荐方法,__尤其是__ 使用离线数据集时(它们使用该约定存储)。此策略要求在扩展回放缓冲区之前展平轨迹,并在采样后重塑它们。SliceSampler 类文档详细介绍了此存储和采样策略。请注意,SliceSampler 与多维存储兼容。以下示例展示了如何在展平 tensordict 和不展平 tensordict 的情况下使用此功能。在第一种情况下,我们正在从单个环境中收集数据。在这种情况下,我们对存储连接第一个维度上的传入数据感到满意,因为收集计划不会引入中断。

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果批次中有多个环境运行,我们仍然可以通过调用 data.reshape(-1) 将数据存储在同一个缓冲区中,这将把 [B, T] 的大小展平为 [B * T],但这意味着,例如,第一个环境的轨迹会被其他环境的轨迹交错,而这种情况是 SliceSampler 无法处理的。为了解决这个问题,我们建议在存储构造函数中使用 ndim 参数。

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 轨迹也可以独立存储,其中前导维的每个元素指向不同的轨迹。这要求轨迹具有一致的形状(或被填充)。我们提供了一个名为 RandomCropTensorDict 的自定义 Transform 类,它允许在缓冲区中采样子轨迹。请注意,与基于 SliceSampler 的策略不同,这里不需要指向开始和停止信号的 "episode""done" 键。以下是如何使用此类的一个示例。

检查点回放缓冲区

回放缓冲区的每个组件都有可能是有状态的,因此需要一种专门的方式来对其进行序列化。我们的回放缓冲区有两种独立的 API 用于将状态保存在磁盘上:dumps()loads() 将使用内存映射张量和 JSON 文件保存每个组件(除了变换)的数据,用于元数据。

除了 ListStorage 之外,此 API 保证了保存然后加载回的缓冲区将处于完全相同的状态,无论我们查看其采样器(例如,优先级树)的 LSTTS、其写入器(例如,max writer 堆)还是其存储。

该 API 保证了保存然后加载回的缓冲区将处于完全相同的状态,无论我们查看其采样器(例如,优先级树)、其写入器(例如,max writer 堆)还是其存储。

在底层,调用 dumps() 的简单调用将在特定文件夹中调用每个组件的公共 dumps 方法(不包括变换,因为我们通常不认为它们可以使用内存映射张量进行序列化)。

将数据保存在 TED 格式 中可能会比需要消耗更多内存。如果连续轨迹存储在缓冲区中,我们可以通过保存根目录中的所有观测以及 “next” 子张量字典中观测的最后一个元素来避免保存重复的观测,这可以将存储消耗减少多达两倍。为了实现这一点,提供了三个检查点类:FlatStorageCheckpointer 将丢弃重复的观测以压缩 TED 格式。在加载时,此类会将观测以正确的格式重写。如果缓冲区保存在磁盘上,此检查点执行的操作不需要任何额外的 RAM。NestedStorageCheckpointer 将使用嵌套张量保存轨迹,使数据表示更清晰(沿第一个维度的每个项代表一个不同的轨迹)。最后,H5StorageCheckpointer 将缓冲区保存在 H5DB 格式中,使用户能够压缩数据并节省更多空间。

警告

这些检查点对回放缓冲区做出了一些限制性假设。首先,假设 done 状态准确地表示了轨迹的结束(最后一个写入的轨迹除外,该轨迹的写入器光标指示了截断信号的位置)。对于 MARL 用途,应注意只允许元素数量与根张量字典相同的 done 状态:如果 done 状态具有存储的批次大小未表示的额外元素,这些检查点将失败。例如,在形状为 torch.Size([3, 4]) 的存储中,形状为 torch.Size([3, 4, 5]) 的 done 状态是不允许的。

以下是一个在实践中使用 H5DB 检查点的具体示例

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

每当无法使用 dumps() 保存数据时,另一种方法是使用 state_dict(),它返回一个可以使用 torch.save() 保存并可以使用 torch.load() 加载的数据结构,然后再调用 load_state_dict()。此方法的缺点是它在保存大型数据结构时会遇到困难,而这在使用回放缓冲区时很常见。

TorchRL Episode Data Format (TED)

在 TorchRL 中,顺序数据始终以一种特定格式呈现,称为 TorchRL Episode Data Format (TED)。此格式对于 TorchRL 中各种组件的无缝集成和功能至关重要。

某些组件(如回放缓冲区)在某种程度上对数据格式不敏感。然而,其他组件,特别是环境,则严重依赖它来确保平稳运行。

因此,理解 TED、它的用途以及如何与之交互非常重要。本指南将对 TED 进行清晰的解释,说明它为什么被使用以及如何有效地使用它。

TED 背后的基本原理

在强化学习 (RL) 领域,格式化顺序数据可能是一项复杂的任务。作为从业者,我们经常遇到数据在重置时(并非总是如此)交付的情况,有时数据在轨迹的最后一步提供或丢弃。

这种可变性意味着我们在数据集中会遇到不同长度的数据,并且并不总是能立即清楚如何匹配此数据集中各个元素之间的时间步。考虑以下含糊不清的数据集结构

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍一看,似乎 info 和 observation 一起提供(每次重置时各一个 + 每次调用 step 时各一个),这由 action 的元素少一个来暗示。但是,如果 info 的元素少一个,我们必须假定它要么在重置时被省略,要么在轨迹的最后一步未提供或未记录。如果没有数据结构的适当文档,就无法确定哪个 info 对应哪个时间步。

更复杂的是,一些数据集提供不一致的数据格式,其中 observationsinfos 在 rollout 的开始或结束时丢失,并且这种行为通常没有记录。TED 的主要目标是通过提供清晰一致的数据表示来消除这些歧义。

TED 的结构

TED 基于 RL 上下文中马尔可夫决策过程 (MDP) 的规范定义。在每一步,一个观测会制约一个动作,该动作会导致(1)一个新的观测,(2)一个任务完成的指示符(已终止、已截断、已完成),以及(3)一个奖励信号。

某些元素可能会丢失(例如,在模仿学习上下文中奖励是可选的),或者可以通过状态或信息容器传递额外信息。在某些情况下,需要额外信息才能在调用 step 时获得观测(例如,在无状态环境模拟器中)。此外,在某些场景下,“动作”(或任何其他数据)不能表示为单个张量,需要以不同的方式组织。例如,在多智能体 RL 设置中,动作、观测、奖励和完成信号可能是复合的。

TED 以单一、统一、无歧义的格式容纳了所有这些场景。我们通过在执行动作时设置一个限制来区分时间步 tt+1 发生的事情。换句话说,在调用 env.step 之前存在的所有内容都属于 t,之后发生的所有内容都属于 t+1

通用规则是,属于时间步 t 的所有内容都存储在张量字典的根目录中,而属于 t+1 的所有内容都存储在张量字典的 "next" 条目中。例如

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在 rollout 期间(使用 EnvBaseSyncDataCollector),当代理重置其步计数时(t <- t+1),"next" 张量字典的内容会通过 step_mdp() 函数带到根目录。您可以在此处阅读有关环境 API 的更多信息:此处

在大多数情况下,根目录中没有 True 值的 "done" 状态,因为任何 done 状态都会触发(部分)重置,这将使 "done" 变为 False。然而,这仅在自动执行重置时才成立。在某些情况下,部分重置不会触发重置,因此我们会保留这些数据,这些数据应该比观测值(例如)具有低得多的内存占用。

此格式消除了关于将观测值与其动作、信息或 done 状态匹配的任何歧义。

关于 TED 中单例维度的一个注释

在 TorchRL 中,标准做法是 done 状态(包括 terminated 和 truncated)和奖励应具有一个可以扩展以匹配观测、状态和动作的形状的维度,而无需任何其他操作(即,奖励必须具有与观测和/或动作或其嵌入相同的维度)。

基本上,此格式是可接受的(尽管不严格强制执行)

>>> print(rollout[t])
... TensorDict(
...     fields={
...         action: Tensor(n_action),
...         done: Tensor(1),  # The done state has a rightmost singleton dimension
...         next: TensorDict(
...             fields={
...                 done: Tensor(1),
...                 observation: Tensor(n_obs),
...                 reward: Tensor(1),  # The reward has a rightmost singleton dimension
...                 terminated: Tensor(1),
...                 truncated: Tensor(1),
...             batch_size=torch.Size([]),
...             device=cpu,
...             is_shared=False),
...         observation: Tensor(n_obs),  # the observation at reset
...         terminated: Tensor(1),  # the terminated at reset
...         truncated: Tensor(1),  # the truncated at reset
...     batch_size=torch.Size([]),
...     device=cpu,
...     is_shared=False)

这样做的基本原理是为了确保在观测和/或动作上执行的操作(例如值估计)的结果与奖励和 done 状态具有相同的维度数。这种一致性使得后续操作能够顺利进行

>>> state_value = f(observation)
>>> next_state_value = state_value + reward

如果没有这个奖励末尾的单例维度,广播规则(仅在张量可以从左侧扩展时工作)将尝试从左侧扩展奖励。这可能导致(最坏情况下)失败或(最坏情况下)引入错误。

展平 TED 以减少内存消耗

TED 会将观测值复制两次到内存中,这可能会影响在实践中使用此格式的可行性。由于它主要用于表示的便利性,因此可以以展平的方式存储数据,但在训练期间将其表示为 TED。

这在序列化回放缓冲区时尤其有用:例如,TED2Flat 类确保 TED 格式的数据结构在写入磁盘之前被展平,而 Flat2TED 加载钩会在反序列化期间取消展平此结构。

张量字典的维度

在 rollout 期间,所有收集的张量字典都将沿一个位于末尾的新维度堆叠。收集器和环境都将此维度标记为 "time" 名称。例如

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

这确保了时间维度在数据结构中被清晰地标记并易于识别。

特殊情况和注释

多智能体数据表示

多智能体数据格式的文档可以在 MARL 环境 API 部分找到。

基于记忆的策略(RNN 和 Transformer)

在上面的示例中,只有 env.step(data) 会生成在下一步需要读取的数据。但是,在某些情况下,策略还会输出在下一步需要的信息。这通常是基于 RNN 的策略的情况,它们会输出一个动作以及一个需要在下一步使用的循环状态。为了适应这一点,我们建议用户调整其 RNN 策略,将此数据写入张量字典的 "next" 条目下。这确保了此内容将在下一步带到根目录。更多信息可以在 GRUModuleLSTMModule 中找到。

多步

收集器允许用户在读取数据时跳过步骤,累积未来 n 步的奖励。这种技术在像 Rainbow 这样的 DQN 类算法中很受欢迎。MultiStep 类对来自收集器的批次执行此数据转换。在这些情况下,像下面这样的检查会失败,因为下一个观测值会偏移 n 步

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

内存要求怎么样?

如果实现得不够精细,此数据格式将消耗大约是展平表示的两倍内存。在某些内存密集型设置中(例如,在 AtariDQNExperienceReplay 数据集中),我们只将 T+1 观测值存储在磁盘上,并在获取时在线执行格式化。在其他情况下,我们假设 2 倍的内存成本是获得更清晰表示的小代价。然而,为离线数据集推广延迟表示肯定会是一个有用的功能,我们欢迎在这方面做出贡献!

数据集

TorchRL 提供离线 RL 数据集的包装器。这些数据以 ReplayBuffer 实例的形式呈现,这意味着它们可以根据需要使用转换、采样器和存储进行自定义。例如,可以使用 SelectTransformExcludeTransform 将条目过滤进或过滤出数据集。

默认情况下,数据集存储为内存映射张量,允许它们被快速采样,内存占用几乎为零。

例如

注意

安装依赖项是用户的责任。对于 D4RL,需要克隆 仓库,因为最新的 wheels 未在 PyPI 上发布。对于 OpenML,需要 scikit-learnpandas

转换数据集

在许多情况下,原始数据不会按原样使用。自然的解决方案可能是将 Transform 实例传递给数据集构造函数并动态修改样本。这会起作用,但会产生额外的转换运行时。如果转换(至少一部分)可以预先应用于数据集,则可以节省大量的磁盘空间和一些采样时产生的开销。为此,可以使用 preprocess() 方法。此方法将在数据集的每个元素上运行一个每样本预处理管道,并用其转换后的版本替换现有数据集。

转换后,重新创建相同的数据集将产生另一个具有相同转换存储的对象(除非使用了 download="force"

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

离线数据集的父类。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN Experience 回放类。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的 Experience 回放类。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari Experience 回放数据集。

组合数据集

在离线 RL 中,通常需要同时处理多个数据集。此外,TorchRL 通常有精细的数据集命名法,其中每个任务被单独表示,而其他库将以更紧凑的方式表示这些数据集。为了让用户组合多个数据集,我们提供了一个 ReplayBufferEnsemble 原始类,它允许用户一次从多个数据集中采样。

如果各个数据集的格式不同,可以使用 Transform 实例。在下面的示例中,我们创建了两个具有语义相同但名称不同(("some", "key")"another_key")的虚拟数据集,并展示了如何重命名它们以获得匹配的名称。我们还调整了图像大小,以便在采样期间可以将它们堆叠在一起。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

回放缓冲区的集合。

SamplerEnsemble(*samplers[, p, ...])

采样器的集合。

StorageEnsemble(*storages[, transforms])

存储的集合。

WriterEnsemble(*writers)

写入器的集合。

TensorSpec

TensorSpec 父类及其子类定义了 TorchRL 中状态、观测、动作、奖励和完成状态的基本属性,例如它们的形状、设备、数据类型和域。

您的环境规格必须与它发送和接收的输入和输出匹配,这一点很重要,因为 ParallelEnv 将从这些规格创建缓冲区以与生成的进程通信。请查看 torchrl.envs.utils.check_env_specs() 方法进行健全性检查。

如果需要,可以使用 make_composite_from_td() 函数从数据自动生成规格。

规格分为两大类:数值和分类。

数值 TensorSpec 子类。

数值

有界

无界

有界离散

有界连续

无界离散

无界连续

每当创建 Bounded 实例时,它的域(通过其数据类型隐式定义或通过 “domain” 关键字参数显式定义)将决定实例化的类是 BoundedContinuous 还是 BoundedDiscrete 类型。对于 Unbounded 类也是如此。有关更多信息,请参阅这些类。

分类 TensorSpec 子类。

分类

独热编码

多独热编码

分类

多分类

二元

gymnasium 不同,TorchRL 没有任意规格列表的概念。如果必须组合多个规格,TorchRL 假定数据将以字典形式(更具体地说,以 TensorDict 或相关格式)呈现。在这种情况下,相应的 TensorSpec 类是 Composite 规格。

但是,可以使用 stack() 将规格堆叠在一起:如果它们相同,它们的形状将相应地扩展。否则,将通过 Stacked 类创建延迟堆叠。

同样,TensorSpecs 具有与 TensorTensorDict 一些共同的行为:它们可以像常规 Tensor 实例一样进行重塑、索引、压缩、解压、移动到另一个设备(to)或解绑(unbind)。

其中一些维度为 -1 的规格被称为“动态”规格,负维度表示相应数据的形状不一致。当被优化器或环境(例如,像 ParallelEnv 这样的批处理环境)看到时,这些负形状会告诉 TorchRL 避免使用缓冲区,因为张量的形状是不可预测的。

TensorSpec(shape, space, device, dtype, ...)

张量元数据容器的父类。

Binary([n, shape, device, dtype])

二进制离散张量规格。

Bounded(*args, **kwargs)

有界张量规格。

Categorical(n[, shape, device, dtype, mask])

离散张量规格。

Composite(*args, **kwargs)

TensorSpecs 的组合。

MultiCategorical(nvec[, shape, device, ...])

离散张量规格的连接。

MultiOneHot(nvec[, shape, device, dtype, ...])

独热离散张量规格的连接。

NonTensor([shape, device, dtype, ...])

非张量数据的规格。

OneHot(n[, shape, device, dtype, ...])

一维独热离散张量规格。

Stacked(*specs, dim)

张量规格的延迟表示。

StackedComposite(*args, **kwargs)

复合规格的延迟表示。

Unbounded(*args, **kwargs)

无界张量规格。

UnboundedContinuous(*args, **kwargs)

具有连续空间的 torchrl.data.Unbounded 的专用版本。

UnboundedDiscrete(*args, **kwargs)

具有离散空间的 torchrl.data.Unbounded 的专用版本。

以下类已弃用,仅指向上述类

BinaryDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Binary 版本。

BoundedTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Bounded 版本。

CompositeSpec(*args, **kwargs)

已弃用的 torchrl.data.Composite 版本。

DiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Categorical 版本。

LazyStackedCompositeSpec(*args, **kwargs)

已弃用的 torchrl.data.StackedComposite 版本。

LazyStackedTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Stacked 版本。

MultiDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.MultiCategorical 版本。

MultiOneHotDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.MultiOneHot 版本。

NonTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.NonTensor 版本。

OneHotDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.OneHot 版本。

UnboundedContinuousTensorSpec(*args, **kwargs)

具有连续空间的已弃用的 torchrl.data.Unbounded 版本。

UnboundedDiscreteTensorSpec(*args, **kwargs)

具有离散空间的已弃用的 torchrl.data.Unbounded 版本。

树和森林

TorchRL 提供了一组可以高效表示树和森林的类和函数,这对于蒙特卡洛树搜索(MCTS)算法尤其有用。

TensorDictMap

核心上,MCTS API 依赖于 TensorDictMap,它作为一个存储,其中的索引可以是任何数值对象。在传统的存储(例如 TensorStorage)中,只允许使用整数索引。

>>> storage = TensorStorage(...)
>>> data = storage[3]

TensorDictMap 允许我们对存储进行更高级的查询。典型的例子是当我们的存储包含一组 MDP,并且我们想根据初始观察和动作对来重建一个轨迹。在张量方面,这可以用以下伪代码表示:

>>> next_state = storage[observation, action]

(如果该对有多个后续状态,则可以返回后续状态的堆栈)。这个 API 是合理的,但会很受限制:允许由多个张量组成的观察或动作可能很难实现。相反,我们提供一个包含这些值的 tensordict,并让存储知道要查找哪些 in_keys 来查询后续状态。

>>> td = TensorDict(observation=observation, action=action)
>>> next_td = storage[td]

当然,这个类也允许我们用新数据扩展存储。

>>> storage[td] = next_state

这很有用,因为它允许我们表示复杂的 rollout 结构,其中在给定节点(即给定观察)会采取不同的动作。所有已观察到的 (观察,动作) 对都可能引导我们到一个(一组)我们可以进一步使用的 rollout。

MCTSForest

从初始观察构建树就变成了一个高效组织数据的问题。核心上 MCTSForest 包含两个存储:第一个存储将观察映射到哈希值和数据集中过去遇到的动作索引。

>>> data = TensorDict(observation=observation)
>>> metadata = forest.node_map[data]
>>> index = metadata["_index"]

其中 forest 是一个 MCTSForest 实例。然后,第二个存储会跟踪与观察相关的动作和结果。

>>> next_data = forest.data_map[index]

next_data 条目可以有任何形状,但通常会匹配 index 的形状(因为每个索引对应一个动作)。一旦获得 next_data,就可以将其与 data 组合形成一组节点,并为每个节点展开树。下图显示了这是如何完成的。

../_images/collector-copy.png

MCTSForest 对象构建 Tree。流程图表示如何从初始观察 o 构建树。get_tree 方法将输入数据结构(根节点)传递给 node_map TensorDictMap 实例,该实例返回一组哈希值和索引。然后,这些索引用于查询与根节点关联的相应动作、后续观察、奖励等元组。从每个元组创建一个顶点(如果需要紧凑表示,则可能带有更长的 rollout)。然后,这组顶点用于进一步构建树,并将这些顶点堆叠在一起,形成根部的树枝。这个过程会重复进行,直到达到给定的深度或树无法再扩展为止。

BinaryToDecimal(num_bits, device, dtype[, ...])

一个模块,用于将二进制编码的张量转换为十进制。

HashToInt()

将哈希值转换为整数,该整数可用于索引连续存储。

MCTSForest(*[, data_map, node_map, ...])

MCTS 树的集合。

QueryModule(*args, **kwargs)

用于为存储生成兼容索引的模块。

RandomProjectionHash(*[, n_components, ...])

一个模块,它结合了随机投影和 SipHash,以获得低维张量,更容易通过 SipHash 进行嵌入。

SipHash([as_tensor])

用于计算给定张量的 SipHash 值的模块。

TensorDictMap(*args, **kwargs)

TensorDict 的 Map 存储。

TensorMap()

实现不同存储的抽象。

Tree([count, wins, index, hash, node_id, ...])

大型语言模型和人类反馈强化学习 (RLHF)

警告

这些 API 已弃用,将在未来移除。请改用 torchrl.data.llm 模块。有关更多信息,请参阅完整的 LLM 文档

在 LLM 训练后(例如 GRPO 或人类反馈强化学习 (RLHF))数据至关重要。鉴于这些技术通常用于语言领域,而该领域的 RL 子领域在库中很少涉及,因此我们提供了特定的实用程序来方便与 datasets 等外部库进行交互。这些实用程序包括用于标记化数据、将其格式化为适合 TorchRL 模块的方式以及优化存储以进行高效采样。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

Prompt 数据集的标记化配方。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

用于使用因果语言模型执行 rollout 的类。

TensorDictTokenizer(tokenizer, max_length[, ...])

用于在文本示例上应用标记器的过程函数工厂。

TokenizedDatasetLoader(split, max_length, ...)

加载标记化的数据集,并缓存其内存映射副本。

create_infinite_iterator(iterator)

无限地迭代一个迭代器。

get_dataloader(batch_size, block_size, ...)

创建一个数据集并从中返回一个数据加载器。

ConstantKLController(*[, kl_coef, model])

恒定 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

自适应 KL 控制器,如 Ziegler 等人在《Fine-Tuning Language Models from Human Preferences》中所述。

工具

DensifyReward(*args, **kwargs)

一个实用程序,用于将结束状态的奖励重新分配给轨迹的其余部分。

Flat2TED([done_key, shift_key, is_full_key, ...])

一个存储加载钩子,用于将扁平化的 TED 数据反序列化为 TED 格式。

H5Combine()

将持久 tensordict 中的轨迹合并到一个存储在文件系统中的单个 standing tensordict 中。

H5Split([done_key, shift_key, is_full_key, ...])

将使用 TED2Nested 准备的数据集分割为 TensorDict,其中每个轨迹都存储为对其父嵌套张量的视图。

MultiStep(gamma, n_steps)

多步奖励转换。

Nested2TED([done_key, shift_key, ...])

将嵌套的 tensordict(其中每行是一个轨迹)转换为 TED 格式的 tensordict。

TED2Flat([done_key, shift_key, is_full_key, ...])

一个存储保存钩子,用于将 TED 数据序列化为紧凑格式。

TED2Nested(*args, **kwargs)

将 TED 格式的数据集转换为填充了嵌套张量的 tensordict,其中每行是一个轨迹。

check_no_exclusive_keys(spec[, recurse])

给定一个 TensorSpec,如果不存在独占键,则返回 true。

consolidate_spec(spec[, ...])

给定一个 TensorSpec,通过添加 0 形状的 spec 来移除独占键。

contains_lazy_spec(spec)

如果 spec 包含惰性堆叠 spec,则返回 true。

MultiStepTransform(n_steps, gamma, *[, ...])

ReplayBuffers 的多步转换。

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源