快捷方式

SSD 嵌入式运算符

CUDA 运算符

enum RocksdbWriteMode

rocksdb 写入模式

在 SSD 卸载中,每次训练迭代有 3 次写入 FWD_ROCKSDB_READ:缓存查找会将未缓存的数据从 rocksdb 移动到 fwd 路径的 L2 缓存中

FWD_L1_EVICTION:L1 缓存驱逐会将数据驱逐到 fwd 路径的 L2 缓存中

BWD_L1_CNFLCT_MISS_WRITE_BACK:L1 冲突未命中会将数据插入 L2 以在 bwd 路径上更新嵌入

上述所有 L2 缓存填充都可能在 L2 缓存已满时触发 rocksdb 写入

STREAM:原始嵌入流式传输请求的占位符,它不直接与 L2 和 rocksDB 交互

此外,我们将在 L2 刷新时执行 ssd io

enumerator FWD_ROCKSDB_READ
enumerator FWD_L1_EVICTION
enumerator BWD_L1_CNFLCT_MISS_WRITE_BACK
enumerator FLUSH
enumerator STREAM
inline size_t hash_shard(int64_t id, size_t num_shards)

用于 SSD L2 缓存和 rocksdb 分片算法的哈希函数

参数:
  • id – 分片键

  • num_shards – 分片范围

返回:

分片 ID 的范围是 [0, num_shards)

std::tuple<at::Tensor, at::Tensor> get_bucket_sorted_indices_and_bucket_tensor(const at::Tensor &unordered_indices, int64_t hash_mode, int64_t bucket_start, int64_t bucket_end, std::optional<int64_t> bucket_size, std::optional<int64_t> total_num_buckets)

给定一个包含乱序 ID 的张量,返回 2 个张量 tensor1 包含按存储桶升序排序的 ID,例如给定 [1,2,3,4] 和 2 个存储桶 [1, 4) 和 [4, 7),输出将是 [1,2,3,4] 或 [2, 1, 3, 4],ID 1、2、3 必须在 4 之前,但 1 2 3 可以是任何顺序 tensor2 包含每个存储桶中的嵌入数量(ID(张量偏移量)),在上面的示例中,tensor2 将是 [3, 1],其中第一项对应于第一个存储桶 ID,值为 3 表示第一个存储桶 ID 中有 3 个 ID

参数:
  • unordered_indices – 乱序 ID,此处的 ID 可能是原始(未线性化)ID

  • hash_mode – 0 用于基于块的哈希,1 用于基于交错的哈希

  • bucket_start – 全局存储桶 ID,存储桶范围的开始

  • bucket_end – 全局存储桶 ID,存储桶范围的结束

  • bucket_size – 可选,存储桶的虚拟大小(输入空间,例如 2^50)

  • total_num_buckets – 可选,每个训练模型的总存储桶数量

返回:

包含 2 个张量的列表,第一个张量是按存储桶排序的 ID,第二个张量是存储桶大小

inline std::string get_rocksdb_path(const std::string &base_path, int db_shard_id, const std::string &tbe_uuid, bool default_path)

默认方式,根据用户提供的 base_path 生成 rocksdb 路径。文件层级将是 <base_path><ssd_idx>/<tbe_uuid>,用于默认 SSD 挂载 <base_path>/<tbe_uuid>,用于用户提供的 base path。

参数:
  • base_path – 所有与一个 TBE/EmbeddingRocksDB 相关联的 rocksdb 分片的基路径

  • db_shard_id – rocksdb 分片索引,用于确定使用哪个 SSD

  • tbe_uuid – 训练作业生命周期内每个 TBE 的唯一标识符

  • default_path – base_path 是默认 SSD 挂载还是用户提供的

返回:

到该 rocksdb 分片的基路径

inline std::string get_rocksdb_shard_path(int db_shard_id, const std::string &rocksdb_path)

生成 rocksdb 分片路径,基于 rocksdb_path,文件层级将是 <rocksdb_shard_path>/shard_<db_shard>

参数:
  • db_shard_id – rocksdb 分片索引

  • rocksdb_path – rocksdb 分片的基路径

返回:

rocksdb 分片路径

inline std::string get_rocksdb_checkpoint_dir(int db_shard_id, const std::string &rocksdb_path)

为特定 rocksdb 分片路径生成一个用于保存 rocksdb 检查点的目录。文件层级将是 <rocksdb_shard_path>/checkpoint_shard_<db_shard>

参数:
  • db_shard_id – rocksdb 分片索引

  • rocksdb_path – rocksdb 分片的基路径

返回:

保存一个 rocksdb 分片检查点的目录

void cuda_callback_func(cudaStream_t stream, cudaError_t status, void *functor)

cudaStreamAddCallback 的回调函数

cudaStreamAddCallback 的通用回调函数,即 `cudaStreamCallback_t callback`。此函数将 `functor` 强制转换为 void 函数,调用它,然后删除它(删除发生在另一个线程中)

参数:
  • stream – `cudaStreamAddCallback` 操作的 CUDA 流

  • status – CUDA 状态

  • functor – 将被调用的函子

返回:

Tensor masked_index_put_cuda(Tensor self, Tensor indices, Tensor values, Tensor count, const bool use_pipeline, const int64_t preferred_sms)

类似于 `torch.Tensor.index_put`,但忽略 `indices < 0`

``masked_index_put_cuda`` 只支持 2D 输入 `values`。它使用 `indices` 中大于等于 0 的行索引,将 `values` 中的 `count` 行放入 `self` 中。

# Equivalent PyTorch Python code
indices = indices[:count]
filter_ = indices >= 0
indices_ = indices[filter_]
self[indices_] = values[filter_.nonzero().flatten()]
参数:
  • self – 2D 输出张量(被索引的张量)

  • indices – 1D 索引张量

  • values – 2D 输入张量

  • count – 包含要处理的 `indices` 长度的张量

  • use_pipeline – 指示此内核将与其他内核重叠的标志。如果为 true,则使用一部分 SM 来减少资源竞争

  • preferred_sms – 当 use_pipeline=true 时,内核使用的首选 SM 数量。当 use_pipeline=false 时,此值将被忽略。

返回:

self 张量

Tensor masked_index_select_cuda(Tensor self, Tensor indices, Tensor values, Tensor count, const bool use_pipeline, const int64_t preferred_sms)

类似于 `torch.index_select`,但忽略 `indices < 0`

``masked_index_select_cuda`` 只支持 2D 输入 `values`。它从 `values` 中,使用 `indices` 中(`indices` >= 0)指定的 `count` 行,将它们放入 `self` 中。

# Equivalent PyTorch Python code
indices = indices[:count]
filter_ = indices >= 0
indices_ = indices[filter_]
self[filter_.nonzero().flatten()] = values[indices_]
参数:
  • self – 2D 输出张量

  • indices – 1D 索引张量

  • values – 2D 输入张量(被索引的张量)

  • count – 包含要处理的 `indices` 长度的张量

  • use_pipeline – 指示此内核将与其他内核重叠的标志。如果为 true,则使用一部分 SM 来减少资源竞争

  • preferred_sms – 当 use_pipeline=true 时,内核使用的首选 SM 数量。当 use_pipeline=false 时,此值将被忽略。

返回:

self 张量

std::tuple<Tensor, Tensor> ssd_generate_row_addrs_cuda(const Tensor &lxu_cache_locations, const Tensor &assigned_cache_slots, const Tensor &linear_index_inverse_indices, const Tensor &unique_indices_count_cumsum, const Tensor &cache_set_inverse_indices, const Tensor &lxu_cache_weights, const Tensor &inserted_ssd_weights, const Tensor &unique_indices_length, const Tensor &cache_set_sorted_unique_indices)

生成 SSD TBE 数据的内存地址。

从 SSD 检索的数据可以存储在临时缓冲区(HBM)或 LXU 缓存(也是 HBM)中。``lxu_cache_locations`` 用于指定数据的位置。如果位置为 -1,则关联索引的数据在临时缓冲区中;否则,它在缓存中。为了使 TBE 内核能够方便地访问数据,此运算符会为每个索引生成第一个字节的内存地址。访问数据时,TBE 内核只需将地址转换为指针。

此外,此运算符还生成了后向驱逐索引列表,这些索引基本上是其数据位于临时缓冲区中的索引。

参数:
  • lxu_cache_locations – 包含数据存储在 *完整* 索引列表的缓存槽的张量。-1 是一个哨兵值,表示数据不在缓存中。

  • assigned_cache_slots – 包含 *唯一* 索引列表的缓存槽的张量。-1 表示数据不在缓存中

  • linear_index_inverse_indices – 在排序之前包含线性索引原始位置的张量

  • unique_indices_count_cumsum – 包含唯一索引计数的前缀和(不含)结果的张量

  • cache_set_inverse_indices – 在排序之前包含缓存集原始位置的张量

  • lxu_cache_weights – LXU 缓存张量

  • inserted_ssd_weights – 临时缓冲区张量

  • unique_indices_length – 包含唯一索引数量的张量(GPU 张量)

  • cache_set_sorted_unique_indices – 包含排序后的唯一缓存集关联的唯一索引的张量

返回:

张量元组(SSD 行地址张量和后向驱逐索引张量)

void ssd_update_row_addrs_cuda(const Tensor &ssd_row_addrs_curr, const Tensor &inserted_ssd_weights_curr_next_map, const Tensor &lxu_cache_locations_curr, const Tensor &linear_index_inverse_indices_curr, const Tensor &unique_indices_count_cumsum_curr, const Tensor &cache_set_inverse_indices_curr, const Tensor &lxu_cache_weights, const Tensor &inserted_ssd_weights_next, const Tensor &unique_indices_length_curr)

更新 SSD TBE 数据的内存地址。

当启用流水线预取时,当前迭代中临时缓冲区中的数据可以在预取步骤中移动到下一迭代的 L1 或临时缓冲区。此运算符更新重新定位到正确位置的数据的内存地址。

参数:
  • ssd_row_addrs_curr – 包含当前迭代行地址的张量

  • inserted_ssd_weights_curr_next_map – 包含当前迭代中临时缓冲区中每个索引位置映射到下一迭代临时缓冲区中位置的张量。(-1 = 数据尚未移动)。inserted_ssd_weights_curr_next_map[i] 是位置

  • lxu_cache_locations_curr – 包含当前迭代 *完整* 索引列表数据存储的缓存槽的张量。-1 是一个哨兵值,表示数据不在缓存中。

  • linear_index_inverse_indices_curr – 包含当前迭代排序前线性索引原始位置的张量

  • unique_indices_count_cumsum_curr – 包含当前迭代唯一索引计数的(不含)前缀和结果的张量

  • cache_set_inverse_indices_curr – 包含当前迭代排序前缓存集原始位置的张量

  • lxu_cache_weights – LXU 缓存张量

  • inserted_ssd_weights_next – 下一迭代的临时缓冲区张量

  • unique_indices_length_curr – 包含当前迭代唯一索引数量的张量(GPU 张量)

返回:

void compact_indices_cuda(std::vector<Tensor> compact_indices, Tensor compact_count, std::vector<Tensor> indices, Tensor masks, Tensor count)

压缩给定的索引列表。

此运算符根据给定的掩码(包含 0 或 1 的张量)压缩给定的索引列表。运算符会删除对应掩码为 0 的索引。它仅处理 `count` 个元素(而不是整个张量)。

示例

indices = [[0, 3, -1, 3, -1, -1, 7], [0, 2, 2, 3, -1, 9, 7]]
masks = [1, 1, 0, 1, 0, 0, 1]
count = 5

# x represents an arbitrary value
compact_indices = [[0, 3, 3, x, x, x, x], [0, 2, 3, x, x, x, x]]
compact_count = 3
参数:
  • compact_indices – 压缩索引列表(输出索引)。

  • compact_count – 包含压缩后元素数量的张量

  • indices – 要压缩的输入索引列表

  • masks – 包含 0 或 1 的张量,用于指示是否删除/保留元素。0 = 删除相应索引。1 = 保留相应索引。@count count 包含要压缩的元素数量的张量

class CacheLibCache
#include <cachelib_cache.h>

用于 Cachelib 交互的 Cachelib 包装器类。

它用于维护所有缓存相关的操作,包括初始化、插入、查找和驱逐。它对于驱逐逻辑是有状态的,调用者必须专门获取并重置驱逐相关的状态。Cachelib 相关的优化将在此类中捕获,例如 fetch 和 delayed markUseful 以提高 get 性能

注意

这个类只处理单个 Cachelib 读取/更新。并行性由调用者处理

class EmbeddingParameterServer : public EmbeddingKVDB
#include <ps_table_batched_embeddings.h>

Training Parameter Service (TPS) 客户端的 EmbeddingKVDB 实现。

class CacheContext
#include <kv_db_table_batched_embeddings.h>

它保存 l2cache 查找结果。

num_misses 是 l2 缓存查找的未命中次数,cached_addr_list 预先分配了查找次数,以提高并行性,无效位置(缓存未命中)将保持为哨兵值

struct QueueItem
#include <kv_db_table_batched_embeddings.h>

用于后台 L2/rocksdb 更新的队列项

indices/weights/count 是相应的 set() 参数

read_handles 是 cachelib 抽象的索引/嵌入对元数据,稍后将用于更新 cachelib LRU 队列,因为我们将其与 EmbeddingKVDB::get_cache() 分开

mode 用于监控 rocksdb 写入,请参阅 RocksdbWriteMode 以获取详细说明

class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB>
#include <kv_db_table_batched_embeddings.h>

一个用于与不同缓存层和存储层交互的类,公共调用在 cuda 流上执行。

目前,TBE 使用它将 Key(嵌入索引)Value(嵌入)卸载到 DRAM、SSD 或远程存储,以提供更好的可伸缩性,而不会耗尽 HBM 资源

子类包括 DramKVEmbeddingCache< weight_type >、EmbeddingParameterServerEmbeddingRocksDB

class EmbeddingRocksDB : public EmbeddingKVDB
#include <ssd_table_batched_embeddings.h>

RocksDB 的 EmbeddingKVDB 实现。

子类包括 MockEmbeddingRocksDB

class ReadOnlyEmbeddingKVDB : public CustomClassHolder
#include <ssd_table_batched_embeddings.h>

RocksDB 的 ReadOnlyEmbeddingKVDB 实现。

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源