AsyncVLLM¶
- class torchrl.modules.llm.AsyncVLLM(engine_args: AsyncEngineArgs, num_replicas: int = 1, actor_class=None, enable_prefix_caching: bool = False)[source]¶
一个管理多个异步 vLLM 引擎 Actor 以进行分布式推理的服务。
这是 TorchRL 中异步 vLLM 推理的主要入口点。它管理作为 Ray Actor 运行的多个 vLLM 引擎副本,提供负载均衡、权重更新以及统一的文本生成接口。
该服务自动处理 Ray Actor 的生命周期管理、通过 Placement Group 进行 GPU 分配,并提供与标准 vLLM API 兼容的同步和异步生成接口。
- 参数:
engine_args (AsyncEngineArgs) – vLLM 引擎的配置。
num_replicas (int, optional) – 要创建的引擎副本数量。默认为 1。
actor_class (optional) – 自定义 Ray Actor 类。默认为内部 Actor 实现。
enable_prefix_caching (bool, optional) –
是否启用前缀缓存。默认为 False。
警告
enable_prefix_caching 默认设置为 False,如果需要 prompt log probs,则建议使用此设置。如果不需要 prompt log probs,则将其设置为 True。有关更多详细信息,请参阅 此 issue。
示例
>>> from torchrl.modules.llm.backends.vllm_async import AsyncVLLM >>> from vllm import SamplingParams >>> >>> # Simple usage - single GPU, single replica >>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B") >>> >>> # Advanced usage - multi-GPU tensor parallel with multiple replicas >>> service = AsyncVLLM.from_pretrained( ... "Qwen/Qwen2.5-7B", ... num_devices=2, # Use 2 GPUs for tensor parallelism ... num_replicas=2, # Create 2 replicas for higher throughput ... max_model_len=4096 ... ) >>> >>> # Generate text >>> sampling_params = SamplingParams(temperature=0.7, max_tokens=100) >>> result = service.generate("Hello, world!", sampling_params) >>> print(result.outputs[0].text) >>> >>> # Alternative: using AsyncEngineArgs directly for advanced configuration >>> from vllm import AsyncEngineArgs >>> engine_args = AsyncEngineArgs( ... model="Qwen/Qwen2.5-3B", ... tensor_parallel_size=2 ... ) >>> service = AsyncVLLM.launch(engine_args, num_replicas=2)
注意
架构与设计
AsyncVLLM 服务实现了一个具有以下关键组件的分布式推理架构:
Ray Actor 管理:每个副本都运行为一个独立的 Ray Actor,拥有专用的 GPU 资源。该服务创建一个 Placement Group,以确保最佳的 GPU 分配,并在可能的情况下将张量并行工作节点共置于同一节点上。
负载均衡:默认情况下,生成请求通过随机选择在副本之间分发,或者可以使用 actor_index 参数定向到特定副本。
权重同步:该服务支持通过 NCCL 通信组在所有副本之间进行权重更新,从而能够与分布式训练工作流集成。
资源管理:通过 Ray Placement Group 自动进行 GPU 分配和清理,并具有适当的关机程序以防止资源泄漏。
API 兼容性:提供与 vLLM 的同步 LLM.generate() 方法相同的接口,使其成为异步工作负载的即插即用替换。
Ray 集成
该服务利用 Ray 的 Actor 模型进行分布式执行。每个副本都是一个独立的 Ray Actor,可以在不同节点上调度。该服务处理 Actor 的生命周期、监控就绪状态,并提供对所有副本的集中访问。
性能考虑
为提高重复提示的性能,默认启用前缀缓存
支持张量并行,适用于不适合单个 GPU 的大型模型
多个副本允许并发处理不同的请求
每个副本内部使用原生 vLLM 批处理以获得最佳吞吐量
错误处理
该服务包括超时支持、优雅关机程序以及在失败时尽力清理请求。Ray 的容错机制为长时间运行的推理工作负载提供了额外的弹性。
- collective_rpc(method: str, timeout: float | None = None, args: tuple = (), kwargs: dict | None = None) list[Any] [source]¶
将 RPC 转发给所有 Actor。
- 参数:
method (str) – 要调用的方法名称。
timeout (float | None) – RPC 调用的超时时间。
args (tuple) – 要传递给方法的参数。
kwargs (dict | None) – 要传递给方法的关键字参数。
- 返回:
所有 RPC 调用的 Ray futures。
- 返回类型:
list[Any]
- create_load_balancer(strategy: Literal['requests', 'kv-cache'] | Sequence[Literal['prefix-aware', 'requests', 'kv-cache', 'round-robin']] | None = None, **kwargs) LoadBalancer [source]¶
为此 AsyncVLLM 服务创建一个负载均衡器。
- 参数:
strategy – 负载均衡策略或策略序列(按回退顺序)。默认值:[“prefix-aware”, “requests”] - 先尝试缓存感知路由,然后是负载均衡。单一策略:“requests”、“kv-cache” 策略序列:[“prefix-aware”、“requests”、“round-robin”]
**kwargs – 传递给 LoadBalancer 构造函数的其他参数。
- 返回:
已配置的负载均衡器实例。此实例存储在 AsyncVLLM 实例中。
- 返回类型:
LoadBalancer
示例
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B", num_replicas=3)
>>> # Use smart defaults (prefix-aware -> requests) >>> lb = service.create_load_balancer() >>> selected_actor_index = lb.select_actor(prompt="Hello world")
>>> # Simple single strategy >>> lb = service.create_load_balancer("requests") >>> selected_actor_index = lb.select_actor()
>>> # Custom strategy hierarchy >>> lb = service.create_load_balancer( ... ["prefix-aware", "kv-cache", "round-robin"], ... prefix_length=16, ... overload_threshold=2.0 ... ) >>> selected_actor_index = lb.select_actor(prompt="Hello world")
- classmethod from_pretrained(model_name: str, num_devices: int | None = None, num_replicas: int = 1, verbose: bool = True, compile: bool = True, **kwargs) AsyncVLLM [source]¶
从预训练模型创建 AsyncVLLM 实例。
这是一个方便的方法,它将模型加载和服务的启动合并为一个调用,类似于其他机器学习库的工作方式。
- 参数:
model_name (str) – 要传递给 vLLM 的模型名称。
num_devices (int, optional) – 要使用的设备数量,每个副本。
num_replicas (int) – 要创建的引擎副本数量。
verbose (bool, optional) – 是否启用详细日志记录和吞吐量统计信息。默认为 True。
compile (bool, optional) – 是否启用模型编译以获得更好的性能。默认为 True。
**kwargs – 传递给 AsyncEngineArgs 的其他参数。
- 返回:
已启动的异步 vLLM 服务。
- 返回类型:
示例
>>> # Simple usage with defaults >>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B") >>> >>> # Multi-GPU tensor parallel with multiple replicas >>> service = AsyncVLLM.from_pretrained( ... "Qwen/Qwen2.5-7B", ... num_devices=2, ... num_replicas=2, ... max_model_len=4096 ... ) >>> >>> # Generate text >>> from vllm import SamplingParams >>> result = service.generate("Hello, world!", SamplingParams(max_tokens=50))
- generate(prompts: Any = None, sampling_params: SamplingParams | None = None, *, prompt_token_ids: list[int] | list[list[int]] | None = None, use_tqdm: bool = True, lora_request: Any = None, prompt_adapter_request: Any = None, guided_options_request: Any = None, timeout_seconds: float | None = None, actor_index: int | None = None) RequestOutput | list[RequestOutput] [source]¶
使用 vLLM.LLM.generate 接口的 Actor 生成文本。
此方法提供与 vLLM.LLM.generate 相同的接口,以实现同步和异步引擎之间的无缝兼容性。它可以用于在多个线程/Actor 中生成文本。如果未提供 actor_index,将使用负载均衡器来选择 Actor。
generate 是一个阻塞方法,因此它将等待生成完成。
- 参数:
prompts (String, TokensPrompt, or list of these) – 用于生成的输入提示。
sampling_params (SamplingParams) – 用于控制生成行为的 SamplingParams 对象。
prompt_token_ids (list[int] | list[list[int]]) – 提示的替代选项 - 用于生成的 token ID。
use_tqdm (bool) – 是否显示进度条(在异步引擎中不使用)。
lora_request (Any) – 用于基于适配器的生成的 LoRA 请求。
prompt_adapter_request (Any) – 提示适配器请求。
guided_options_request (Any) – 引导解码选项。
timeout_seconds (float | None) – 生成的超时时间(秒)。
actor_index (int | None) – 要使用的特定 Actor(如果为 None,则随机选择)。
- 返回:
来自 vLLM 的生成输出。
- 返回类型:
RequestOutput | list[RequestOutput]
- get_cache_usage(actor_index: int | None = None) float | list[float] [source]¶
获取一个或所有 Actor 的 KV 缓存使用情况。
- 参数:
actor_index (int | None) – 特定 Actor 的索引,如果为 None,则表示所有 Actor。
- 返回:
- 指定 Actor 的缓存使用率,
如果 actor_index 为 None,则表示所有 Actor 的使用率列表。
- 返回类型:
float | list[float]
- get_model_metadata() dict[str, tuple[torch.dtype, torch.Size]] [source]¶
获取模型参数元数据。
注意:这需要模型已加载。目前,我们返回一个空字典,并期望元数据在权重更新期间从外部提供。
- get_num_unfinished_requests(actor_index: int | None = None) int | list[int] [source]¶
获取一个或所有 Actor 的未完成请求数量。
- 参数:
actor_index (int | None) – 特定 Actor 的索引,如果为 None,则表示所有 Actor。
- 返回:
- 指定 Actor 的未完成请求数量,
如果 actor_index 为 None,则表示所有 Actor 的计数列表。
- 返回类型:
int | list[int]
- classmethod launch(engine_args: AsyncEngineArgs, num_replicas: int = 1) AsyncVLLM [source]¶
启动一个新的 AsyncVLLMEngineService。
- 参数:
engine_args (AsyncEngineArgs) – 创建 AsyncLLMEngine 实例的参数。
num_replicas (int) – 要创建的 Actor 副本数量。
- 返回:
已启动的服务。
- 返回类型:
AsyncVLLMEngineService
- update_weights(weights: Iterator[tuple[str, torch.Tensor]]) None [source]¶
使用 NCCL 广播在所有副本中更新模型权重。
- 参数:
weights – 产生 (parameter_name, tensor) 元组的迭代器