Agent Loop ========== 最后更新:2025/07/17。 .. versionadded:: 0.4.2 [状态:Alpha] .. warning:: Agent Loop 已准备就绪,但 API 在未来版本中可能会发生更改。 Agent Loop 被设计为多轮生成(multi-turn rollout)和智能体强化学习(agentic reinforcement learning)的通用接口。 **设计目标**: - 可插拔的用户自定义 Agent Loop - 提供标准化的请求生成 API,支持不同的推理框架 - 在多个推理服务器之间提供请求级别的负载均衡 **非目标**: - 工具的定义方式以及如何调用工具 总体概述:Agent Loop 接收一个 Prompt,然后执行用户定义的循环:调用 LLM 生成 API、调用工具等,最后返回最终输出。最终输出会计算奖励,并用作强化学习训练的轨迹。 .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/agent_loop_overview.svg?raw=true API 设计 ---------- ``AgentLoopBase`` 类是 Agent Loop 的抽象,而 ``run`` 方法是用户唯一需要实现的接口。 `run` 方法接收格式为:`[{"role": "user"}, {"content": "..."}]` 的 Prompt 消息以及额外的采样参数,可以执行用户想要的任何操作,例如: - 调用 LLM 生成 API - 调用工具:网页搜索、数据库查询、代码沙箱等 - 环境交互 - 反思 - ... .. code:: python class AgentLoopBase(ABC): @abstractmethod async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput: """运行 Agent Loop 以与 LLM 服务器和环境进行交互。 Args: sampling_params (Dict[str, Any]): LLM 采样参数。 **kwargs: 来自 `verl.utils.dataset.RLHFDataset` 的数据集字段。 Returns: AgentLoopOutput: Agent Loop 输出。 """ raise NotImplementedError 在运行用户定义的循环后,`run` 方法应返回 ``AgentLoopOutput``,包括 Prompt token ID、响应 token ID 和响应掩码。 .. code:: python class AgentLoopOutput(BaseModel): """Agent Loop 输出。""" prompt_ids: list[int] """Prompt token ID。""" response_ids: list[int] """响应 token ID,包括 LLM 生成的 token 和工具响应 token。""" response_mask: list[int] """响应掩码,1 表示 LLM 生成的 token,0 表示工具响应 token。""" .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/agent_loop_output.svg?raw=true .. note:: AgentLoopOutput 仅为给定的 Prompt 输出一条轨迹,多条轨迹输出仍在讨论中。 架构设计 ------------------- .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/agent_loop_architecture.png?raw=true 单个 PPO 步骤包含两个阶段:rollout 和 train。在 rollout 阶段: 1. PPOTrainer 从数据集中采样一个批次,并调用 ``AgentLoopManager.generate_sequences``。 2. AgentLoopManager **唤醒** 所有异步 LLM 服务器实例,这些实例将在推理引擎 (vLLM/SGLang) 和训练引擎 (FSDP/Megatron-LM) 之间同步权重。 3. AgentLoopManager 将批次分割成块,并将每个块发送到 ``AgentLoopWorker``。 4. AgentLoopWorker 接收到块,并为每个 Prompt 实例化一个用户定义的 ``AgentLoopBase``,运行 ``run`` 协程直至结束,并获取 ``AgentLoopOutput``。 .. tip:: AgentLoopWorker 并发调度多个协程。如果 AgentLoopWorker 的数量等于 batch_size,则每个 worker 负责一个 Prompt。 在 Agent Loop 中,当用户需要 LLM 生成响应时: 5. 使用 prompt_ids 调用 ``AsyncLLMServerManager.generate``。 6. AsyncLLMServerManager 选择一个在第一轮中请求数最少的服务器实例,并将其发送。 (后续轮次中,请求将发送到同一个服务器实例)。 7. AsyncLLMReceive 接收请求,通过 ipc/rpc 调用 model_runner,并生成响应。(vLLM 和 SGLang 之间存在细微差别,如下所述)。 当所有 AgentLoopWorker 中的所有 Prompts 完成后,AgentLoopManager 收集结果并返回给 PPOTrainer。 8. AgentLoopManager **休眠** 所有服务器实例,这将释放 KV 缓存并将权重卸载到 CPU 内存。 AsyncLLMServer ~~~~~~~~~~~~~~ AsyncLLMServer 是 LLM 服务器的抽象,提供两种类型的生成 API: - `OpenAI chat completion `_:为给定的聊天对话生成响应。 - Token in, token out:为给定的 token ID 生成响应 ID。 我们已正式支持 vLLM 和 SGLang AsyncLLMServer,它们都实现了这两种 API,并且经过了充分测试。 其他推理引擎可以通过实现 ``AsyncServerBase`` 类轻松接入。 .. code:: python class AsyncServerBase(ABC): @abstractmethod async def chat_completion(self, raw_request: Request) -> JSONResponse: """OpenAI chat completion API。 Args: raw_request (Request): 原始 JSON 请求 Returns: JSONResponse: JSON 响应 API 参考:https://platform.openai.com/docs/api-reference/chat/create """ raise NotImplementedError @abstractmethod async def generate(self, prompt_ids: list[int], sampling_params: dict[str, Any], request_id: str) -> list[int]: """根据 prompt IDs 生成响应 IDs。 Args: prompt_ids (List[int]): prompt ID 列表。 sampling_params (Dict[str, Any]): 采样参数。 request_id (str): 请求 ID。 Returns: List[int]: 响应 ID 列表。 """ raise NotImplementedError Chat completion vs Token in token out ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. warning:: 以下结论基于我们最近的经验,仍有待进一步研究和讨论。 几乎所有的 Agent 框架(LangGraph、CrewAI、LlamaIndex 等)都使用 OpenAI chat completion API 调用 LLM,并 将聊天历史作为消息存储。因此,用户可能期望我们在多轮生成中使用 chat completion API。 但根据我们最近在 DAPO 单轮训练和 `retool `_ 多轮训练中的经验, 我们发现,将最终消息应用 chat_template 后生成的 token ID,可能与将 Prompt ID 和每个轮次的 Response ID 拼接后生成的 token ID 不一致。 .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/multi_turn.png?raw=true **这种不一致发生在哪里?** 首先,工具解析器可能会修改内容。例如: .. code:: json {"role": "assistant", "content": "Let me call a ... and get the result"} 在提取了 tool_calls 之后,消息变为: .. code:: json {"role": "assistant", "content": "Let me call a and get the result", "tool_calls": [{"name": "foo", "arguments": "{}"}]} 将提取后的消息重新编码,生成的 token ID 与原始 LLM 生成的 response_ids 不一致。 其次,`decode-encode` 也可能导致不一致:`Agent-R1 issue#30 `_。 **这种不一致有什么影响?** 这种不一致对于服务/Agent 系统来说不是大问题,但对于 RL 训练至关重要。 它会导致轨迹偏离策略模型的分布。我们观察到,将 `apply_chat_template` 应用于最终聊天历史消息, 使得 PPO 训练在单轮情况下甚至无法收敛。 vLLM ^^^^ .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/async_vllm.png?raw=true 对于 vLLM,Async LLM Engine 运行在服务器的同一进程中,而 ModelRunner 运行在 FSDP/Megatron-LM worker 的同一进程中。 Async LLM Engine 通过 ZeroMQ 与 ModelRunner 通信。当服务器接收到请求时,它直接调用引擎来生成 response_ids。 SGLang ^^^^^^ .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/async_sglang.png?raw=true 对于 SGLang,Async LLM Engine 运行在 FSDP/Megatron-LM worker-0 的同一进程中,它会启动多个子进程作为 ModelRunner。 同样,Async LLM Engine 通过 ZeroMQ 与 ModelRunner 通信。当服务器接收到请求时,它会远程调用 worker-0 并获取 response_ids。 AsyncLLMServerManager ~~~~~~~~~~~~~~~~~~~~~ AsyncLLMServerManager 作为多个 AsyncLLMServer 实例的代理,提供: - 负载均衡:在第一轮中选择请求数最少的服务器实例并发送请求。 - 会话粘性:将 request_id 绑定到服务器实例,以便后续轮次将相同的 request_id 发送到同一个服务器实例。 AsyncLLMServerManager 被传递给 ``AgentLoopBase.__init__``,每当用户想在 Agent Loop 中与 LLM 交互时, 都可以调用 ``AsyncLLMServerManager.generate`` 来生成 response_ids。 .. code:: python class AsyncLLMServerManager: async def generate( self, request_id, *, prompt_ids: list[int], sampling_params: dict[str, Any], ) -> list[int]: """从 prompt IDs 生成 tokens。 Args: request_id (str): 用于会话粘性的请求 ID。 prompt_ids (List[int]): prompt token ID 列表。 sampling_params (Dict[str, Any]): 聊天补全的采样参数。 Returns: List[int]: 生成的 token ID 列表。 """ ... 下一步 ---- - :doc:`Agentic RL Training<../start/agentic_rl>`:使用 gsm8k 数据集快速开始 Agentic RL 训练。 - `LangGraph MathExpression `_:演示如何使用 LangGraph 构建 Agent Loop。 - `Retool `_:使用工具 Agent 端到端复现 Retool 论文。