Agent Loop
最后更新:2025/07/17。
Added in version 0.4.2: [状态:Alpha]
Warning
Agent Loop 已准备就绪,但 API 在未来版本中可能会发生更改。
Agent Loop 被设计为多轮生成(multi-turn rollout)和智能体强化学习(agentic reinforcement learning)的通用接口。
设计目标:
可插拔的用户自定义 Agent Loop
提供标准化的请求生成 API,支持不同的推理框架
在多个推理服务器之间提供请求级别的负载均衡
非目标:
工具的定义方式以及如何调用工具
总体概述:Agent Loop 接收一个 Prompt,然后执行用户定义的循环:调用 LLM 生成 API、调用工具等,最后返回最终输出。最终输出会计算奖励,并用作强化学习训练的轨迹。
API 设计
AgentLoopBase 类是 Agent Loop 的抽象,而 run 方法是用户唯一需要实现的接口。
run 方法接收格式为:[{“role”: “user”}, {“content”: “…”}] 的 Prompt 消息以及额外的采样参数,可以执行用户想要的任何操作,例如:
调用 LLM 生成 API
调用工具:网页搜索、数据库查询、代码沙箱等
环境交互
反思
…
class AgentLoopBase(ABC):
@abstractmethod
async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput:
"""运行 Agent Loop 以与 LLM 服务器和环境进行交互。
Args:
sampling_params (Dict[str, Any]): LLM 采样参数。
**kwargs: 来自 `verl.utils.dataset.RLHFDataset` 的数据集字段。
Returns:
AgentLoopOutput: Agent Loop 输出。
"""
raise NotImplementedError
在运行用户定义的循环后,run 方法应返回 AgentLoopOutput,包括 Prompt token ID、响应 token ID 和响应掩码。
class AgentLoopOutput(BaseModel):
"""Agent Loop 输出。"""
prompt_ids: list[int]
"""Prompt token ID。"""
response_ids: list[int]
"""响应 token ID,包括 LLM 生成的 token 和工具响应 token。"""
response_mask: list[int]
"""响应掩码,1 表示 LLM 生成的 token,0 表示工具响应 token。"""
Note
AgentLoopOutput 仅为给定的 Prompt 输出一条轨迹,多条轨迹输出仍在讨论中。
架构设计
单个 PPO 步骤包含两个阶段:rollout 和 train。在 rollout 阶段:
PPOTrainer 从数据集中采样一个批次,并调用
AgentLoopManager.generate_sequences。AgentLoopManager 唤醒 所有异步 LLM 服务器实例,这些实例将在推理引擎 (vLLM/SGLang) 和训练引擎 (FSDP/Megatron-LM) 之间同步权重。
AgentLoopManager 将批次分割成块,并将每个块发送到
AgentLoopWorker。AgentLoopWorker 接收到块,并为每个 Prompt 实例化一个用户定义的
AgentLoopBase,运行run协程直至结束,并获取AgentLoopOutput。
Tip
AgentLoopWorker 并发调度多个协程。如果 AgentLoopWorker 的数量等于 batch_size,则每个 worker 负责一个 Prompt。
在 Agent Loop 中,当用户需要 LLM 生成响应时:
使用 prompt_ids 调用
AsyncLLMServerManager.generate。AsyncLLMServerManager 选择一个在第一轮中请求数最少的服务器实例,并将其发送。 (后续轮次中,请求将发送到同一个服务器实例)。
AsyncLLMReceive 接收请求,通过 ipc/rpc 调用 model_runner,并生成响应。(vLLM 和 SGLang 之间存在细微差别,如下所述)。
当所有 AgentLoopWorker 中的所有 Prompts 完成后,AgentLoopManager 收集结果并返回给 PPOTrainer。
AgentLoopManager 休眠 所有服务器实例,这将释放 KV 缓存并将权重卸载到 CPU 内存。
AsyncLLMServer
AsyncLLMServer 是 LLM 服务器的抽象,提供两种类型的生成 API:
OpenAI chat completion:为给定的聊天对话生成响应。
Token in, token out:为给定的 token ID 生成响应 ID。
我们已正式支持 vLLM 和 SGLang AsyncLLMServer,它们都实现了这两种 API,并且经过了充分测试。
其他推理引擎可以通过实现 AsyncServerBase 类轻松接入。
class AsyncServerBase(ABC):
@abstractmethod
async def chat_completion(self, raw_request: Request) -> JSONResponse:
"""OpenAI chat completion API。
Args:
raw_request (Request): 原始 JSON 请求
Returns:
JSONResponse: JSON 响应
API 参考:https://platform.openai.com/docs/api-reference/chat/create
"""
raise NotImplementedError
@abstractmethod
async def generate(self, prompt_ids: list[int], sampling_params: dict[str, Any], request_id: str) -> list[int]:
"""根据 prompt IDs 生成响应 IDs。
Args:
prompt_ids (List[int]): prompt ID 列表。
sampling_params (Dict[str, Any]): 采样参数。
request_id (str): 请求 ID。
Returns:
List[int]: 响应 ID 列表。
"""
raise NotImplementedError
Chat completion vs Token in token out
Warning
以下结论基于我们最近的经验,仍有待进一步研究和讨论。
几乎所有的 Agent 框架(LangGraph、CrewAI、LlamaIndex 等)都使用 OpenAI chat completion API 调用 LLM,并 将聊天历史作为消息存储。因此,用户可能期望我们在多轮生成中使用 chat completion API。
但根据我们最近在 DAPO 单轮训练和 retool 多轮训练中的经验, 我们发现,将最终消息应用 chat_template 后生成的 token ID,可能与将 Prompt ID 和每个轮次的 Response ID 拼接后生成的 token ID 不一致。
这种不一致发生在哪里?
首先,工具解析器可能会修改内容。例如:
{"role": "assistant", "content": "Let me call a <tool_call>...</tool_call> and get the result"}
在提取了 tool_calls 之后,消息变为:
{"role": "assistant", "content": "Let me call a and get the result", "tool_calls": [{"name": "foo", "arguments": "{}"}]}
将提取后的消息重新编码,生成的 token ID 与原始 LLM 生成的 response_ids 不一致。
其次,decode-encode 也可能导致不一致:Agent-R1 issue#30。
这种不一致有什么影响?
这种不一致对于服务/Agent 系统来说不是大问题,但对于 RL 训练至关重要。 它会导致轨迹偏离策略模型的分布。我们观察到,将 apply_chat_template 应用于最终聊天历史消息, 使得 PPO 训练在单轮情况下甚至无法收敛。
vLLM
对于 vLLM,Async LLM Engine 运行在服务器的同一进程中,而 ModelRunner 运行在 FSDP/Megatron-LM worker 的同一进程中。 Async LLM Engine 通过 ZeroMQ 与 ModelRunner 通信。当服务器接收到请求时,它直接调用引擎来生成 response_ids。
SGLang
对于 SGLang,Async LLM Engine 运行在 FSDP/Megatron-LM worker-0 的同一进程中,它会启动多个子进程作为 ModelRunner。 同样,Async LLM Engine 通过 ZeroMQ 与 ModelRunner 通信。当服务器接收到请求时,它会远程调用 worker-0 并获取 response_ids。
AsyncLLMServerManager
AsyncLLMServerManager 作为多个 AsyncLLMServer 实例的代理,提供:
负载均衡:在第一轮中选择请求数最少的服务器实例并发送请求。
会话粘性:将 request_id 绑定到服务器实例,以便后续轮次将相同的 request_id 发送到同一个服务器实例。
AsyncLLMServerManager 被传递给 AgentLoopBase.__init__,每当用户想在 Agent Loop 中与 LLM 交互时,
都可以调用 AsyncLLMServerManager.generate 来生成 response_ids。
class AsyncLLMServerManager:
async def generate(
self,
request_id,
*,
prompt_ids: list[int],
sampling_params: dict[str, Any],
) -> list[int]:
"""从 prompt IDs 生成 tokens。
Args:
request_id (str): 用于会话粘性的请求 ID。
prompt_ids (List[int]): prompt token ID 列表。
sampling_params (Dict[str, Any]): 聊天补全的采样参数。
Returns:
List[int]: 生成的 token ID 列表。
"""
...
下一步
Agentic RL Training:使用 gsm8k 数据集快速开始 Agentic RL 训练。
LangGraph MathExpression:演示如何使用 LangGraph 构建 Agent Loop。
Retool:使用工具 Agent 端到端复现 Retool 论文。