沙盒融合工具集成

最后更新时间:2025/06/10。

动机

  • 作为 verl 的用户,我们希望允许模型在 Actor 部署(Actor rollout)过程中调用某些工具,并将结果纳入训练过程。

  • 字节跳动的一位同事提出了一个旨在通过代码执行工具增强模型能力的研究论文。

  • 我们旨在支持推理引擎的工具调用能力,使用 sandbox-fusion 作为代码执行系统,为社区提供 retools 的重新实现。

通过沙盒融合 + FaaS 集成进行奖励计算

  • 在当前的数据集和任务中,已存在类似的工作(例如 Prime),它使用本地进程作为运行器来执行模型生成的代码以进行奖励计算。

  • 在此基础上,#1429 通过集成 FaaS 作为奖励计算的运行器,推进了设计。

目标

  • 适配 sglang 工具调用协议,并为沙盒融合定义工具。

  • async-rollout 进程集成,确保沙盒融合工具遵循 asyncIO 约定。

  • 设计并实现一个基本的速率限制器,以防止 429 错误等问题。

非目标

  • 训练有效性不在范围内。

  • 不考虑可观察性指标。

  • 不处理分布式故障转移和组件容错。

设计细节

工具模式定义

  • 目前只考虑代码执行,要求模型提供的 JSON 中包含 code 字段。

  • 目前只支持 Python 代码,因此没有定义 language 参数。

OpenAIFunctionToolSchema(
    type="function",
    function=OpenAIFunctionSchema(
        name="code_interpreter",
        description="A tool for executing code.",
        parameters=OpenAIFunctionParametersSchema(
            type="object",
            properties={
                "code": OpenAIFunctionPropertySchema(
                    type="string",
                    description="The code to execute.",
                    enum=None,
                )
            },
            required=["code"],
        ),
        strict=False,
    )
)

配置参数

速率限制设计

目标:

  • 使用令牌桶模型限制并发请求的数量。

  • 确保按顺序提交给代码运行器,以避免由于回退(backoff)而导致的饿死。

设计亮点:

  • 使用 Ray Global Actor 作为集群级别的单例分布式计数器。

  • 使用信号量(Semaphore)进行计数,将 acquirerelease 置于不同的线程池中以保持顺序。

  • 使用 Ray 的 cloud-pickle 序列化函数,以便解耦 ExecutionWorker

@ray.remote(concurrency_groups={"acquire": 1,"release": 10})
class TokenBucketWorker:
    def __init__(self, rate_limit: int):
        self.rate_limit = rate_limit
        self.current_count = 0
        self._semaphore = threading.Semaphore(rate_limit)

    @ray.method(concurrency_group="acquire")
    def acquire(self):
        self._semaphore.acquire()
        self.current_count += 1

    @ray.method(concurrency_group="release")
    def release(self):
        self._semaphore.release()
        self.current_count -= 1

    def get_current_count(self):
        return self.current_count

class ExecutionWorker:
    def __init__(self, enable_global_rate_limit=True, rate_limit=10):
        self.rate_limit_worker = self._init_rate_limit(rate_limit) if enable_global_rate_limit else None

    def _init_rate_limit(self, rate_limit):
        return TokenBucketWorker.options(name="rate-limiter", get_if_exists=True).remote(rate_limit)

    def execute(self, fn: Callable[..., T], *fn_args, **fn_kwargs) -> T:
        with ExitStack() as stack:
            stack.callback(self.rate_limit_worker.release.remote)
            ray.get(self.rate_limit_worker.acquire.remote())
            try:
                return fn(*fn_args, **fn_kwargs)
            except Exception as e:
                logger.warning(f"Error when executing code: {e}")

def init_execution_pool(num_workers: int, enable_global_rate_limit=True, rate_limit=10, mode: PoolMode=PoolMode.ThreadMode):
    if mode == PoolMode.ThreadMode:
        return ray.remote(ExecutionWorker).options(max_concurrency=num_workers).remote(
            enable_global_rate_limit=enable_global_rate_limit,
            rate_limit=rate_limit
        )
    else:
        raise NotImplementedError("Process mode is not implemented yet")

工具实现

  • 使用 instance_id 来识别跨多个对话轮次的请求。

  • 使用 execution_pool 来实现异步调用。

  • 在部署完成(rollout)后清理状态。

class SandboxFusionTool(BaseTool):
    def __init__(self, config: dict, tool_schema: OpenAIFunctionToolSchema):
        ...
        self.execution_pool = init_execution_pool(...)
        ...

    async def create(self, instance_id: Optional[str] = None, ...):
        ...

     async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> Tuple[str, float, dict]:
         code = parameters.get("code", "")
         timeout = parameters.get("timeout", self.default_timeout)
         language = parameters.get("language", self.default_language)
         if not isinstance(code, str):
             code = str(code)

         result = await self.execution_pool.execute.remote(self.execute_code,instance_id,code,timeout,language)
         self._instance_dict[instance_id]["reward"].append(result.strip())

         return result, result, {}

     def execute_code(self,instance_id,code,timeout=30,language="python"):
         result_status, metadata  = _process_single_case(0, None, None,self.sandbox_fusion_url, code, timeout, language)
         # we should always expect this since we don't have correct answer
         if metadata["run_status"] == "Finished":
             actual_output = metadata["stdout"] if metadata["stdout"] is not None else ""
             return actual_output
         else:
             return "no stdout here"

    async def calc_reward(self, instance_id: str, ...):
        ...

    async def release(self, instance_id: str, ...):
        ...

测试计划

单元测试

  • test_tools_registration: 测试工具注册和初始化。

  • test_rollout_req_creation: 验证 AsyncRolloutReq 是否正确构建。

  • test_over_size_case: 确保在超出 max_seq_len 时部署提前终止。

  • test_tool_call_basic_case: 模拟 sglang 输出,验证工具调用和结果。

  • test_tool_call_batch_case: 测试工具调用的批量处理。

  • test_basic_multi_process_init: 验证 Ray 全局 Actor 是否表现为单例。

  • TestSingleNodeRateLimiterCase: 验证速率限制器在单节点模式下是否正常工作。

  • test_rotten_execution: 确保速率限制器能从函数错误中恢复。

  • TestMultiNodeRateLimiterCase: 验证在多节点环境下的行为。

端到端 (e2e) 测试

我们在 tests/special_e2e 文件夹中提供了 e2e 测试脚本,名为 tests/special_e2e/run_gsm8k_fsdp_sgl_multiturn_sf_tool.sh

通过设置 trainer.rollout_data_dir,您可以将部署数据(rollout data)转储到本地磁盘。以下是一个从部署数据中提取的示例:

{
  "input": "

  system\nYou are a math expert. You are given a question and you need to solve it step by step. Reasoning step by step before any tool call. You should use the `calc_gsm8k_reward` tool after step by step solving the question, before generate final answer at least once and refine your answer if necessary. Put your final answer in the format of `#### <answer>`.\n\n# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>\n{\"type\": \"function\", \"function\": {\"name\": \"code_interpreter\", \"description\": \"A tool for executing code.\", \"parameters\": {\"type\": \"object\", \"properties\": {\"code\": {\"type\": \"string\", \"description\": \"The code to execute.\", \"enum\": null}}, \"required\": [\"code\"]}, \"strict\": false}}\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call>\n

  user\nJohn gets a bonus that's the same percentage every year.  Last year he made $100,000 and got a $10,000 bonus.  This year he makes $200,000.  How much will John make this year when adding both his total pay and bonus together? Let's think step by step and output the final answer after `####`.\n

  assistant\n",
  "output": "assistant\n<think>\nOkay, let's see. John gets a bonus that's the same percentage every year. Last year, he made $100,000 and got a $10,000 bonus. This year, he makes $200,000. The question is asking how much he'll make this year when adding both his total pay and bonus together.\n\nFirst, I need to figure out the percentage bonus he gets. Since the bonus is the same percentage each year, if I can find that percentage based on last year's details, I can apply it to this year's earnings to find this year's bonus. Then, I'll add that bonus to his total pay this year.\n\nLast year, he earned $100,000 and got a $10,000 bonus. To find the percentage, I can divide the bonus by his total earnings. So, 10,000 divided by 100,000. Let me calculate that.\n\n10,000 / 100,000 equals 0.1, which is 10% as a percentage. So, his bonus is 10% of his pay each year.\n\nThis year, he makes $200,000. So, the bonus this year would be 10% of 200,000. Let me compute that using the code interpreter to ensure accuracy.\n

  <tool_call>\n{\"name\": \"code_interpreter\", \"arguments\": {\"code\": \"total_pay_this_year = 200000\\nbonus_percentage = 10 / 100\\nbonus_this_year = total_pay_this_year * bonus_percentage\\ntotal_income_this_year = total_pay_this_year + bonus_this_year\\nprint(total_income_this_year)\", \"executes\": \"True\"}}\n</tool_call>\n

  tool\n220000.0\n\n

  assistant\n<think>\nSo the code shows that John's total income this year would be $220,000. This is consistent with the 10% bonus on last year's pay of $100,000, which was $10,000. Therefore, this year's bonus of $20,000 added to his total pay of $200,000 results in a total income of $220,000.\n\nI need to verify that there are no other factors at play. The problem states it's a constant percentage bonus each year, so there's no mention of changing percentages or other variables. Therefore, applying a straightforward percentage of 10% on this year's partial salary amount would result in $20,000. Adding that to his total pay gives $220,000. The calculation is correct and aligns with the question's parameters.\n</think>\n\n#### 220000.0",
  "score": 0,
  "step": 1
}

这是可读格式的版本:

[system]

你是一位数学专家你将收到一个问题需要逐步解决在进行任何工具调用之前请逐步进行推理在生成最终答案之前应至少使用一次 `calc_gsm8k_reward` 工具来逐步解决问题并根据需要进行调整请将你的最终答案格式化为 `#### <答案>`。

# 工具

你可以调用一个或多个函数来协助用户查询

你将在 <tools></tools> XML 标签内提供函数签名
<tools>
{"type": "function", "function": {"name": "code_interpreter", "description": "A tool for executing code.", "parameters": {"type": "object", "properties": {"code": {"type": "string", "description": "The code to execute.", "enum": null}}, "required": ["code"]}, "strict": false}}
</tools>

对于每个函数调用请在 <tool_call></tool_call> XML 标签内返回一个包含函数名称和参数的 JSON 对象
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>

[user]

约翰每年获得的奖金占其工资的百分比是相同的去年他赚了 100,000 美元获得了 10,000 美元的奖金今年他赚了 200,000 美元将他今年的总工资和奖金加在一起他今年能赚多少钱让我们一步一步思考并在 `####` 之后输出最终答案。

[assistant]

<think>
好的我们来看看约翰每年获得的奖金占其工资的百分比是相同的去年他赚了 100,000 美元并获得了 10,000 美元的奖金今年他赚了 200,000 美元问题是询问在将他今年的总工资和奖金加在一起他今年能赚多少钱

首先我需要计算出他获得的奖金百分比由于每年的奖金百分比是相同的如果我能根据去年的细节找到这个百分比我就可以将其应用于今年的收入来计算今年的奖金然后我将把这笔奖金加到他今年的总工资中

去年他赚了 100,000 美元获得了 10,000 美元的奖金要计算百分比我可以将奖金除以他的总收入所以10,000 除以 100,000我来计算一下

10,000 / 100,000 等于 0.1也就是 10%所以他每年的奖金是其工资的 10%

今年他赚了 200,000 美元所以今年的奖金将是 200,000 美元的 10%我需要通过代码解释器来计算以确保准确性
<tool_call>
{"name": "code_interpreter", "arguments": {"code": "total_pay_this_year = 200000\nbonus_percentage = 10 / 100\nbonus_this_year = total_pay_this_year * bonus_percentage\ntotal_income_this_year = total_pay_this_year + bonus_this_year\nprint(total_income_this_year)", "executes": "True"}}
</tool_call>

[tool]

220000.0

[assistant]

<think>
所以代码显示约翰今年的总收入为 220,000 美元这与他去年 100,000 美元工资的 10% 奖金 10,000 美元是一致的因此今年的 20,000 美元奖金加上他今年的 200,000 美元总工资总收入为 220,000 美元

我需要确认是否还有其他因素问题中提到每年的奖金百分比是恒定的没有提到百分比变化或其他变量因此将今年的工资额按 10% 的固定百分比计算将得到 20,000 美元将其加到他的总工资中为 220,000 美元计算是正确的并且符合问题的参数
</think>

#### 220000.0

你也可以使用 RolloutViewer TUI 工具来查看转储的部署数据:

python scripts/rollout_viewer.py ${trainer.rollout_data_dir}
RolloutViewer 截图