SSD Embedding 运算符¶
CUDA 运算符¶
-
enum RocksdbWriteMode¶
rocksdb 写入模式
在 SSD 卸载中,每个训练迭代中有 3 次写入 FWD_ROCKSDB_READ:缓存查找会在前向传播路径上将未缓存的数据从 rocksdb 移入 L2 缓存
FWD_L1_EVICTION:L1 缓存逐出会将数据逐出到 L2 缓存(在前向传播路径上)
BWD_L1_CNFLCT_MISS_WRITE_BACK:L1 冲突未命中会在后向传播路径上插入到 L2 中以进行嵌入更新
一旦 L2 缓存已满,上述所有 L2 缓存填充都可能触发 rocksdb 写入
STREAM:原始嵌入流请求的占位符,它不直接与 L2 和 rocksDB 交互
此外,我们将在 L2 刷新时执行 ssd io
值
-
enumerator FWD_ROCKSDB_READ¶
-
enumerator FWD_L1_EVICTION¶
-
enumerator BWD_L1_CNFLCT_MISS_WRITE_BACK¶
-
enumerator FLUSH¶
-
enumerator STREAM¶
-
enumerator FWD_ROCKSDB_READ¶
-
inline size_t hash_shard(int64_t id, size_t num_shards)¶
用于 SSD L2 缓存和 rocksdb 分片算法的哈希函数
- 参数:
id – 分片键
num_shards – 分片范围
- 返回:
分片 ID 范围为 [0, num_shards)
-
std::tuple<at::Tensor, at::Tensor> get_bucket_sorted_indices_and_bucket_tensor(const at::Tensor &unordered_indices, int64_t hash_mode, int64_t bucket_start, int64_t bucket_end, std::optional<int64_t> bucket_size, std::optional<int64_t> total_num_buckets)¶
给定一个包含随机顺序 ID 的张量,返回 2 个张量。张量 1 包含按桶升序排序的 ID,例如,给定 [1,2,3,4] 和 2 个桶 [1, 4) 和 [4, 7),输出将是 [1,2,3,4] 或 [2, 1, 3, 4],ID 1, 2, 3 必须在 4 之前,但 1, 2, 3 可以是任何顺序。张量 2 包含每个桶中嵌入的数量(张量偏移量),在上面的例子中,张量 2 将是 [3, 1],其中第一项对应于第一个桶 ID,值 3 表示第一个桶中有 3 个 ID。
- 参数:
unordered_indices – 无序 ID,这里的 ID 可能是原始(非线性化)ID
hash_mode – 0 表示基于块的哈希,1 表示基于交错的哈希
bucket_start – 全局桶 ID,桶范围的开始
bucket_end – 全局桶 ID,桶范围的结束
bucket_size – 一个可选的、桶的虚拟大小(输入空间,例如 2^50)
total_num_buckets – 一个可选参数,表示每个训练模型的总桶数
- 返回:
2 个张量的列表,第一个张量是按桶排序的 ID,第二个张量是桶的大小
-
inline std::string get_rocksdb_path(const std::string &base_path, int db_shard_id, const std::string &tbe_uuid, bool default_path)¶
根据用户提供的 base_path 生成 rocksdb 路径的默认方式,文件层次结构将是 <base_path><ssd_idx>/<tbe_uuid>(对于默认 SSD 挂载)或 <base_path>/<tbe_uuid>(对于用户提供的基础路径)
- 参数:
base_path – 绑定到一个 TBE/EmbeddingRocksDB 的所有 rocksdb 分片的基础路径
db_shard_id – rocksdb 分片索引,用于确定使用哪个 SSD
tbe_uuid – 训练作业生命周期内每个 TBE 的唯一标识符
default_path – base_path 是默认 SSD 挂载还是用户提供
- 返回:
该 rocksdb 分片的基础路径
-
inline std::string get_rocksdb_shard_path(int db_shard_id, const std::string &rocksdb_path)¶
根据 rocksdb_path 生成 rocksdb 分片路径,文件层次结构将是 <rocksdb_shard_path>/shard_<db_shard>
- 参数:
db_shard_id – rocksdb 分片索引
rocksdb_path – rocksdb 分片的基础路径
- 返回:
rocksdb 分片路径
-
inline std::string get_rocksdb_checkpoint_dir(int db_shard_id, const std::string &rocksdb_path)¶
为特定 rocksdb 分片路径生成一个用于存放 rocksdb 检查点的目录,文件层次结构将是 <rocksdb_shard_path>/checkpoint_shard_<db_shard>
- 参数:
db_shard_id – rocksdb 分片索引
rocksdb_path – rocksdb 分片的基础路径
- 返回:
存放一个 rocksdb 分片的 rocksdb 检查点的目录
-
void cuda_callback_func(cudaStream_t stream, cudaError_t status, void *functor)¶
cudaStreamAddCallback
的回调函数cudaStreamAddCallback
的通用回调函数,即cudaStreamCallback_t callback
。此函数将functor
转换为 void 函数,调用它然后删除它(删除操作在另一个线程中发生)- 参数:
stream –
cudaStreamAddCallback
操作的 CUDA 流status – CUDA 状态
functor – 将被调用的仿函数
- 返回:
无
-
Tensor masked_index_put_cuda(Tensor self, Tensor indices, Tensor values, Tensor count, const bool use_pipeline, const int64_t preferred_sms)¶
类似于
torch.Tensor.index_put
,但忽略indices < 0
masked_index_put_cuda
仅支持 2D 输入values
。它将values
中的count
行放入self
中,使用indices
中 >= 0 的行索引。# Equivalent PyTorch Python code indices = indices[:count] filter_ = indices >= 0 indices_ = indices[filter_] self[indices_] = values[filter_.nonzero().flatten()]
- 参数:
self – 2D 输出张量(被索引的张量)
indices – 1D 索引张量
values – 2D 输入张量
count – 包含要处理的
indices
长度的张量use_pipeline – 一个标志,指示此内核将与其他内核重叠。如果为 true,则使用一部分 SM 来减少资源竞争
preferred_sms – 当 use_pipeline=true 时,内核首选使用的 SM 数量。当 use_pipeline=false 时,此值被忽略。
- 返回:
self
张量
-
Tensor masked_index_select_cuda(Tensor self, Tensor indices, Tensor values, Tensor count, const bool use_pipeline, const int64_t preferred_sms)¶
类似于
torch.index_select
,但忽略indices < 0
masked_index_select_cuda
仅支持 2D 输入values
。它将values
中由indices
指定(其中indices
>= 0)的count
行放入self
中。# Equivalent PyTorch Python code indices = indices[:count] filter_ = indices >= 0 indices_ = indices[filter_] self[filter_.nonzero().flatten()] = values[indices_]
- 参数:
self – 2D 输出张量
indices – 1D 索引张量
values – 2D 输入张量(被索引的张量)
count – 包含要处理的
indices
长度的张量use_pipeline – 一个标志,指示此内核将与其他内核重叠。如果为 true,则使用一部分 SM 来减少资源竞争
preferred_sms – 当 use_pipeline=true 时,内核首选使用的 SM 数量。当 use_pipeline=false 时,此值被忽略。
- 返回:
self
张量
-
std::tuple<Tensor, Tensor> ssd_generate_row_addrs_cuda(const Tensor &lxu_cache_locations, const Tensor &assigned_cache_slots, const Tensor &linear_index_inverse_indices, const Tensor &unique_indices_count_cumsum, const Tensor &cache_set_inverse_indices, const Tensor &lxu_cache_weights, const Tensor &inserted_ssd_weights, const Tensor &unique_indices_length, const Tensor &cache_set_sorted_unique_indices)¶
为 SSD TBE 数据生成内存地址。
从 SSD 检索的数据可以存储在便笺簿(HBM)或 LXU 缓存(也是 HBM)中。
lxu_cache_locations
用于指定数据的位置。如果位置为 -1,则相关索引的数据在便笺簿中;否则,它在缓存中。为了使 TBE 内核能够方便地访问数据,此运算符为每个索引生成第一个字节的内存地址。访问数据时,TBE 内核只需将地址转换为指针。此外,此运算符还生成后向传播后被逐出的索引列表,这些索引基本上是数据在便笺簿中的索引。
- 参数:
lxu_cache_locations – 该张量包含完整索引列表中数据存储的缓存槽。-1 是一个哨兵值,表示数据不在缓存中。
assigned_cache_slots – 该张量包含唯一索引列表的缓存槽。-1 表示数据不在缓存中。
linear_index_inverse_indices – 该张量包含线性索引在排序前的原始位置。
unique_indices_count_cumsum – 该张量包含唯一索引计数的独占前缀和结果。
cache_set_inverse_indices – 该张量包含缓存集在排序前的原始位置。
lxu_cache_weights – LXU 缓存张量。
inserted_ssd_weights – 便笺簿张量。
unique_indices_length – 该张量包含唯一索引的数量(GPU 张量)。
cache_set_sorted_unique_indices – 该张量包含与已排序的唯一缓存集相关联的唯一索引。
- 返回:
一个张量元组(SSD 行地址张量和后向传播后被逐出的索引张量)
-
void ssd_update_row_addrs_cuda(const Tensor &ssd_row_addrs_curr, const Tensor &inserted_ssd_weights_curr_next_map, const Tensor &lxu_cache_locations_curr, const Tensor &linear_index_inverse_indices_curr, const Tensor &unique_indices_count_cumsum_curr, const Tensor &cache_set_inverse_indices_curr, const Tensor &lxu_cache_weights, const Tensor &inserted_ssd_weights_next, const Tensor &unique_indices_length_curr)¶
更新 SSD TBE 数据的内存地址。
当启用流水线预取时,当前迭代的便笺簿中的数据可以在预取步骤中移动到 L1 或下一迭代的便笺簿。此运算符更新被重定位到正确位置的数据的内存地址。
- 参数:
ssd_row_addrs_curr – 包含当前迭代行地址的张量
inserted_ssd_weights_curr_next_map – 包含每个索引在下一迭代便笺簿中位置映射的张量。(-1 = 数据未移动)。inserted_ssd_weights_curr_next_map[i] 是位置
lxu_cache_locations_curr – 包含当前迭代完整索引列表数据存储的缓存槽的张量。-1 是一个哨兵值,表示数据不在缓存中。
linear_index_inverse_indices_curr – 包含当前迭代线性索引在排序前的原始位置的张量
unique_indices_count_cumsum_curr – 包含当前迭代唯一索引计数的独占前缀和结果的张量
cache_set_inverse_indices_curr – 包含当前迭代缓存集在排序前的原始位置的张量
lxu_cache_weights – LXU 缓存张量。
inserted_ssd_weights_next – 下一迭代的便笺簿张量
unique_indices_length_curr – 包含当前迭代唯一索引数量的张量(GPU 张量)
- 返回:
无
-
void compact_indices_cuda(std::vector<Tensor> compact_indices, Tensor compact_count, std::vector<Tensor> indices, Tensor masks, Tensor count)¶
压缩给定的索引列表。
此运算符根据给定的掩码(一个包含 0 或 1 的张量)压缩给定的索引列表。该运算符移除其对应掩码为 0 的索引。它仅对
count
个元素进行操作(而不是整个张量)。示例
indices = [[0, 3, -1, 3, -1, -1, 7], [0, 2, 2, 3, -1, 9, 7]] masks = [1, 1, 0, 1, 0, 0, 1] count = 5 # x represents an arbitrary value compact_indices = [[0, 3, 3, x, x, x, x], [0, 2, 3, x, x, x, x]] compact_count = 3
- 参数:
compact_indices – 压缩索引列表(输出索引)。
compact_count – 包含压缩后元素数量的张量
indices – 待压缩的输入索引列表
masks – 一个包含 0 或 1 的张量,用于指示是移除还是保留元素。0 = 移除相应的索引。1 = 保留相应的索引。@count count 一个包含待压缩元素数量的张量。
-
class CacheLibCache¶
- #include <cachelib_cache.h>
一个用于与 Cachlib 交互的 Cachlib 包装类。
它用于维护所有与缓存相关的操作,包括初始化、插入、查找和逐出。它对于逐出逻辑是有状态的,调用者必须专门获取和重置与逐出相关的状态。Cachlib 相关的优化将在此类中捕获,例如获取和延迟标记为有用以提升 get 性能。
注意
该类只处理单个 Cachelib 读/写。并行性由调用方实现。
-
class EmbeddingParameterServer : public EmbeddingKVDB¶
- #include <ps_table_batched_embeddings.h>
训练参数服务(TPS)客户端的 EmbeddingKVDB 实现。
-
class CacheContext¶
- #include <kv_db_table_batched_embeddings.h>
它持有 L2 缓存查找结果。
num_misses 是 L2 缓存查找中的未命中次数。cached_addr_list 已预分配为查找次数,以实现更好的并行性,无效位置(缓存未命中)将保留为哨兵值。
-
struct QueueItem¶
- #include <kv_db_table_batched_embeddings.h>
后台 L2/rocksdb 更新的队列项
indices/weights/count 是相应的 set() 参数
read_handles 是 cachelib 抽象的索引/嵌入对元数据,稍后将用于更新 cachelib LRU 队列,因为我们将其与 EmbeddingKVDB::get_cache() 分开。
mode 用于监控 rocksdb 写入,请查阅 RocksdbWriteMode 获取详细解释
-
class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB>¶
- #include <kv_db_table_batched_embeddings.h>
一个用于与不同缓存层和存储层交互的类,公共调用在 cuda 流上执行。
目前,它被 TBE 用于将键(嵌入索引)值(嵌入)卸载到 DRAM、SSD 或远程存储,以在不耗尽 HBM 资源的情况下提供更好的可扩展性。
由 DramKVEmbeddingCache< uint8_t >, DramKVEmbeddingCache< weight_type >, EmbeddingParameterServer, EmbeddingRocksDB 子类化。
-
class EmbeddingRocksDB : public EmbeddingKVDB¶
- #include <ssd_table_batched_embeddings.h>
RocksDB 的 EmbeddingKVDB 实现。
由 MockEmbeddingRocksDB 子类化
-
class ReadOnlyEmbeddingKVDB : public CustomClassHolder¶
- #include <ssd_table_batched_embeddings.h>
RocksDB 的 ReadOnlyEmbeddingKVDB 实现。