• 文档 >
  • 从数据流构建 tensordict
快捷方式

使用流构建 TensorDict

作者Vincent Moens

在许多实际应用中,数据是连续生成的,并且频率各不相同。

例如,来自物联网设备、金融交易或社交媒体更新的传感器读数都可以产生需要实时处理和分析的数据流。

在处理此类数据流时,通常需要将传入数据“分桶”成离散的块,以便进行高效的处理和分析。然而,当处理具有不同频率或格式的数据流时,这可能会很困难。

在本教程中,我们将探讨如何使用 TensorDict 构建和操作数据流。我们将学习如何创建张量的懒惰堆栈、处理异步数据流,以及对数据进行密集化以实现高效存储和处理。

在本教程中,您将学习: - 如何读取数据流并在 tensordict 中以设定的间隔写入; - 如何构建可以堆叠具有异构形状的 TensorDict; - 如果需要,如何使用 nested_tensor 将这些张量密集化到单个存储中。

堆叠异构的 tensordicts

在许多实际场景中,数据以具有不同设定频率的流的形式到来。

本教程的目标是“分桶”即将到来的数据,以便以给定的较慢频率读取和处理。此场景中的挑战在于,数据可能无法表示为常规的“矩形”格式(即,张量的每个维度都已定义),但一种可能的情况是,一个数据块比另一个数据块包含更多元素,在这种情况下我们无法简单地将它们堆叠在一起。通常,考虑前两个数据块如下

import torch
from tensordict import TensorDict

bucket0 = TensorDict(stream0=torch.randn(5), stream1=torch.randn(4))
bucket1 = TensorDict(stream0=torch.randn(4), stream1=torch.randn(5))

原则上,我们无法在内存中连续堆叠这两个 tensordict,因为两个流的形状不同。幸运的是,TensorDict 提供了一个工具,可以将具有异构张量形状的实例分组在一起:LazyStackedTensorDict。要创建懒惰堆栈,只需调用 lazy_stack()

data = TensorDict.lazy_stack([bucket0, bucket1], dim=0)
print(data)
LazyStackedTensorDict(
    fields={
        stream0: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False),
        stream1: Tensor(shape=torch.Size([2, -1]), device=cpu, dtype=torch.float32, is_shared=False)},
    exclusive_fields={
    },
    batch_size=torch.Size([2]),
    device=None,
    is_shared=False,
    stack_dim=0)

生成的数据只是两个 tensordicts 的表示,就像它们已经沿维度 0 堆叠在一起一样。LazyStackedTensorDict 支持 TensorDictBase 类的大多数常用操作,以下是一些示例

data_select = data.select("stream0")
data_plus_1 = data + 1
data_apply = data.apply(lambda x: x + 1)

此外,对其进行索引将返回用于创建堆栈的原始数据

assert data[0] is bucket0

尽管如此,在某些情况下,人们可能希望对底层数据进行连续表示。为此,TensorDictBase 提供了一个 densify() 方法,该方法将堆叠可堆叠的张量,并尝试将其余部分表示为 nested_tensor 实例

data_cont = data.densify()

异步数据流

现在让我们转向一个更具体的例子,其中我们创建一个函数,该函数以设定的频率流式传输数据(在本例中,只是每个迭代增加 1 的整数)。

为了在线程之间传递数据,该函数将使用接收到的队列作为输入

import asyncio
from typing import List


async def generate_numbers(frequency: float, queue: asyncio.Queue) -> None:
    i = 0
    while True:
        await asyncio.sleep(1 / frequency)
        await queue.put(i)
        i += 1

函数 collect_data 会在给定时间内从队列读取数据。一旦 timeout 时间已过,函数就会返回

async def collect_data(queue: asyncio.Queue, timeout: float) -> List[int]:
    values = []

    # We create a nested `collect` async function in order to be able to stop it as
    #  soon as timeout is passed (see wait_for below).
    async def collect():
        nonlocal values
        while True:
            value = await queue.get()
            values.append(value)

    task = asyncio.create_task(collect())
    try:
        await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        task.cancel()
    return values

函数 wait7hz 会在给定时间内从队列读取数据。

async def wait7hz() -> None:
    queue = asyncio.Queue()
    generate_task = asyncio.create_task(generate_numbers(7, queue))
    collect_data_task = asyncio.create_task(collect_data(queue, timeout=1))
    values = await collect_data_task
    # The ``generate_task`` has not been terminated
    generate_task.cancel()
    print(values)


asyncio.run(wait7hz())

from typing import Callable, Dict
[0, 1, 2, 3, 4, 5]

我们现在可以设计一个继承自 LazyStackedTensorDict 的类,该类从不同的流读取数据,并将它们记录在单独的 tensordicts 中。 LazyStackedTensorDict 的一个优点是它也可以增量构建,因此我们可以通过扩展懒惰堆栈来简单地记录新传入的数据,直到我们收集了足够的数据。这是此 StreamedTensorDict 类的实现

from tensordict import LazyStackedTensorDict, NestedKey, TensorDictBase


class StreamedTensorDict(LazyStackedTensorDict):
    """A lazy stack class that can be built from a dictionary of streams."""

    @classmethod
    async def from_streams(
        cls,
        streams: Dict[NestedKey, Callable],
        timeout: float,
        batch_size: int,
        densify: bool = True,
    ) -> TensorDictBase:
        td = cls(stack_dim=0)

        # We construct a queue for each stream
        queues = [asyncio.Queue() for _ in range(len(streams))]
        tasks = []
        for stream, queue in zip(streams.values(), queues):
            task = asyncio.create_task(stream(queue))
            tasks.append(task)
        for _ in range(batch_size):
            values_tasks = []
            for queue in queues:
                values_task = asyncio.create_task(collect_data(queue, timeout))
                values_tasks.append(values_task)
            values = await asyncio.gather(*values_tasks)
            td.append(TensorDict(dict(zip(streams.keys(), values))))

        # Cancel the generator tasks
        for task in tasks:
            task.cancel()
        if densify:
            return td.densify(layout=torch.strided)
        return td

最后,main 函数将组合流函数 stream0stream1,并将它们传递给 StreamedTensorDict.from_streams 方法,该方法将收集 batch_size 个批次的数据,每个批次持续 timeout=1

async def main() -> TensorDictBase:
    def stream0(queue):
        return generate_numbers(frequency=7, queue=queue)

    def stream1(queue):
        return generate_numbers(frequency=3, queue=queue)

    # Running this should take about 10 seconds
    return await StreamedTensorDict.from_streams(
        {"bucket0": stream0, "bucket1": stream1}, timeout=1, batch_size=10
    )


td = asyncio.run(main())

print("TensorDict from stream", td)
TensorDict from stream TensorDict(
    fields={
        bucket0: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False),
        bucket1: Tensor(shape=torch.Size([10, -1]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)
让我们表示来自两个流的数据——应该等于 torch.arange(),对于 batch_size * timeout * Hz

<=> 1 * 10 秒 * 3 或 7

print("bucket0 (7Hz, around 70 values)", td["bucket0"].values())
print("bucket1 (3Hz, around 30 values)", td["bucket1"].values())
print("shapes of bucket0 (7Hz, around 70 values)", td["bucket0"]._nested_tensor_size())
bucket0 (7Hz, around 70 values) tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
        36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
        54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68])
bucket1 (3Hz, around 30 values) tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28])
shapes of bucket0 (7Hz, around 70 values) tensor([[6],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7],
        [7]])

结论

在本教程中,我们探讨了使用 TensorDict 和异步数据流的基础知识。我们学习了如何创建张量的懒惰堆栈、使用 asyncio 处理异步数据流,以及如何对数据进行密集化以实现高效存储和处理。

我们还看到了 TensorDictLazyStackedTensorDict 如何用于简化复杂的数据处理任务,例如对具有不同频率的数据流进行分桶。通过利用 TensorDict 和 asyncio 的强大功能,您可以构建可扩展且高效的数据处理管道,即使是最具挑战性的实际应用也能胜任。

感谢您跟随本教程!希望您觉得它有所帮助且信息丰富。

脚本总运行时间: (0 分钟 11.024 秒)

由 Sphinx-Gallery 生成的画廊

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源