快捷方式

AsyncVLLM

class torchrl.modules.llm.AsyncVLLM(engine_args: AsyncEngineArgs, num_replicas: int = 1, actor_class=None, enable_prefix_caching: bool = False)[source]

一个管理多个异步 vLLM 引擎 Actor 以进行分布式推理的服务。

这是 TorchRL 中异步 vLLM 推理的主要入口点。它管理作为 Ray Actor 运行的多个 vLLM 引擎副本,提供负载均衡、权重更新以及统一的文本生成接口。

该服务自动处理 Ray Actor 的生命周期管理、通过 Placement Group 进行 GPU 分配,并提供与标准 vLLM API 兼容的同步和异步生成接口。

参数:
  • engine_args (AsyncEngineArgs) – vLLM 引擎的配置。

  • num_replicas (int, optional) – 要创建的引擎副本数量。默认为 1。

  • actor_class (optional) – 自定义 Ray Actor 类。默认为内部 Actor 实现。

  • enable_prefix_caching (bool, optional) –

    是否启用前缀缓存。默认为 False。

    警告

    enable_prefix_caching 默认设置为 False,如果需要 prompt log probs,则建议使用此设置。如果不需要 prompt log probs,则将其设置为 True。有关更多详细信息,请参阅 此 issue

示例

>>> from torchrl.modules.llm.backends.vllm_async import AsyncVLLM
>>> from vllm import SamplingParams
>>>
>>> # Simple usage - single GPU, single replica
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B")
>>>
>>> # Advanced usage - multi-GPU tensor parallel with multiple replicas
>>> service = AsyncVLLM.from_pretrained(
...     "Qwen/Qwen2.5-7B",
...     num_devices=2,  # Use 2 GPUs for tensor parallelism
...     num_replicas=2,  # Create 2 replicas for higher throughput
...     max_model_len=4096
... )
>>>
>>> # Generate text
>>> sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
>>> result = service.generate("Hello, world!", sampling_params)
>>> print(result.outputs[0].text)
>>>
>>> # Alternative: using AsyncEngineArgs directly for advanced configuration
>>> from vllm import AsyncEngineArgs
>>> engine_args = AsyncEngineArgs(
...     model="Qwen/Qwen2.5-3B",
...     tensor_parallel_size=2
... )
>>> service = AsyncVLLM.launch(engine_args, num_replicas=2)

注意

架构与设计

AsyncVLLM 服务实现了一个具有以下关键组件的分布式推理架构:

  1. Ray Actor 管理:每个副本都运行为一个独立的 Ray Actor,拥有专用的 GPU 资源。该服务创建一个 Placement Group,以确保最佳的 GPU 分配,并在可能的情况下将张量并行工作节点共置于同一节点上。

  2. 负载均衡:默认情况下,生成请求通过随机选择在副本之间分发,或者可以使用 actor_index 参数定向到特定副本。

  3. 权重同步:该服务支持通过 NCCL 通信组在所有副本之间进行权重更新,从而能够与分布式训练工作流集成。

  4. 资源管理:通过 Ray Placement Group 自动进行 GPU 分配和清理,并具有适当的关机程序以防止资源泄漏。

  5. API 兼容性:提供与 vLLM 的同步 LLM.generate() 方法相同的接口,使其成为异步工作负载的即插即用替换。

Ray 集成

该服务利用 Ray 的 Actor 模型进行分布式执行。每个副本都是一个独立的 Ray Actor,可以在不同节点上调度。该服务处理 Actor 的生命周期、监控就绪状态,并提供对所有副本的集中访问。

性能考虑

  • 为提高重复提示的性能,默认启用前缀缓存

  • 支持张量并行,适用于不适合单个 GPU 的大型模型

  • 多个副本允许并发处理不同的请求

  • 每个副本内部使用原生 vLLM 批处理以获得最佳吞吐量

错误处理

该服务包括超时支持、优雅关机程序以及在失败时尽力清理请求。Ray 的容错机制为长时间运行的推理工作负载提供了额外的弹性。

collective_rpc(method: str, timeout: float | None = None, args: tuple = (), kwargs: dict | None = None) list[Any][source]

将 RPC 转发给所有 Actor。

参数:
  • method (str) – 要调用的方法名称。

  • timeout (float | None) – RPC 调用的超时时间。

  • args (tuple) – 要传递给方法的参数。

  • kwargs (dict | None) – 要传递给方法的关键字参数。

返回:

所有 RPC 调用的 Ray futures。

返回类型:

list[Any]

create_load_balancer(strategy: Literal['requests', 'kv-cache'] | Sequence[Literal['prefix-aware', 'requests', 'kv-cache', 'round-robin']] | None = None, **kwargs) LoadBalancer[source]

为此 AsyncVLLM 服务创建一个负载均衡器。

参数:
  • strategy – 负载均衡策略或策略序列(按回退顺序)。默认值:[“prefix-aware”, “requests”] - 先尝试缓存感知路由,然后是负载均衡。单一策略:“requests”、“kv-cache” 策略序列:[“prefix-aware”、“requests”、“round-robin”]

  • **kwargs – 传递给 LoadBalancer 构造函数的其他参数。

返回:

已配置的负载均衡器实例。此实例存储在 AsyncVLLM 实例中。

返回类型:

LoadBalancer

示例

>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B", num_replicas=3)
>>> # Use smart defaults (prefix-aware -> requests)
>>> lb = service.create_load_balancer()
>>> selected_actor_index = lb.select_actor(prompt="Hello world")
>>> # Simple single strategy
>>> lb = service.create_load_balancer("requests")
>>> selected_actor_index = lb.select_actor()
>>> # Custom strategy hierarchy
>>> lb = service.create_load_balancer(
...     ["prefix-aware", "kv-cache", "round-robin"],
...     prefix_length=16,
...     overload_threshold=2.0
... )
>>> selected_actor_index = lb.select_actor(prompt="Hello world")
classmethod from_pretrained(model_name: str, num_devices: int | None = None, num_replicas: int = 1, verbose: bool = True, compile: bool = True, **kwargs) AsyncVLLM[source]

从预训练模型创建 AsyncVLLM 实例。

这是一个方便的方法,它将模型加载和服务的启动合并为一个调用,类似于其他机器学习库的工作方式。

参数:
  • model_name (str) – 要传递给 vLLM 的模型名称。

  • num_devices (int, optional) – 要使用的设备数量,每个副本。

  • num_replicas (int) – 要创建的引擎副本数量。

  • verbose (bool, optional) – 是否启用详细日志记录和吞吐量统计信息。默认为 True。

  • compile (bool, optional) – 是否启用模型编译以获得更好的性能。默认为 True。

  • **kwargs – 传递给 AsyncEngineArgs 的其他参数。

返回:

已启动的异步 vLLM 服务。

返回类型:

AsyncVLLM

示例

>>> # Simple usage with defaults
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B")
>>>
>>> # Multi-GPU tensor parallel with multiple replicas
>>> service = AsyncVLLM.from_pretrained(
...     "Qwen/Qwen2.5-7B",
...     num_devices=2,
...     num_replicas=2,
...     max_model_len=4096
... )
>>>
>>> # Generate text
>>> from vllm import SamplingParams
>>> result = service.generate("Hello, world!", SamplingParams(max_tokens=50))
generate(prompts: Any = None, sampling_params: SamplingParams | None = None, *, prompt_token_ids: list[int] | list[list[int]] | None = None, use_tqdm: bool = True, lora_request: Any = None, prompt_adapter_request: Any = None, guided_options_request: Any = None, timeout_seconds: float | None = None, actor_index: int | None = None) RequestOutput | list[RequestOutput][source]

使用 vLLM.LLM.generate 接口的 Actor 生成文本。

此方法提供与 vLLM.LLM.generate 相同的接口,以实现同步和异步引擎之间的无缝兼容性。它可以用于在多个线程/Actor 中生成文本。如果未提供 actor_index,将使用负载均衡器来选择 Actor。

generate 是一个阻塞方法,因此它将等待生成完成。

参数:
  • prompts (String, TokensPrompt, or list of these) – 用于生成的输入提示。

  • sampling_params (SamplingParams) – 用于控制生成行为的 SamplingParams 对象。

  • prompt_token_ids (list[int] | list[list[int]]) – 提示的替代选项 - 用于生成的 token ID。

  • use_tqdm (bool) – 是否显示进度条(在异步引擎中不使用)。

  • lora_request (Any) – 用于基于适配器的生成的 LoRA 请求。

  • prompt_adapter_request (Any) – 提示适配器请求。

  • guided_options_request (Any) – 引导解码选项。

  • timeout_seconds (float | None) – 生成的超时时间(秒)。

  • actor_index (int | None) – 要使用的特定 Actor(如果为 None,则随机选择)。

返回:

来自 vLLM 的生成输出。

返回类型:

RequestOutput | list[RequestOutput]

get_cache_usage(actor_index: int | None = None) float | list[float][source]

获取一个或所有 Actor 的 KV 缓存使用情况。

参数:

actor_index (int | None) – 特定 Actor 的索引,如果为 None,则表示所有 Actor。

返回:

指定 Actor 的缓存使用率,

如果 actor_index 为 None,则表示所有 Actor 的使用率列表。

返回类型:

float | list[float]

get_master_address() str[source]

获取用于权重同步的主地址。

get_master_port() int[source]

获取用于权重同步的主端口。

get_model_metadata() dict[str, tuple[torch.dtype, torch.Size]][source]

获取模型参数元数据。

注意:这需要模型已加载。目前,我们返回一个空字典,并期望元数据在权重更新期间从外部提供。

get_num_unfinished_requests(actor_index: int | None = None) int | list[int][source]

获取一个或所有 Actor 的未完成请求数量。

参数:

actor_index (int | None) – 特定 Actor 的索引,如果为 None,则表示所有 Actor。

返回:

指定 Actor 的未完成请求数量,

如果 actor_index 为 None,则表示所有 Actor 的计数列表。

返回类型:

int | list[int]

get_random_actor_index() int[source]

获取一个随机 Actor 索引。

get_tp_size() int[source]

获取张量并行大小。

init_weight_update_group() None[source]

初始化权重更新通信组(RLvLLMEngine 接口)。

classmethod launch(engine_args: AsyncEngineArgs, num_replicas: int = 1) AsyncVLLM[source]

启动一个新的 AsyncVLLMEngineService。

参数:
  • engine_args (AsyncEngineArgs) – 创建 AsyncLLMEngine 实例的参数。

  • num_replicas (int) – 要创建的 Actor 副本数量。

返回:

已启动的服务。

返回类型:

AsyncVLLMEngineService

shutdown()[source]

关闭所有 Actor 并清理资源。

update_weights(weights: Iterator[tuple[str, torch.Tensor]]) None[source]

使用 NCCL 广播在所有副本中更新模型权重。

参数:

weights – 产生 (parameter_name, tensor) 元组的迭代器

文档

访问全面的 PyTorch 开发者文档

查看文档

教程

为初学者和高级开发者提供深入的教程

查看教程

资源

查找开发资源并让您的问题得到解答

查看资源