# 配方:单步异步策略训练器
**作者:** `https://github.com/meituan-search`
最后更新:2025年7月17日。
## 介绍
### 背景
目前 verl 实现的强化学习训练过程是同步的,遵循 PPO、GRPO 和 DAPO 等成熟方法的算法工作流程。在每一步中,最新的模型生成训练样本,并在训练完成后更新模型。虽然这种方法符合策略外强化学习(off-policy reinforcement learning)并能稳定 RL 训练,但它存在严重的效率问题。
模型更新必须等待生成阶段最长的输出完成。
在生成长尾样本的过程中,GPU 保持空闲,导致资源利用率严重不足。
样本生成中的长尾问题越严重,整体训练效率就越低。
例如,在 DAPO 32B 训练中,Rollout 阶段占总时间的约 70%,增加资源并不能缩短 Rollout 的持续时间。

> 来源数据:https://wandb.ai/verl-org/DAPO%20Reproduction%20on%20verl/workspace?nw=nwusertongyuxuan361
### 解决方案
我们实现了 **单步异步策略训练器 (One Step Off Async Trainer)** 来帮助缓解这个问题。该方法并行化生成和训练过程,利用上一步生成的样本进行当前训练。
它还涉及适当划分资源,为生成分配专用资源,并自动将剩余资源分配给训练。通过减少分配给生成阶段的资源,我们减轻了在长尾样本生成期间 GPU 空闲时间。在此过程中,生成和训练参数保持单步策略(one-step off policy)。

> 参考:[AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language Reasoning](
> https://arxiv.org/abs/2505.24298)
我们的核心贡献包括:
1. **生成与训练并行**:
下一批次的样本在新批次训练的同时异步生成。
2. **资源隔离**:
与 `hybrid_engine` 不同,此方法需要为 rollout 显式分配资源,剩余资源会自动分配给训练。
3. **NCCL 参数同步**:
采用 NCCL 通信原语,在生成和训练模块之间实现无缝的参数传输。
### 实验结果
- **机器配置**: 2 个节点,每个节点 16 个 H20 GPU
- 生成:4 个 GPU
- 训练:12 个 GPU
- **模型**:Qwen2.5-Math-7B
- **Rollout 配置**:
- **最大响应长度**:FSDP2:20,480 个 token;Megatron:8,192 个 token
- **算法**:DAPO
- **Rollout 引擎**:vLLM
| 训练模式 | 引擎 | 步数 | 生成 | 等待前生成 | 生成序列 | 旧的对数概率 | 更新 Actor | 总时间 | acc/best@32/mean | acc/maj@32/mean |
|-------------------|---------------|------|-----|---------------|--------------------|--------------|--------------|---------------|------------------|-----------------|
| colocate sync | VLLM+FSDP2 | 749 | 321 | - | 247 | 88 | 286 | 19h18m | 0.5948 | 0.417 |
| one-step-overlap async | VLLM+FSDP2 | 520 | - | 45 | 458 | 108 | 337 | 15h34m(+23%) | 0.6165 | 0.494 |
| colocate sync | VLLM+Megatron | 699 | 207 | - | 162 | 119 | 344 | 18h21m | 0.605 | 0.4217 |
| one-step-overlap async | VLLM+Megatron | 566 | - | 59 | 501 | 120 | 347 | 13h06m (+40%) | 0.6569 | 0.4038 |
* colocate sync:步数 ≈ 生成 + 旧的对数概率 + 更新 Actor
* one-step-overlap async:步数 ≈ 等待前生成 + 旧的对数概率 + 更新 Actor

> 来源数据:https://wandb.ai/hou-zg-meituan/one-step-off-policy?nw=nwuserhouzg
## 实现
### 单步异步策略流水线
我们实现的 **单步异步策略流水线 (One Step Off Policy Async Pipeline)** 以最小的成本无缝集成到现有训练逻辑中,
无需额外的样本存储管理。核心机制使用 `async_gen_next_batch` 来异步生成 rollout,同时通过 `create_continuous_iterator` 在 epoch 转换期间保持连续运行。
```python
# 迭代器生成器,简化训练过程的单步集成
def _create_continuous_iterator(self):
for epoch in range(self.config.trainer.total_epochs):
iterator = iter(self.train_dataloader)
for batch_dict in iterator:
yield epoch, batch_dict
# 读取下一个批次样本,参数同步并启动异步生成序列
def _async_gen_next_batch(self, continuous_iterator):
# 读取训练数据
try:
epoch, batch_dict = next(continuous_iterator)
except StopIteration:
return None
batch = DataProto.from_single_dict(batch_dict)
gen_batch = batch_pocess(batch)
# 同步 Actor 的权重到 rollout
self.sync_rollout_weights()
# 异步生成
gen_batch_output = self.rollout_wg.async_generate_sequences(gen_batch)
# 封装 future
return GenerationBatchFuture(epoch, batch, gen_batch_output)
continuous_iterator = self._create_continuous_iterator()
# 先运行 rollout 以实现单步异步
batch_data_future = self._async_gen_next_batch(continuous_iterator)
while batch_data_future is not None:
# 等待上一步的 gen_seq 结果
batch = batch_data_future.get()
# 启动下一个异步调用来生成序列
batch_data_future = self._async_gen_next_batch(continuous_iterator)
# 计算优势
batch = critic.compute_values(batch)
batch = reference.compute_log_prob(batch)
batch = reward.compute_reward(batch)
batch = compute_advantages(batch)
# 模型更新
critic_metrics = critic.update_critic(batch)
actor_metrics = actor.update_actor(batch)
```
### 参数同步
令人兴奋的是,我们基于 NCCL 的 rollout 模型权重更新具有出色的性能。
在大多数情况下,延迟低于 300ms,对于 RLHF 来说几乎可以忽略不计。
> **sync_rollout_weights**:从 actor 同步参数到 rollout 的时间极其短暂,几乎可以忽略,因为它使用 nccl 实现。
```python
class ActorRolloutRefWorker:
# actor 获取模型参数的元信息以进行参数同步
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
def get_actor_weights_info(self):
params = self._get_actor_params()
ret = []
for key, tensor in params.items():
ret.append((key, tensor.size(), tensor.dtype))
self._weights_info = ret
return ret
# rollout 设置模型参数的元信息以进行参数同步
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
def set_actor_weights_info(self, weights_info):
self._weights_info = weights_info
class AsyncRayPPOTrainer(RayPPOTrainer):
def init_workers(self):
...
# rollout 从 actor 获取模型参数的元信息以进行参数同步
weights_info = self.actor_wg.get_actor_weights_info()[0]
self.rollout_wg.set_actor_weights_info(weights_info)
# 创建 actor-rollout 通信组以进行参数同步
self.create_weight_sync_group
```
```python
# 驱动进程分别调用 actor 和 rollout 来创建一个基于 nccl/hccl 的权重同步组。
def create_weight_sync_group(self):
master_address = ray.get(self.actor_wg.workers[0]._get_node_ip.remote())
master_port = ray.get(self.actor_wg.workers[0]._get_free_port.remote())
world_size = len(self.actor_wg.workers + self.rollout_wg.workers)
self.actor_wg.create_weight_sync_group(
master_address,
master_port,
0,
world_size,
)
ray.get(
self.rollout_wg.create_weight_sync_group(
master_address,
master_port,
len(self.actor_wg.workers),
world_size,
)
)
# 驱动进程分别调用 actor 和 rollout 以通过 nccl 同步参数
def sync_rollout_weights(self):
self.actor_wg.sync_rollout_weights()
ray.get(self.rollout_wg.sync_rollout_weights())
# fsdp 模型参数同步
@register(dispatch_mode=Dispatch.ONE_TO_ALL, blocking=False)
def sync_rollout_weights(self):
params = self._get_actor_params() if self._is_actor else None
if self._is_rollout:
inference_model = (
self.rollout.inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model
)
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
patch_vllm_moe_model_weight_loader(inference_model)
# 模型参数以逐张量的方式从 actor 广播到 rollout
for key, shape, dtype in self._weights_info:
tensor = torch.empty(shape, dtype=dtype, device=get_torch_device().current_device())
if self._is_actor:
assert key in params
origin_data = params[key]
if hasattr(origin_data, "full_tensor"):
origin_data = origin_data.full_tensor()
if torch.distributed.get_rank() == 0:
tensor.copy_(origin_data)
from ray.util.collective import collective
collective.broadcast(tensor, src_rank=0, group_name="actor_rollout")
if self._is_rollout:
inference_model.load_weights([(key, tensor)])
```
## 用法
### FSDP2 配置示例
```shell
python3 -m recipe.one_step_off_policy.async_main_ppo \
--config-path=config \
--config-name='one_step_off_ppo_trainer.yaml' \
actor_rollout_ref.actor.strategy=fsdp2 \
# actor 和 rollout 分开放置
actor_rollout_ref.hybrid_engine=False \
# actor 和 rollout 资源
trainer.nnodes=1 \
trainer.n_gpus_per_node=6 \
rollout.nnodes=1 \
rollout.n_gpus_per_node=2
```
### Megatron 配置示例
```shell
python3 -m recipe.one_step_off_policy.async_main_ppo \
--config-path=config \
--config-name='one_step_off_ppo_megatron_trainer.yaml' \
actor_rollout_ref.actor.strategy=megatron \
# actor 和 rollout 分开放置
actor_rollout_ref.hybrid_engine=False \
# actor 和 rollout 资源
trainer.nnodes=1 \
trainer.n_gpus_per_node=6 \
rollout.nnodes=1 \
rollout.n_gpus_per_node=2
```
### 配置指南
1. **卡数关系**
为达到最佳批量分布,请保持以下任一关系:
- `actor_rollout_ref.rollout.n` 应该是以下整数的除数:
`trainer.n_gpus_per_node * trainer.nnodes`
- `actor_rollout_ref.rollout.n * data.train_batch_size` 应该能被整除:
`trainer.n_gpus_per_node * trainer.nnodes`
> 理由:确保在使用部分资源进行生成时,训练样本可以均匀分布到训练 GPU 上。
2. **动态资源调优**
根据阶段持续时间调整 `trainer.nnodes` `trainer.n_gpus_per_node` `rollout.nnodes` `rollout.n_gpus_per_node`:
- **理想状态**:Rollout 和训练阶段具有可比的持续时间
- **诊断指标**:
- 监控 `wait_prev_gen` 的持续时间
- 分析 `sequence_length` 的分布
- **调整策略**:
- 高 `wait_prev_gen` + 均匀的序列长度 → 增加 rollout 资源
- 高 `wait_prev_gen` + 长尾序列 → 优化停止标准(增加资源无济于事)
> **wait_prev_gen**:等待上一次 rollout 结束所消耗的时间(未完全重叠的部分)。
**资源配置策略:**
- **资源受限场景**:通过调整 GPU 分配比例来优化资源利用率,保持节点数量相同,以便训练和 rollout 共享节点;
- 配置 `trainer.nnodes = rollout.nnodes`,其中
`trainer.n_gpus_per_node + rollout.n_gpus_per_node = physical_gpus_per_node`。通过调整 `n_gpus_per_node` 来控制 rollout 资源分配。
- **资源充裕场景**:通过调整节点数量来优化性能,保持每节点 GPU 数量相同,从而实现训练和 rollout 并行度的独立扩展。
- 配置 `trainer.n_gpus_per_node = rollout.n_gpus_per_node`,并通过调整 `trainer.nnodes` 和 `rollout.nnodes` 来控制 rollout 资源分配,以达到最佳性能。
> **注意**:系统所需的总节点数并非简单地等于 `trainer.nnodes + rollout.nnodes`。实际计算取决于 GPU 容量:
> - 当 `trainer.n_gpus_per_node + rollout.n_gpus_per_node <= physical_gpus_per_node` 时,
> 所需节点数为 `max(trainer.nnodes, rollout.nnodes)`
> - 当 `trainer.n_gpus_per_node + rollout.n_gpus_per_node > physical_gpus_per_node` 时,
> 所需节点数为 `trainer.nnodes + rollout.nnodes`
## 功能支持
| 类别 | 支持情况 |
|--------------------|-----------------------------------------------------------------------------------------------------------------|
| 训练引擎 | FSDP2
Megatron |
| rollout 引擎 | vLLM |
| AdvantageEstimator | GRPO
GRPO_PASSK
REINFORCE_PLUS_PLUS
RLOO
OPO
REINFORCE_PLUS_PLUS_BASELINE
GPG |
| Reward | 全部 |