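Utilities
Last updated: 05/19/2025 (API docstrings are auto-generated).
This section documents the utility functions and classes in the VERL library.
Python Functional Utilities
Contains small Python utility functions.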
- verl.utils.py_functional.append_to_dict(data: dict, new_data: dict, prefix: str = '')[source]
Append values from new_data to lists in data.
For each key in new_data, this function appends the corresponding value to a list stored under the same key in data. If the key doesn’t exist in data, a new list is created.
- Parameters:
data (Dict) – The target dictionary containing lists as values.
new_data (Dict) – The source dictionary with values to append.
- Returns:
The function modifies data in-place.
- Return type:
None
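The append-or-create behaviour described above can be sketched in a few lines. This is a minimal stand-in, not the library's implementation: the name `append_to_dict_sketch` is hypothetical, and the `prefix` argument is left out because its handling is not described here.

```python
def append_to_dict_sketch(data: dict, new_data: dict) -> None:
    """Illustrative stand-in for the documented behaviour (prefix handling omitted)."""
    for key, value in new_data.items():
        # Create the list the first time a key is seen, then append in place.
        data.setdefault(key, []).append(value)


metrics = {"loss": [0.9]}
append_to_dict_sketch(metrics, {"loss": 0.7, "kl": 0.01})
print(metrics)  # {'loss': [0.9, 0.7], 'kl': [0.01]}
```

Because the target dictionary is modified in place, the same `metrics` object can be reused across training steps to accumulate per-step values.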
File System Utilities
File-system agnostic IO APIs
- verl.utils.fs.copy_to_local(src: str, cache_dir=None, filelock='.file.lock', verbose=False, always_recopy=False, use_shm: bool = False) str[source]
Copy files/directories from HDFS to local cache with validation.
- Parameters:
src (str) – Source path - HDFS path (hdfs://…), local filesystem path, or Hugging Face model ID
cache_dir (str, optional) – Local directory for cached files. Uses system tempdir if None
filelock (str) – Base name for file lock. Defaults to “.file.lock”
verbose (bool) – Enable copy operation logging. Defaults to False
always_recopy (bool) – Force fresh copy ignoring cache. Defaults to False
use_shm (bool) – Enable shared memory copy. Defaults to False
- Returns:
Local filesystem path to copied resource
- Return type:
str
Tracking Utilities
A unified tracking interface that supports logging data to different backends.
- class verl.utils.tracking.Tracking(project_name, experiment_name, default_backend: str | list[str] = 'console', config=None)[source]
A unified tracking interface for logging experiment data to multiple backends.
This class provides a centralized way to log experiment metrics, parameters, and artifacts to various tracking backends including WandB, MLflow, SwanLab, TensorBoard, and console.
- supported_backend
List of supported tracking backends.
- logger
Dictionary of initialized logger instances for each backend.
Metric Utilities
- verl.utils.metric.reduce_metrics(metrics: dict[str, list[Any]]) dict[str, Any][source]
Reduces a dictionary of metric lists by computing the mean, max, or min of each list. The reduce operation is determined by the key name:
- If the key contains “max”, np.max is used
- If the key contains “min”, np.min is used
- Otherwise, np.mean is used
- Parameters:
metrics – A dictionary mapping metric names to lists of metric values.
- Returns:
A dictionary with the same keys but with each list replaced by its reduced value.
Example
>>> metrics = {
...     "loss": [1.0, 2.0, 3.0],
...     "accuracy": [0.8, 0.9, 0.7],
...     "max_reward": [5.0, 8.0, 6.0],
...     "min_error": [0.1, 0.05, 0.2]
... }
>>> reduce_metrics(metrics)
{"loss": 2.0, "accuracy": 0.8, "max_reward": 8.0, "min_error": 0.05}
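The key-name dispatch can be approximated without NumPy. In the sketch below, the Python built-ins `max`, `min`, and `statistics.fmean` stand in for `np.max`, `np.min`, and `np.mean`. The function name is hypothetical, and testing “max” before “min” is an assumption taken from the order in which the rules are listed.

```python
from statistics import fmean


def reduce_metrics_sketch(metrics: dict) -> dict:
    """Pure-Python stand-in: built-ins replace np.max / np.min / np.mean."""
    reduced = {}
    for key, values in metrics.items():
        if "max" in key:        # substring match on the metric name
            reduced[key] = max(values)
        elif "min" in key:
            reduced[key] = min(values)
        else:
            reduced[key] = fmean(values)
    return reduced


result = reduce_metrics_sketch(
    {"loss": [1.0, 2.0, 3.0], "max_reward": [5.0, 8.0, 6.0], "min_error": [0.1, 0.05, 0.2]}
)
# Substring matching also catches names that merely contain "min".
surprise = reduce_metrics_sketch({"minibatch_loss": [1.0, 3.0]})
```

Under the rule as documented, a name such as `minibatch_loss` would be reduced with the minimum rather than the mean, so metric names are worth choosing with the substring match in mind.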
Checkpoint Management
- verl.utils.checkpoint.checkpoint_manager.find_latest_ckpt_path(path, directory_format='global_step_{}')[source]
Return the most recent checkpoint directory based on a tracker file.
- Parameters:
path (str) – Base directory containing the checkpoint tracker.
directory_format (str) – Template for checkpoint subfolders with one placeholder for the iteration number (default “global_step_{}”).
- Returns:
Full path to the latest checkpoint directory, or None if the tracker or checkpoint folder is missing.
- Return type:
str or None
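The lookup described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the tracker file name `latest_checkpointed_iteration.txt` and the function name `find_latest_ckpt_path_sketch` are assumptions, and the tracker is assumed to contain only the iteration number.

```python
import os
import tempfile

TRACKER_NAME = "latest_checkpointed_iteration.txt"  # assumed tracker file name


def find_latest_ckpt_path_sketch(path, directory_format="global_step_{}"):
    tracker = os.path.join(path, TRACKER_NAME)
    if not os.path.exists(tracker):
        return None  # tracker file is missing
    with open(tracker) as f:
        iteration = int(f.read().strip())
    ckpt_dir = os.path.join(path, directory_format.format(iteration))
    # Return None as well when the tracker points at a folder that does not exist.
    return ckpt_dir if os.path.isdir(ckpt_dir) else None


with tempfile.TemporaryDirectory() as root:
    missing = find_latest_ckpt_path_sketch(root)  # nothing saved yet
    os.makedirs(os.path.join(root, "global_step_20"))
    with open(os.path.join(root, TRACKER_NAME), "w") as f:
        f.write("20")
    found = find_latest_ckpt_path_sketch(root)
    found_name = os.path.basename(found)
```

Returning `None` in both failure cases lets a training script treat "no checkpoint yet" and "tracker is stale" the same way and start from scratch.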
- class verl.utils.checkpoint.fsdp_checkpoint_manager.FSDPCheckpointManager(model: FullyShardedDataParallel, optimizer: Optimizer | None = None, lr_scheduler: LRScheduler | None = None, processing_class: PreTrainedTokenizer | ProcessorMixin = None, checkpoint_config: DictConfig = None, **kwargs)[source]
Manage FSDP checkpointing in SPMD training.
Saves/loads per-rank sharded model & optimizer states
Persists full lr_scheduler and RNG state
Stores HF tokenizer/processor and model/config for unified restore
- Parameters:
model (FSDP) – Wrapped model instance.
optimizer (Optimizer) – Training optimizer.
lr_scheduler (LRScheduler) – Learning-rate scheduler.
processing_class (PreTrainedTokenizer or ProcessorMixin, optional) – Pre-/post-processing artifact handler.
checkpoint_config (DictConfig) – Configuration for checkpoint contents.
- ‘load’: Components to load; must contain ‘model’. Defaults to [‘model’, ‘optimizer’, ‘extra’].
- ‘save’: Components to save; must contain ‘model’. Defaults to [‘model’, ‘optimizer’, ‘extra’].
- load_checkpoint(local_path: str, hdfs_path: str = None, del_local_after_load=False)[source]
Load an FSDP checkpoint for this rank.
- Downloads and loads:
model and optimizer shards
extra state dict (scheduler + RNG)
- Parameters:
local_path – Directory with per-rank checkpoint files.
hdfs_path – Unused (for API compatibility).
del_local_after_load – Remove local files after loading.
- save_checkpoint(local_path: str, hdfs_path: str = None, global_step: int = 0, max_ckpt_to_keep=None)[source]
Save an FSDP checkpoint for this rank.
- Writes:
model & optimizer shard files
extra state dict (scheduler + RNG)
HF tokenizer/processor and model/config on rank 0
optional full HF model under ‘huggingface/’ if requested
Rotates old checkpoints, keeping at most max_ckpt_to_keep.
- Parameters:
local_path – Target directory for checkpoint files.
hdfs_path – Unused (for API compatibility).
global_step – Current training step (used for bookkeeping).
max_ckpt_to_keep – Number of recent checkpoints to retain.
Dataset Utilities
- class verl.utils.dataset.rl_dataset.RLHFDataset(data_files: str | list[str], tokenizer: PreTrainedTokenizer, config: DictConfig, processor: ProcessorMixin | None = None, max_samples: int = -1)[source]
Load and preprocess RLHF data from Parquet files.
Caches files locally.
Reads into a HuggingFace Dataset and tokenizes prompts.
Optionally handles images/videos via a ProcessorMixin.
Filters prompts over a max length.
Supports resuming from checkpoints.
- Parameters:
data_files (str or list) – Path(s) to Parquet file(s).
tokenizer (PreTrainedTokenizer) – Tokenizer used to convert text to token IDs.
config (DictConfig) – Options like cache_dir, prompt_key, max_prompt_length, truncation, etc.
processor (ProcessorMixin, optional) – Multimodal preprocessor for images/videos.
- verl.utils.dataset.rl_dataset.collate_fn(data_list: list[dict]) dict[source]
Collate a batch of sample dicts into batched tensors and arrays.
- Parameters:
data_list – List of dicts mapping feature names to torch.Tensor or other values.
- Returns:
Dict where tensor entries are stacked into a torch.Tensor of shape (batch_size, *dims) and non-tensor entries are converted to np.ndarray of dtype object with shape (batch_size,).
Torch Functional Utilities
Contains small torch utilities.
- verl.utils.torch_functional.get_constant_schedule_with_warmup(optimizer: Optimizer, num_warmup_steps: int, last_epoch: int = -1)[source]
Create a constant LR schedule with a linear warmup phase.
- Parameters:
optimizer (Optimizer) – Wrapped optimizer.
num_warmup_steps (int) – Number of steps to ramp up the LR from 0 to initial value.
last_epoch (int, optional) – The index of the last epoch when resuming training. Defaults to -1.
- Returns:
Scheduler that increases LR linearly during warmup, then holds it constant.
- Return type:
LambdaLR
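The shape of the schedule is easiest to see from the learning-rate multiplier alone. The sketch below shows one way to express that multiplier; the function name `warmup_multiplier` is hypothetical, and a function of this form is what would typically be passed to `torch.optim.lr_scheduler.LambdaLR` as `lr_lambda` (with `num_warmup_steps` bound beforehand).

```python
def warmup_multiplier(step: int, num_warmup_steps: int) -> float:
    """LR multiplier: linear ramp from 0 to 1 during warmup, then constant at 1."""
    return min(1.0, step / max(1, num_warmup_steps))


multipliers = [warmup_multiplier(s, 4) for s in range(7)]
print(multipliers)  # [0.0, 0.25, 0.5, 0.75, 1.0, 1.0, 1.0]
```

The `max(1, ...)` guard keeps the expression well defined when `num_warmup_steps` is 0, in which case the multiplier is 1 from the first step after step 0.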
- verl.utils.torch_functional.logprobs_from_logits(logits, labels, inplace_backward=True)[source]
Compute per-token log-probabilities for the given labels.
Uses a Flash-Attention–based cross-entropy (if available) for efficient backward, otherwise falls back to a standard log-softmax+gather approach.
See: https://github.com/pytorch/pytorch/issues/563#issuecomment-330103591
- Parameters:
logits (Tensor) – Model outputs of shape (…, vocab_size).
labels (LongTensor) – True class indices, with shape logits.shape[:-1].
inplace_backward (bool) – If True and Flash-Attn is available, perform backward in-place.
- Returns:
Log-probabilities of the target labels, shape logits.shape[:-1].
- Return type:
Tensor
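The fallback path (log-softmax followed by a gather) amounts to subtracting the log-sum-exp of each logit row from the logit of the labelled class. The sketch below works on plain Python lists so it runs without torch; the helper name `logprob_of_label` is hypothetical.

```python
import math


def logprob_of_label(logits_row, label):
    """log softmax(logits_row)[label], computed with a stable log-sum-exp."""
    m = max(logits_row)  # subtract the max before exponentiating to avoid overflow
    log_z = m + math.log(sum(math.exp(x - m) for x in logits_row))
    return logits_row[label] - log_z


logits = [[2.0, 0.5, -1.0], [0.0, 0.0, 0.0]]  # shape (2, vocab_size=3)
labels = [0, 2]                               # shape (2,), i.e. logits.shape[:-1]
logps = [logprob_of_label(row, lab) for row, lab in zip(logits, labels)]
```

The result has one value per position, matching the documented output shape, and the second entry equals `log(1/3)` because all three logits in that row are equal.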
- verl.utils.torch_functional.masked_mean(values, mask, axis=None)[source]
Compute the mean of values over elements selected by mask.
- Parameters:
values (Tensor) – Input tensor.
mask (Tensor) – Boolean or numeric mask of the same shape as values.
axis (int or tuple of int, optional) – Dimension(s) along which to compute the mean. Defaults to None (over all elements).
- Returns:
Masked mean, with shape equal to values reduced over axis.
- Return type:
Tensor
- verl.utils.torch_functional.masked_whiten(values, mask, shift_mean=True)[source]
Whiten values by normalizing with mean and variance computed over mask.
- Parameters:
values (torch.Tensor) – Input tensor.
mask (torch.Tensor) – Boolean tensor of same shape, selects elements for stats.
shift_mean (bool) – If True (default), output is zero-mean; if False, the original mean is re-added after scaling.
- Returns:
Whitened tensor of same shape as values.
- Return type:
torch.Tensor
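A list-based sketch may help make the role of the mask concrete: statistics come from the masked elements only, while the normalization is applied to every element. The function name, the population (biased) variance, and the `eps` term below are assumptions made for illustration; the library may differ in these details.

```python
import math


def masked_whiten_sketch(values, mask, shift_mean=True, eps=1e-8):
    n = sum(mask)
    # Mean and variance are computed over masked-in elements only.
    mean = sum(v * m for v, m in zip(values, mask)) / n
    var = sum(m * (v - mean) ** 2 for v, m in zip(values, mask)) / n
    whitened = [(v - mean) / math.sqrt(var + eps) for v in values]
    if not shift_mean:
        # Keep unit variance but restore the original mean.
        whitened = [w + mean for w in whitened]
    return whitened


values = [1.0, 3.0, 100.0]
mask = [1, 1, 0]  # the outlier at index 2 is excluded from the statistics
centered = masked_whiten_sketch(values, mask)
unshifted = masked_whiten_sketch(values, mask, shift_mean=False)
```

With the outlier masked out, the masked mean is 2 and the masked variance is 1, so the first two entries map to roughly -1 and +1; with `shift_mean=False` they map back to roughly 1 and 3.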
Sequence Length Balancing
- verl.utils.seqlen_balancing.get_reverse_idx(idx_map)[source]
Build the inverse of an index mapping.
- Parameters:
idx_map (Sequence[int]) – Sequence where idx_map[i] = j.
- Returns:
Inverse mapping list such that output[j] = i for each i.
- Return type:
List[int]
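The inverse mapping is what allows results computed on reordered data to be put back in their original order. A minimal sketch of the documented contract, assuming `idx_map` is a permutation of `0..n-1` (the function name is hypothetical):

```python
def get_reverse_idx_sketch(idx_map):
    """Return `reverse` such that reverse[idx_map[i]] == i for every i."""
    reverse = [0] * len(idx_map)
    for i, j in enumerate(idx_map):
        reverse[j] = i
    return reverse


perm = [2, 0, 1]
inv = get_reverse_idx_sketch(perm)  # [1, 2, 0]

items = ["a", "b", "c"]
reordered = [items[j] for j in perm]        # ['c', 'a', 'b']
restored = [reordered[i] for i in inv]      # back to ['a', 'b', 'c']
```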
- verl.utils.seqlen_balancing.rearrange_micro_batches(batch, max_token_len, dp_group=None, num_batches_divided_by=None, same_micro_num_in_dp=True, min_num_micro_batch=None, use_dynamic_bsz_balance=True)[source]
Split a batch into micro-batches by total token count, with optional DP sync and padding.
- Parameters:
batch (TensorDict) – must include “attention_mask” (B*S); other fields are sliced similarly.
max_token_len (int) – max sum of attention_mask per micro-batch.
dp_group (optional) – torch.distributed group for data-parallel sync.
num_batches_divided_by (optional) – virtual pipeline parallel size, for megatron.
same_micro_num_in_dp (bool) – if True and dp_group set, pad all ranks to the same count.
min_num_micro_batch (int, optional) – force at least this many splits (pads empty ones).
use_dynamic_bsz_balance (bool, optional) – balance the computational workload between micro-batches
- Returns:
List[TensorDict]: the micro-batches.
List[List[int]]: index lists mapping each micro-batch back to original positions.
- Return type:
Tuple[List[TensorDict], List[List[int]]]
Ulysses Utilities
Utilities for DeepSpeed Ulysses Sequence Parallelism.
DeepSpeed Ulysses Paper: https://arxiv.org/abs/2309.14509
Inspired from: https://github.com/deepspeedai/DeepSpeed/blob/master/deepspeed/sequence/layer.py
- verl.utils.ulysses.gather_outputs_and_unpad(x: Tensor, gather_dim: int, unpad_dim: int = None, padding_size: int = 0, grad_scaler: bool = True, group: ProcessGroup | None = None)[source]
Gather a tensor across a process group and optionally unpad its padded elements.
- Parameters:
x (Tensor) – Input tensor to gather.
gather_dim (int) – Dimension along which to gather across ranks.
unpad_dim (int, optional) – Dimension from which to remove padding. If None, no unpadding.
padding_size (int) – Number of padding elements to remove on unpad_dim. Defaults to 0.
grad_scaler (bool) – Whether to apply gradient scaling during gather. Defaults to True.
group (ProcessGroup, optional) – Process group for gathering. If None, uses get_ulysses_sequence_parallel_group(). If still None, returns x unchanged.
- Returns:
The gathered tensor, with padding removed if requested.
- Return type:
Tensor
- verl.utils.ulysses.ulysses_pad_and_slice_inputs(input_ids_rmpad: Tensor, position_ids_rmpad: Tensor | None = None, sp_size: int = 1)[source]
Pad and slice input_ids so that its length is divisible by sp_size. Pad position_ids so that its length is divisible by sp_size.
Note that both input_ids_rmpad and position_ids_rmpad will be padded and sliced.
This is the pre-forward utility for Ulysses sequence parallelism.
- Parameters:
input_ids_rmpad – shape of [bsz, seqlen]
position_ids_rmpad – shape of [bsz, seqlen], where bsz must be 1
sp_size (int) – ulysses sequence parallelism size
- Returns:
torch.Tensor: padded and sliced input_ids
torch.Tensor: padded and sliced position_ids
int: pad size
- Return type:
Tuple[torch.Tensor, torch.Tensor, int]
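The padding arithmetic can be illustrated on a plain list of token IDs. The sketch below is not the library's implementation: the function name, the explicit `sp_rank` argument (the real utility presumably obtains the rank from the process group), and the pad value of 0 are assumptions, as is the choice of giving each rank one contiguous chunk.

```python
def pad_and_slice_sketch(token_ids, sp_size, sp_rank, pad_value=0):
    # Smallest pad that makes the length divisible by sp_size (0 if it already is).
    pad_size = (sp_size - len(token_ids) % sp_size) % sp_size
    padded = token_ids + [pad_value] * pad_size
    chunk = len(padded) // sp_size
    # Each rank keeps one contiguous chunk of the padded sequence.
    return padded[sp_rank * chunk:(sp_rank + 1) * chunk], pad_size


tokens = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]  # 10 tokens
shards = [pad_and_slice_sketch(tokens, sp_size=4, sp_rank=r) for r in range(4)]
```

With 10 tokens and `sp_size=4`, two pad tokens are appended so that every rank receives three tokens; the returned pad size is what a later gather step needs in order to strip the padding again.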
FSDP Utilities
Debugging Utilities
- class verl.utils.profiler.GPUMemoryLogger(role: str, logger: Logger = None, level=10, log_only_rank_0: bool = True)[source]
A decorator class to log GPU memory usage.
Example
>>> from verl.utils.profiler.performance import GPUMemoryLogger
>>> @GPUMemoryLogger(role="actor")
... def update_actor(self, batch):
...     # real actor update logic
...     return
- verl.utils.profiler.log_gpu_memory_usage(head: str, logger: Logger = None, level=10, rank: int = 0)[source]
Log GPU memory usage information.
- Parameters:
head (str) – A descriptive header for the memory usage log message.
logger (logging.Logger, optional) – Logger instance to use for logging. If None, prints to stdout.
level – Logging level to use. Defaults to logging.DEBUG.
rank (int) – The rank of the process to log memory for. Defaults to 0.