PPO 示例架构
最后更新:2025/02/17。
让我们从近端策略优化(Proximal Policy Optimization)算法开始,这是目前在 LLM 后训练中应用最广泛的算法。
PPO 算法示例的主要入口点是: main_ppo.py。 在本教程中,我们将详细介绍 main_ppo.py 中的代码架构。
定义数据
用户需要预处理数据集并将其存储为 parquet 文件。 我们实现了 RLHFDataset 来加载和分词(tokenize)parquet 文件。
对于 ``RLHFDataset``(默认),至少需要 1 个字段:
prompt:包含字符串提示。
我们已经在 data_preprocess 目录 中提供了一些将数据集处理成 parquet 文件的示例。目前,我们支持 GSM8k、MATH、Hellasage、Full_hh_rlhf 数据集的预处理。更多信息请参阅 预处理训练后数据。
为不同数据集定义奖励函数
在这个主入口点,用户只需根据 PPO 训练中使用的数据集(或应用程序)来定义自己的奖励函数。
例如,我们在 _select_rm_score_fn 中为 GSM8k 和 MATH 数据集提供了奖励函数。在 RewardManager 中,我们将根据 data_source 计算奖励分数,以选择相应的奖励函数。对于某些 RLHF 数据集(例如 full_hh_rlhf),将使用奖励模型来评估响应,而无需任何奖励函数。在这种情况下,RewardManager 将直接返回由奖励模型计算出的 rm_score。
有关详细实现,请参阅 奖励函数。
定义 Worker 类
if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}: # for FSDP backend
assert config.critic.strategy in {"fsdp", "fsdp2"}
from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker
from verl.single_controller.ray import RayWorkerGroup
ray_worker_group_cls = RayWorkerGroup
elif config.actor_rollout_ref.actor.strategy == 'megatron': # for Megatron backend
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker
from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
ray_worker_group_cls = NVMegatronRayWorkerGroup # Ray worker class for Megatron-LM
else:
raise NotImplementedError
from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role
role_worker_mapping = {
Role.ActorRollout: ActorRolloutRefWorker,
Role.Critic: CriticWorker,
Role.RefPolicy: ActorRolloutRefWorker
}
global_pool_id = 'global_pool'
resource_pool_spec = {
global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}
mapping = {
Role.ActorRollout: global_pool_id,
Role.Critic: global_pool_id,
Role.RefPolicy: global_pool_id,
}
步骤 1:构建角色与 Worker 之间的映射关系
角色(Role)代表一组在同一个进程中的 Worker。我们在 ray_trainer.py 中预定义了几个角色。
class Role(Enum):
"""
To create more roles dynamically, you can subclass Role and add new members
"""
Actor = 0 # This worker only has Actor
Rollout = 1 # This worker only has Rollout
ActorRollout = 2 # This worker has both actor and rollout, it's a HybridEngine
Critic = 3 # This worker only has critic
RefPolicy = 4 # This worker only has reference policy
RewardModel = 5 # This worker only has reward model
ActorRolloutRef = 6 # This worker contains actor, rollout and reference policy simultaneously
步骤 2:定义与此角色对应的 Worker 类
我们已经预实现了
ActorRolloutRefWorker。通过不同的配置,它可以是独立的 Actor、独立的 Rollout、ActorRollout 混合引擎(HybridEngine),或者 ActorRolloutRef 混合引擎。我们还预实现了在两个不同后端(PyTorch FSDP 和 Megatron-LM)上用于
Actor、Rollout、Critic、Reward Model和Reference model的 Worker。 有关更多信息,请参阅 FSDP Workers 和 Megatron-LM Workers。
步骤 3:定义资源池 ID 和资源池规格
资源池(Resource pool)是对全局 GPU 资源的划分,
resource_pool_spec是一个字典,将 ID 映射到 GPU 数量。在上面的示例中,我们定义了一个全局资源池:global_pool_id,然后将所有角色放在这个资源池中,使用该后训练任务中的所有 GPU。这指的是*共同定位*(co-locate)放置,所有模型共享同一组 GPU。
有关高级用法,请参阅 资源池和放置 (resource pool and placement)。
定义奖励模型/函数
# we should adopt a multi-source reward function here
# - for rule-based rm, we directly call a reward score
# - for model-based rm, we call a model
# - for code related prompt, we send to a sandbox if there are test cases
# - finally, we combine all the rewards together
# - The reward type depends on the tag of the data
if config.reward_model.enable:
from verl.workers.fsdp_workers import RewardModelWorker
role_worker_mapping[Role.RewardModel] = RewardModelWorker
mapping[Role.RewardModel] = global_pool_id
reward_fn = RewardManager(tokenizer=tokenizer, num_examine=0)
# Note that we always use function-based RM for validation
val_reward_fn = RewardManager(tokenizer=tokenizer, num_examine=1)
resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)
由于并非所有任务都使用基于模型的 RM,用户需要在这里定义它是基于模型的 RM 还是基于函数的 RM。
如果是基于模型的 RM,请直接在资源映射中添加
RewardModel角色,并将其添加到资源池映射中。请注意,预定义的
RewardModelWorker只支持结构为 HuggingfaceAutoModelForSequenceClassification的模型。如果不是这种模型,您需要在 FSDP Workers 和 Megatron-LM Workers 中定义自己的 RewardModelWorker。
如果是基于函数的 RM,用户需要为每个数据集分类奖励函数。
def _select_rm_score_fn(data_source):
if data_source == 'openai/gsm8k':
return gsm8k.compute_score
elif data_source == 'lighteval/MATH':
return math.compute_score
else:
raise NotImplementedError
有关实现,请参阅 directory 中的奖励函数。
定义、初始化和运行 PPO Trainer
trainer = RayPPOTrainer(config=config,
tokenizer=tokenizer,
role_worker_mapping=role_worker_mapping,
resource_pool_manager=resource_pool_manager,
ray_worker_group_cls=ray_worker_group_cls,
reward_fn=reward_fn,
val_reward_fn=val_reward_fn)
trainer.init_workers()
trainer.fit()
我们首先使用用户配置、tokenizer 以及上述所有 Worker 映射、资源池、Worker 组和奖励函数来初始化
RayPPOTrainer。我们首先调用
trainer.init_workers()来在分配的 GPU(在资源池中)上初始化模型。实际的 PPO 训练将在
trainer.fit()中执行。
通过重用 Ray 模型 Worker、资源池和奖励函数,verl 可以轻松扩展到其他 RL 算法。有关更多信息,请参阅 extension。
RayPPOTrainer 的详细信息将在 Ray Trainer 中讨论。