快捷方式

torchrl.data 包

经验回放缓冲区

经验回放缓冲区是离策略强化学习算法的核心部分。TorchRL 提供了几种常用经验回放缓冲区的有效实现。

ReplayBuffer(*[, storage, sampler, writer, ...])

一个通用、可组合的经验回放缓冲区类。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

优先经验回放缓冲区。

TensorDictReplayBuffer(*[, priority_key])

TensorDict 特定的 ReplayBuffer 类包装器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

TensorDict 特定的 PrioritizedReplayBuffer 类包装器。

RayReplayBuffer(*args, replay_buffer_cls, ...)

Replay Buffer 的 Ray 实现,可以远程扩展和采样。

RemoteTensorDictReplayBuffer(*args, **kwargs)

一个对远程调用友好的 ReplayBuffer 类。

可组合的经验回放缓冲区

我们也为用户提供了组合经验回放缓冲区的能力。我们为经验回放缓冲区的用法提供了广泛的解决方案,包括支持几乎任何数据类型;内存、设备或物理内存存储;多种采样策略;变换的使用等。

支持的数据类型和存储选择

理论上,经验回放缓冲区支持任何数据类型,但我们无法保证每个组件都会支持任何数据类型。最基础的经验回放缓冲区实现由一个 ReplayBuffer 基类和一个 ListStorage 存储组成。这非常低效,但它允许您存储具有非张量数据的复杂数据结构。连续内存中的存储包括 TensorStorageLazyTensorStorageLazyMemmapStorage。这些类将 TensorDict 数据作为一流公民支持,但也支持任何 PyTree 数据结构(例如,元组、列表、字典及其嵌套版本)。TensorStorage 存储要求您在构造时提供存储,而 TensorStorage(RAM、CUDA)和 LazyMemmapStorage(物理内存)将在第一次扩展后为您预先分配存储。

以下是一些示例,首先从通用的 ListStorage 开始

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

写入缓冲区的主要入口点是 add()extend()。还可以使用 __setitem__(),在这种情况下,数据会写入指定位置,而不会更新缓冲区的长度或游标。当从缓冲区采样项目并在之后原地更新其值时,这会很有用。

使用 TensorStorage,我们告诉 RB 希望存储是连续的,这效率最高,但也更受限制。

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下来,我们可以避免创建容器,让存储自动创建。这在使用 PyTrees 和 tensordicts 时非常有用!对于 PyTrees 和其他数据结构,add() 将传递给它的样本视为单个类型的实例。extend() 另一方面,它将认为数据是可迭代的。对于张量、tensordicts 和列表(如下文所示),可迭代对象在根级别查找。对于 PyTrees,我们假设树中所有叶子(张量)的前导维度匹配。如果不匹配,extend 将引发异常。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

extend() 在处理值列表时可能具有模糊的签名,这些值列表应解释为 PyTree(在这种情况下,列表中的所有元素都将放入存储中的 PyTree 的一个切片中)或逐个添加值的列表。为了解决这个问题,TorchRL 明确区分列表和元组:元组将被视为 PyTree,列表(在根级别)将被解释为要逐个添加到缓冲区的值堆栈。

采样和索引

经验回放缓冲区可以被索引和采样。索引和采样将数据收集到存储中的指定索引,然后通过一系列变换和 collate_fn 进行处理,这些变换和 collate_fn 可以传递给经验回放缓冲区的 __init__ 函数。collate_fn 带有默认值,在大多数情况下应符合用户的期望,因此您通常不必担心它。变换通常是 Transform 的实例,尽管普通函数也可以工作(后一种情况下,inv() 方法将被忽略,而在前一种情况下,它可以用于在数据传递给缓冲区之前进行预处理)。最后,可以通过在构造函数中将线程数通过 prefetch 关键字参数传递来使用多线程实现采样。我们建议用户在实际环境中对其进行基准测试,然后再采用此技术,因为不能保证它会在实践中带来更快的吞吐量,具体取决于使用它的机器和环境。

采样时,batch_size 可以在构造时(例如,如果它在整个训练过程中是恒定的)或在 sample() 方法中指定。

为了进一步完善采样策略,我们建议您查看我们的采样器!

以下是一些如何从经验回放缓冲区中获取数据的示例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下组件

CompressedListStorage(max_size, *[, ...])

压缩和解压缩数据的存储。

CompressedListStorageCheckpointer()

CompressedListStorage 的存储检查点。

FlatStorageCheckpointer([done_keys, reward_keys])

以紧凑形式保存存储,节省 TED 格式的空间。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以紧凑形式保存存储,节省 TED 格式的空间,并使用 H5 格式保存数据。

ImmutableDatasetWriter([compilable])

不可变数据集的阻塞写入器。

LazyMemmapStorage(max_size, *[, ...])

用于张量和 tensordicts 的内存映射存储。

LazyTensorStorage(max_size, *[, device, ...])

用于张量和 tensordicts 的预分配张量存储。

ListStorage([max_size, compilable, device])

存储在列表中的存储。

LazyStackStorage([max_size, compilable, ...])

返回 LazyStackTensorDict 实例的 ListStorage。

ListStorageCheckpointer()

ListStoage 的存储检查点。

NestedStorageCheckpointer([done_keys, ...])

以紧凑形式保存存储,节省 TED 格式的空间,并使用内存映射的嵌套张量。

PrioritizedSampler(max_capacity, alpha, beta)

经验回放缓冲区的优先采样器。

PrioritizedSliceSampler(max_capacity, alpha, ...)

使用优先采样,沿第一个维度对数据切片进行采样,并给出开始和停止信号。

RandomSampler()

用于可组合经验回放缓冲区的均匀随机采样器。

RoundRobinWriter([compilable])

用于可组合经验回放缓冲区的 RoundRobin Writer 类。

Sampler()

用于可组合经验回放缓冲区的通用采样器基类。

SamplerWithoutReplacement([drop_last, shuffle])

一个数据消耗采样器,可确保同一样本不会出现在连续的批次中。

SliceSampler(*[, num_slices, slice_len, ...])

沿第一个维度对数据切片进行采样,并给出开始和停止信号。

SliceSamplerWithoutReplacement(*[, ...])

使用无替换采样,沿第一个维度对数据切片进行采样,并给出开始和停止信号。

Storage(max_size[, checkpointer, compilable])

Storage 是经验回放缓冲区的容器。

StorageCheckpointerBase()

存储检查点器的公共基类。

StorageEnsembleCheckpointer()

集合存储的检查点器。

TensorDictMaxValueWriter([rank_key, reduction])

用于可组合经验回放缓冲区的写入器类,该类根据某个排名键保留最高元素。

TensorDictRoundRobinWriter([compilable])

用于可组合、基于 tensordict 的经验回放缓冲区的 RoundRobin Writer 类。

TensorStorage(storage[, max_size, device, ...])

用于张量和 tensordicts 的存储。

TensorStorageCheckpointer()

TensorStorages 的存储检查点。

Writer([compilable])

经验回放缓冲区基类的写入器类。

存储选择对经验回放缓冲区的采样延迟有很大影响,尤其是在数据量较大的分布式强化学习设置中。LazyMemmapStorage 在分布式设置中具有共享存储,强烈推荐使用,因为它具有较低的 MemoryMappedTensors 序列化成本,并且能够指定文件存储位置以提高节点故障恢复能力。通过在 ListStorage 上的粗略基准测试(请参阅 https://github.com/pytorch/rl/tree/main/benchmarks/storage)发现,以下是平均采样延迟的改进。

存储类型

加速

ListStorage

1x

LazyTensorStorage

1.83x

LazyMemmapStorage

3.44x

内存效率的压缩存储

对于内存使用量或内存带宽是主要考虑因素的应用——尤其是在存储或传输图像、音频或文本等大型感官观测时——CompressedListStorage 通过压缩实现了显著的内存节省。

主要特点

  • 内存效率:通过压缩实现显著的内存节省。

  • 数据完整性:通过无损压缩保持完整的数据保真度。

  • 灵活压缩:默认使用 zstd 压缩,支持自定义压缩算法。

  • TensorDict 支持:与 TensorDict 结构无缝集成。

  • 检查点:完全支持压缩数据的保存和加载。

  • 批量 GPU 压缩/解压缩:可直接从 VRAM 实现高效的经验回放缓冲区采样。

CompressedListStorage 在存储时压缩数据,在检索时解压缩,对于 Atari 图像实现了 95x–122x 的压缩比,同时保持完整的数据保真度。我们在 Atari Learning Environment (ALE) 中,使用随机策略在 Pong 游戏中进行了为期一个回合的采样,并在每个压缩级别看到了这些结果。

zstd 的压缩级别

1

3

8

12

22

ALE Pong 中的压缩比

95x

99x

106x

111x

122x

使用示例

>>> import torch
>>> from torchrl.data import ReplayBuffer, CompressedListStorage
>>> from tensordict import TensorDict
>>>
>>> # Create a compressed storage for image data
>>> storage = CompressedListStorage(max_size=1000, compression_level=3)
>>> rb = ReplayBuffer(storage=storage, batch_size=32)
>>>
>>> # Add image data
>>> images = torch.randn(100, 3, 84, 84)  # Atari-like frames
>>> data = TensorDict({"obs": images}, batch_size=[100])
>>> rb.extend(data)
>>>
>>> # Sample data (automatically decompressed)
>>> sample = rb.sample(32)
>>> print(sample["obs"].shape)  # torch.Size([32, 3, 84, 84])

压缩级别可以从 1(快速,压缩少)调整到 22(慢速,压缩多),对于大多数用例,级别 3 是一个不错的默认值。

对于自定义压缩算法

>>> def my_compress(tensor):
...     return tensor.to(torch.uint8)  # Simple example
>>>
>>> def my_decompress(compressed_tensor, metadata):
...     return compressed_tensor.to(metadata["dtype"])
>>>
>>> storage = CompressedListStorage(
...     max_size=1000,
...     compression_fn=my_compress,
...     decompression_fn=my_decompress
... )

注意

CompressedListStorage 在 Python 版本至少为 3.14 时使用 zstd,否则默认为 zlib。

注意

批量 GPU 压缩依赖于 nvidia.nvcomp,请参阅示例代码 examples/replay-buffers/compressed_replay_buffer.py

跨进程共享经验回放缓冲区

只要其组件可共享,经验回放缓冲区就可以在进程之间共享。此功能允许多个进程协同收集数据并填充共享的经验回放缓冲区,而不是将数据集中在主进程上,这可能会产生一些数据传输开销。

可共享的存储包括 LazyMemmapStorage 或任何 TensorStorage 的子类,只要它们被实例化并且其内容被存储为内存映射张量。有状态的写入器(如 TensorDictRoundRobinWriter)目前不可共享,有状态的采样器(如 PrioritizedSampler)也是如此。

共享的经验回放缓冲区可以在任何有权访问它的进程上读取和扩展,如下例所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

存储轨迹

将轨迹存储在经验回放缓冲区中并不困难。需要注意的一点是,经验回放缓冲区的默认大小是存储的前导维度的大小:换句话说,创建一个大小为 100 万的存储的经验回放缓冲区并不意味着存储 100 万帧,而是存储 100 万条轨迹。但是,如果轨迹(或回合/滚动)在存储之前被展平,容量仍将是 100 万步。

有一种方法可以绕过这个问题,通过告诉存储在保存数据时应考虑的维度数量。这可以通过 ndim 关键字参数来实现,该参数被所有连续存储(如 TensorStorage 等)接受。当将多维存储传递给缓冲区时,缓冲区将自动将最后一个维度视为 TorchRL 中的传统“时间”维度。可以通过 ReplayBuffer 中的 dim_extend 关键字参数来覆盖此行为。这是保存通过 ParallelEnv 或其串行对应项获得的轨迹的推荐方法,如下文所示。

采样轨迹时,为了多样化学习或提高采样效率,可能需要采样子轨迹。TorchRL 提供了两种不同的方法来实现这一点

  • SliceSampler 允许沿 TensorStorage 的前导维度按顺序存储的给定数量的轨迹切片进行采样。这是 TorchRL 中采样子轨迹的推荐方法,__尤其__是在使用离线数据集(以该约定存储)时。此策略需要在扩展经验回放缓冲区之前展平轨迹,并在采样后对其进行重塑。SliceSampler 类文档中提供了关于此存储和采样策略的详细信息。请注意,SliceSampler 与多维存储兼容。以下示例展示了如何在展平 tensordict 和不展平 tensordict 的情况下使用此功能。在第一种场景中,我们从单个环境中收集数据。在这种情况下,我们满意于一个存储,该存储将传入的数据沿第一个维度连接起来,因为收集计划不会引入中断。

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果一批中运行的环境不止一个,我们仍然可以通过调用 data.reshape(-1) 将数据存储在同一个缓冲区中,该函数将 [B, T] 大小展平为 [B * T],但这意味着,例如,第一个环境的轨迹会被批次中的其他环境的轨迹交错,这是 SliceSampler 无法处理的场景。为了解决这个问题,我们建议在存储构造函数中使用 ndim 参数。

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 轨迹也可以独立存储,前导维度中的每个元素指向不同的轨迹。这要求轨迹具有一致的形状(或被填充)。我们提供了一个名为 RandomCropTensorDict 的自定义 Transform 类,允许在缓冲区中采样子轨迹。请注意,与基于 SliceSampler 的策略不同,这里不需要“episode”或“done”键指向开始和停止信号。以下是如何使用此类的一个示例。

检查点经验回放缓冲区

经验回放缓冲区的每个组件都可能是有状态的,因此需要一种专门的方法来对其进行序列化。我们的经验回放缓冲区提供了两个独立的 API 来将它们的状态保存在磁盘上:dumps()loads() 将使用内存映射张量和 JSON 文件保存除变换之外的每个组件的数据(存储、写入器、采样器),而元数据使用 JSON 文件保存。

此方法适用于除 ListStorage 之外的所有类,因为其内容无法预测(因此不符合 tensordict 库中找到的内存映射数据结构)。

此 API 保证,已保存然后重新加载的缓冲区将处于完全相同的状态,无论我们查看其采样器(例如,优先级树)、其写入器(例如,最大写入器堆)还是其存储的状态。

在底层,对 dumps() 的简单调用将在每个组件的特定文件夹中调用公共 dumps 方法(不包括变换,因为我们通常不假设它们可以使用内存映射张量进行序列化)。

然而,将数据保存为 TED 格式 可能会消耗比所需更多的内存。如果连续轨迹存储在缓冲区中,我们可以通过保存根目录下的所有观测值以及“next”子 tensordict 观测值的最后一个元素来避免保存重复的观测值,这可以将存储消耗量最多减少一半。为此,提供了三个检查点类:FlatStorageCheckpointer 将丢弃重复的观测值以压缩 TED 格式。加载时,此类将以正确的格式重写观测值。如果缓冲区保存在磁盘上,此检查点器执行的操作不需要额外的 RAM。NestedStorageCheckpointer 将使用嵌套张量保存轨迹,使数据表示更清晰(第一个维度上的每个元素代表一个不同的轨迹)。最后,H5StorageCheckpointer 将缓冲区保存在 H5DB 格式中,使用户能够压缩数据并节省更多空间。

警告

检查点器对经验回放缓冲区做出了一些限制性假设。首先,假设 done 状态准确地表示了轨迹的结束(最后一个写入的轨迹除外,其写入器游标指示了截断信号的位置)。对于 MARL 用途,应注意只允许“done”状态具有与根 tensordict 相同数量的元素:如果“done”状态包含存储批次大小中未表示的额外元素,这些检查点器将失败。例如,形状为 torch.Size([3, 4, 5]) 的“done”状态在形状为 torch.Size([3, 4]) 的存储中是不允许的。

以下是如何在实践中使用 H5DB 检查点器的具体示例

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

当无法使用 dumps() 保存数据时,另一种方法是使用 state_dict(),它返回一个可以由 torch.save() 保存并由 torch.load() 加载,然后调用 load_state_dict() 的数据结构。此方法的缺点是它难以保存大型数据结构,而这在使用经验回放缓冲区时很常见。

TorchRL Episode Data Format (TED)

在 TorchRL 中,顺序数据始终以一种特定格式呈现,称为 TorchRL Episode Data Format (TED)。此格式对于 TorchRL 各组件的无缝集成和功能至关重要。

某些组件,例如经验回放缓冲区,对数据格式不太敏感。但是,其他组件,尤其是环境,严重依赖它才能顺利运行。

因此,理解 TED 及其用途,以及如何与之交互至关重要。本指南将清楚地解释 TED、其使用原因以及如何有效地使用它。

TED 的基本原理

在强化学习(RL)领域,格式化顺序数据可能是一项复杂的任务。作为实践者,我们经常遇到在重置时(尽管不总是)提供数据,有时在轨迹的最后一步提供或丢弃数据的情况。

这种可变性意味着我们可以在数据集中观察到不同长度的数据,并且并不总是清楚如何匹配该数据集各种元素之间的时间步长。考虑以下歧义数据集结构

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍一看,信息和观测似乎是同时提供的(在重置时各一个 + 在每个步调用时各一个),这与动作的元素数量少一个相对应。但是,如果信息少一个元素,我们必须假设它在重置时被省略,或者在轨迹的最后一步未提供或未记录。如果没有正确的数据结构文档,就无法确定哪个信息对应哪个时间步长。

更复杂的是,某些数据集提供不一致的数据格式,其中 observationsinfos 在回滚的开始或结束时缺失,并且这种行为通常未被记录。TED 的主要目标是通过提供清晰一致的数据表示来消除这些歧义。

TED 的结构

TED 基于 RL 上下文中马尔可夫决策过程 (MDP) 的规范定义。在每一步,一个观测值会条件化一个动作,该动作会产生(1)一个新的观测值,(2)一个任务完成的指示符(终止、截断、完成),以及(3)一个奖励信号。

某些元素可能缺失(例如,在模仿学习上下文中,奖励是可选的),或者可能通过状态或信息容器传递其他信息。在某些情况下,在调用 step 时需要其他信息才能获取观测值(例如,在无状态环境模拟器中)。此外,在某些场景下,“动作”(或任何其他数据)不能表示为单个张量,需要以不同的方式组织。例如,在多代理 RL 设置中,动作、观测值、奖励和完成信号可能是复合的。

TED 能够以一种统一、明确的格式处理所有这些场景。我们通过在执行动作时设置一个限制来区分时间步 tt+1 发生的事情。换句话说,在调用 env.step 之前存在的所有内容都属于 t,之后的所有内容都属于 t+1

一般规则是,属于时间步 t 的所有内容都存储在 tensordict 的根目录中,而属于 t+1 的所有内容都存储在 tensordict 的 "next" 条目中。示例如下

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在回滚过程中(使用 EnvBaseSyncDataCollector),当代理重置其步数计数时(t <- t+1),"next" tensordict 的内容将被移到根目录。您可以在此处阅读有关环境 API 的更多信息:这里

在大多数情况下,根目录中没有 True 值的 "done" 状态,因为任何完成状态都将触发(部分)重置,这将把 "done" 变为 False。但是,这仅在自动执行重置时才成立。在某些情况下,部分重置不会触发重置,因此我们会保留这些数据,例如,这些数据应该比观测值占用更少的内存。

此格式消除了关于观测值与其动作、信息或完成状态匹配的任何歧义。

关于 TED 中单例维度的说明

在 TorchRL 中,标准做法是 done 状态(包括终止和截断)和奖励应具有一个可以扩展以匹配观测值、状态和动作形状的维度,而无需重复(即,奖励的维度必须与观测值和/或动作,或它们的嵌入相同)。

本质上,此格式是可接受的(尽管不是严格强制的)

>>> print(rollout[t])
... TensorDict(
...     fields={
...         action: Tensor(n_action),
...         done: Tensor(1),  # The done state has a rightmost singleton dimension
...         next: TensorDict(
...             fields={
...                 done: Tensor(1),
...                 observation: Tensor(n_obs),
...                 reward: Tensor(1),  # The reward has a rightmost singleton dimension
...                 terminated: Tensor(1),
...                 truncated: Tensor(1),
...             batch_size=torch.Size([]),
...             device=cpu,
...             is_shared=False),
...         observation: Tensor(n_obs),  # the observation at reset
...         terminated: Tensor(1),  # the terminated at reset
...         truncated: Tensor(1),  # the truncated at reset
...     batch_size=torch.Size([]),
...     device=cpu,
...     is_shared=False)

这样做的基本原理是确保对观测值和/或动作的操作结果(例如,值估计)具有与奖励和 done 状态相同的维度数。这种一致性允许后续操作顺利进行。

>>> state_value = f(observation)
>>> next_state_value = state_value + reward

如果没有奖励末尾的这个单例维度,广播规则(仅在张量可以从左侧扩展时工作)将尝试从左侧扩展奖励。这可能会导致失败(最坏情况)或引入错误(更坏情况)。

展平 TED 以减少内存消耗

TED 将观测值复制两次到内存中,这可能会影响在该格式在实践中的可行性。由于它主要用于表示的便利性,因此可以以扁平的方式存储数据,但在训练期间将其表示为 TED。

这在序列化经验回放缓冲区时特别有用:例如,TED2Flat 类确保 TED 格式的数据结构在写入磁盘之前被展平,而 Flat2TED 加载钩会在反序列化期间取消展平此结构。

Tensordict 的维度

在回滚过程中,所有收集的 tensordicts 都将沿着位于末尾的新维度堆叠。收集器和环境都会将此维度标记为 "time" 名称。示例如下

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

这确保了时间维度在数据结构中清晰标记并易于识别。

特殊情况和脚注

多代理数据表示

多代理数据格式文档可在 MARL 环境 API 部分访问。

基于内存的策略(RNN 和 Transformer)

在上面提供的示例中,只有 env.step(data) 生成了需要在下一步读取的数据。但是,在某些情况下,策略还会输出需要在下一步中使用的信息。这通常是基于 RNN 的策略的情况,它输出一个动作以及一个需要在下一步中使用的递归状态。为了适应这一点,我们建议用户调整其 RNN 策略,将此数据写入 tensordict 的 "next" 条目下。这确保了此内容将在下一步移到根目录。有关更多信息,请参阅 GRUModuleLSTMModule

多步

收集器允许用户在读取数据时跳过步数,累积未来 n 步的奖励。此技术在 DQN 类算法(如 Rainbow)中很受欢迎。MultiStep 类对来自收集器的数据批次执行此数据转换。在这些情况下,像这样的检查会失败,因为下一个观测值会偏移 n 步。

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

内存需求如何?

天真地实现,这种数据格式消耗的内存大约是扁平表示的两倍。在某些内存密集型设置中(例如,在 AtariDQNExperienceReplay 数据集中),我们只将 T+1 观测值存储在磁盘上,并在获取时在线执行格式化。在其他情况下,我们认为 2 倍的内存成本是获得更清晰表示所需的小代价。然而,为离线数据集推广延迟表示无疑将是一个有益的功能,我们欢迎在这方面做出贡献!

数据集

TorchRL 提供离线 RL 数据集的包装器。这些数据以 ReplayBuffer 实例的形式提供,这意味着它们可以根据需要使用变换、采样器和存储进行自定义。例如,可以使用 SelectTransformExcludeTransform 将条目筛选进出数据集。

默认情况下,数据集存储为内存映射张量,允许它们几乎没有内存占用即可快速采样。

以下是一个示例

注意

安装依赖项是用户的责任。对于 D4RL,需要克隆 存储库,因为最新的轮子未在 PyPI 上发布。对于 OpenML,需要 scikit-learnpandas

转换数据集

在许多情况下,原始数据不会按原样使用。自然的方法是将 Transform 实例传递给数据集构造函数并即时修改样本。这样做会有效,但会产生额外的运行时开销。如果变换可以(至少部分)预先应用于数据集,则可以节省大量的磁盘空间和采样时产生的一些开销。为此,可以使用 preprocess() 方法。此方法将在数据集的每个样本上运行一个样本预处理管道,并用其转换版本替换现有数据集。

转换后,重新创建相同的数据集将产生另一个具有相同转换存储的对象(除非使用了 download="force"

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

离线数据集的父类。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN Experience 回放类。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的经验回放类。

GenDGRLExperienceReplay(dataset_id[, ...])

Gen-DGRL Experience Replay 数据集。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari Experience 回放数据集。

OpenMLExperienceReplay(name, batch_size[, ...])

OpenML 数据的经验回放。

OpenXExperienceReplay(dataset_id[, ...])

Open X-Embodiment 数据集经验回放。

RobosetExperienceReplay(dataset_id, ...[, ...])

Roboset 经验回放数据集。

VD4RLExperienceReplay(dataset_id, batch_size, *)

V-D4RL 经验回放数据集。

组合数据集

在离线 RL 中,同时处理多个数据集是常态。此外,TorchRL 通常具有细粒度的数据集命名法,其中每个任务单独表示,而其他库将这些数据集以更紧凑的方式表示。为了允许用户组合多个数据集,我们提出了一个 ReplayBufferEnsemble 基元,它允许用户一次从多个数据集中采样。

如果各个数据集格式不同,可以使用 Transform 实例。在以下示例中,我们创建了两个具有语义上相同但名称不同的条目的虚拟数据集(("some", "key")"another_key"),并展示了如何重命名它们以匹配名称。我们还调整了图像大小,以便在采样期间可以堆叠它们。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

经验回放缓冲区的集合。

SamplerEnsemble(*samplers[, p, ...])

采样器的集合。

StorageEnsemble(*storages[, transforms])

存储的集合。

WriterEnsemble(*writers)

写入器的集合。

TensorSpec

TensorSpec 父类及其子类定义了 TorchRL 中状态、观测值、动作、奖励和完成状态的基本属性,例如它们的形状、设备、数据类型和域。

重要的是,您的环境规范要与它发送和接收的输入和输出匹配,因为 ParallelEnv 将根据这些规范创建缓冲区以与子进程通信。请参阅 torchrl.envs.utils.check_env_specs() 方法以进行健全性检查。

如果需要,可以使用 make_composite_from_td() 函数从数据中自动生成规范。

规范分为两大类:数值和分类。

数值 TensorSpec 子类。

数值

Bounded

Unbounded

有界离散

有界连续

UnboundedDiscrete

UnboundedContinuous

每当创建 Bounded 实例时,其域(由其 dtype 隐式定义或由 “domain” 关键字参数显式定义)将决定实例化的类是 BoundedContinuous 还是 BoundedDiscrete 类型。对于 Unbounded 类也是如此。有关更多信息,请参阅这些类。

分类 TensorSpec 子类。

Categorical

OneHot

MultiOneHot

Categorical

MultiCategorical

二元

gymnasium 不同,TorchRL 没有任意规范列表的概念。如果要组合多个规范,TorchRL 假定数据将以字典形式(更具体地说,以 TensorDict 或相关格式)呈现。在这种情况下,对应的 TensorSpec 类是 Composite 规范。

尽管如此,规范可以使用 stack() 堆叠在一起:如果它们相同,它们的形状将相应扩展。否则,将通过 Stacked 类创建延迟堆栈。

同样,TensorSpecs 具有与 TensorTensorDict 一些共同的行为:它们可以像常规 Tensor 实例一样进行重塑、索引、挤压、解挤压、移动到另一个设备(to)或解绑(unbind)。

维度为 -1 的规范被认为是“动态”的,负维度表示相应数据形状不一致。当被优化器或环境(例如,批处理环境,如 ParallelEnv)看到时,这些负形状告诉 TorchRL 避免使用缓冲区,因为张量形状是不可预测的。

TensorSpec(shape, space, device, dtype, ...)

张量元数据容器的父类。

Binary([n, shape, device, dtype])

二进制离散张量规范。

Bounded(*args, **kwargs)

有界张量规范。

Categorical(n[, shape, device, dtype, mask])

离散张量规范。

Composite(*args, **kwargs)

TensorSpecs 的组合。

MultiCategorical(nvec[, shape, device, ...])

离散张量规范的连接。

MultiOneHot(nvec[, shape, device, dtype, ...])

单热编码离散张量规范的连接。

NonTensor([shape, device, dtype, ...])

非张量数据的规范。

OneHot(n[, shape, device, dtype, ...])

一维单热编码离散张量规范。

Stacked(*specs, dim)

张量规范堆栈的延迟表示。

StackedComposite(*args, **kwargs)

复合规范堆栈的延迟表示。

Unbounded(*args, **kwargs)

无界张量规范。

UnboundedContinuous(*args, **kwargs)

具有连续空间的 torchrl.data.Unbounded 的专用版本。

UnboundedDiscrete(*args, **kwargs)

具有离散空间的 torchrl.data.Unbounded 的专用版本。

以下类已弃用,仅指向上述类

BinaryDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Binary 版本。

BoundedTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Bounded 版本。

CompositeSpec(*args, **kwargs)

已弃用的 torchrl.data.Composite 版本。

DiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Categorical 版本。

LazyStackedCompositeSpec(*args, **kwargs)

已弃用的 torchrl.data.StackedComposite 版本。

LazyStackedTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Stacked 版本。

MultiDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.MultiCategorical 版本。

MultiOneHotDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.MultiOneHot 版本。

NonTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.NonTensor 版本。

OneHotDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.OneHot 版本。

UnboundedContinuousTensorSpec(*args, **kwargs)

连续空间的 torchrl.data.Unbounded 的已弃用版本。

UnboundedDiscreteTensorSpec(*args, **kwargs)

离散空间的 torchrl.data.Unbounded 的已弃用版本。

树和森林

TorchRL 提供了一组类和函数,可用于有效地表示树和森林,这对于蒙特卡洛树搜索 (MCTS) 算法尤其有用。

TensorDictMap

核心而言,MCTS API 依赖于 TensorDictMap,它充当一个存储,其索引可以是任何数值对象。在传统的存储(例如 TensorStorage)中,只允许使用整数索引。

>>> storage = TensorStorage(...)
>>> data = storage[3]

TensorDictMap 允许我们在存储中进行更高级的查询。典型的例子是当我们有一个包含一组 MDP 的存储,并且我们想通过其初始观测和动作对来重建轨迹。用张量的话来说,这可以用以下伪代码编写:

>>> next_state = storage[observation, action]

(如果此对有多个后续状态,则可以返回 next_states 的堆栈)。这个 API 是合理的,但会受到限制:允许由多个张量组成的观测或动作可能难以实现。相反,我们提供一个包含这些值的 tensordict,并让存储知道要查看哪些 in_keys 来查询下一个状态。

>>> td = TensorDict(observation=observation, action=action)
>>> next_td = storage[td]

当然,此类还允许我们使用新数据扩展存储。

>>> storage[td] = next_state

这很方便,因为它允许我们表示复杂的 rollout 结构,其中在给定节点(即给定观测)下采取不同的动作。已观察到的所有 (observation, action) 对都可能 dẫn us to a (set of) rollout that we can use further。

MCTSForest

从初始观测构建树就变成了一个高效组织数据的问题。 MCTSForest 的核心有两个存储:第一个存储将观测与过去在数据集中遇到的动作的哈希值和索引链接起来。

>>> data = TensorDict(observation=observation)
>>> metadata = forest.node_map[data]
>>> index = metadata["_index"]

其中 forest 是一个 MCTSForest 实例。然后,第二个存储会跟踪与观测相关的动作和结果。

>>> next_data = forest.data_map[index]

通常,next_data 条目可以具有任何形状,但它通常会匹配 index 的形状(因为每个索引对应一个动作)。一旦获得 next_data,就可以将其与 data 组合以形成一组节点,并且可以为每个节点扩展树。下图展示了这是如何完成的。

../_images/collector-copy.png

MCTSForest 对象构建 Tree。流程图表示一棵树如何从初始观测 o 构建。 get_tree 方法将输入数据结构(根节点)传递给 node_map TensorDictMap 实例,该实例返回一组哈希值和索引。然后,使用这些索引来查询与根节点关联的相应的动作、下一个观测、奖励等元组。从每个元组创建一个顶点(如果需要紧凑表示,则可能带有更长的 rollout)。然后,顶点堆栈用于进一步构建树,这些顶点堆叠在一起构成了根处的树分支。此过程会重复进行,直到达到给定的深度或树无法再扩展为止。

BinaryToDecimal(num_bits, device, dtype[, ...])

一个将二进制编码的张量转换为十进制的模块。

HashToInt()

将哈希值转换为可用于索引连续存储的整数。

MCTSForest(*[, data_map, node_map, ...])

MCTS 树的集合。

QueryModule(*args, **kwargs)

一个用于为存储生成兼容索引的模块。

RandomProjectionHash(*[, n_components, ...])

一个结合了随机投影和 SipHash 的模块,以获得低维张量,便于通过 SipHash 进行嵌入。

SipHash([as_tensor])

一个用于计算给定张量的 SipHash 值的模块。

TensorDictMap(*args, **kwargs)

TensorDict 的 Map 存储。

TensorMap()

实现不同存储的抽象。

Tree([count, wins, index, hash, node_id, ...])

大型语言模型和基于人类反馈的强化学习 (RLHF)

警告

这些 API 已弃用,将在未来移除。请改用 torchrl.data.llm 模块。有关更多信息,请参阅完整的 LLM 文档

数据在 LLM 后期训练(例如 GRPO 或基于人类反馈的强化学习 (RLHF))中至关重要。鉴于这些技术通常用于语言领域,而该领域在库的其他 RL 子领域中很少涉及,因此我们提供专门的工具来促进与 datasets 等外部库的交互。这些工具包括用于标记化数据、将其格式化为适合 TorchRL 模块的格式以及优化存储以实现高效采样。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

Prompt 数据集的标记化配方。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

用于执行因果语言模型 rollout 的类。

TensorDictTokenizer(tokenizer, max_length[, ...])

用于对文本示例应用标记器的进程函数工厂。

TokenizedDatasetLoader(split, max_length, ...)

加载标记化的数据集,并缓存其内存映射副本。

create_infinite_iterator(iterator)

无限迭代器。

get_dataloader(batch_size, block_size, ...)

创建数据集并从中返回 dataloader。

ConstantKLController(*[, kl_coef, model])

恒定 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

自 Ziegler 等人 "Fine-Tuning Language Models from Human Preferences" 论文所述的自适应 KL 控制器。

Utils

DensifyReward(*args, **kwargs)

一个用于将 done 状态下的奖励重新分配给轨迹其余部分的实用程序。

Flat2TED([done_key, shift_key, is_full_key, ...])

一个存储加载钩子,用于将扁平化的 TED 数据反序列化为 TED 格式。

H5Combine()

将持久性 tensordict 中的轨迹合并为存储在文件系统中的单个 standing tensordict。

H5Split([done_key, shift_key, is_full_key, ...])

将使用 TED2Nested 准备的数据集分割为 TensorDict,其中每个轨迹都存储为对其父嵌套张量的视图。

MultiStep(gamma, n_steps)

多步奖励转换。

Nested2TED([done_key, shift_key, ...])

将嵌套的 tensordict(其中每一行是一个轨迹)转换为 TED 格式。

TED2Flat([done_key, shift_key, is_full_key, ...])

一个存储保存钩子,用于将 TED 数据序列化为紧凑格式。

TED2Nested(*args, **kwargs)

将 TED 格式的数据集转换为填充了嵌套张量的 tensordict,其中每一行是一个轨迹。

check_no_exclusive_keys(spec[, recurse])

给定一个 TensorSpec,如果不存在独占键,则返回 true。

consolidate_spec(spec[, ...])

给定一个 TensorSpec,通过添加 0 形状的 spec 来删除独占键。

contains_lazy_spec(spec)

如果 spec 包含延迟堆叠的 spec,则返回 true。

MultiStepTransform(n_steps, gamma, *[, ...])

ReplayBuffers 的多步转换。

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源