TransferQueue 数据系统

最后更新时间:2025年9月28日。

本文档介绍 TransferQueue,一个用于高效训练后(post-training)的异步流式数据管理系统。

概述

TransferQueue 是一个高性能的数据存储和传输系统,具备全景数据可见性和流式调度能力,专为优化训练后工作流中的数据流而设计。

TransferQueue 提供了细粒度的、样本级别的数据管理能力,充当数据网关,解耦了计算任务之间显式的数据依赖。这支持了一种分而治之(divide-and-conquer)的方法,极大地简化了算法控制器的设计。

组件

控制平面:全景数据管理

在控制平面,TransferQueueController 将每个训练样本的生产状态消费状态作为元数据进行跟踪。当所有必需的数据字段都准备就绪(即已写入 TransferQueueStorage)时,我们就知道该数据样本可以被下游任务消费。

对于消费状态,我们记录了每个计算任务(例如 generate_sequencescompute_log_prob 等)的消费记录。因此,即使不同的计算任务需要相同的数据字段,它们也可以独立消费数据,互不干扰。

未来,我们计划在控制平面支持负载均衡动态批处理功能。此外,我们将支持分离式框架(disaggregated frameworks)的数据管理,其中每个进程自行管理数据检索,而不是由单个控制器协调。

数据平面:分布式数据存储

在数据平面,TransferQueueStorageSimpleUnit 作为基于 CPU 内存的简易存储单元,负责数据的实际存储和检索。每个存储单元可以部署在不同的节点上,实现分布式数据管理。

TransferQueueStorageSimpleUnit 采用如下的二维数据结构:

  • 每一行代表一个训练样本,在相应的全局批次中分配有唯一索引。

  • 每一列代表计算任务的输入/输出数据字段。

这种数据结构设计源于训练后流程的计算特性,其中每个训练样本在任务管道中以接力(relayed)方式生成。它提供了精确的寻址能力,允许以流式方式进行细粒度的、并发的数据读写操作。

未来,我们计划实现一个通用的存储抽象层,以支持各种存储后端。通过这个抽象,我们希望能集成高性能存储解决方案,如 MoonCakeStore,以支持通过 RDMA 进行设备到设备的数据传输,进一步提高大规模数据的传输效率。

用户接口:异步与同步客户端

TransferQueue 系统的交互工作流程如下:

  1. 进程向 TransferQueueController 发送读取请求。

  2. TransferQueueController 扫描每个样本(行)的生产和消费元数据,并根据负载均衡策略动态组装微批次元数据。该机制支持样本级别的数据调度。

  3. 进程使用控制器提供的元数据从分布式存储单元检索实际数据。

为简化 TransferQueue 的使用,我们已将其封装为 AsyncTransferQueueClientTransferQueueClient。这些客户端同时提供异步和同步接口用于数据传输,使用户可以轻松地将 TransferQueue 集成到他们的框架中。

未来,我们将提供一个 StreamingDataLoader 接口,用于前面在 RFC#2662 中讨论的分离式框架。利用这个抽象,每个进程可以像 PyTorch 中的 DataLoader 一样自动获取自己的数据。TransferQueue 系统将处理不同并行策略导致的底层数据调度和传输逻辑,大大简化分离式框架的设计。

案例展示

通用用法

主要的交互点是 AsyncTransferQueueClientTransferQueueClient,它们充当与 TransferQueue 系统的通信接口。

核心接口:

  • (async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta

  • (async_)get_data(metadata:BatchMeta) -> TensorDict

  • (async_)put(data:TensorDict, metadata:BatchMeta, global_step)

  • (async_)clear(global_step: int)

我们将很快发布详细的教程和 API 文档。

verl 示例

目前将 TransferQueue 集成到 verl 的主要动机是缓解单个控制器 RayPPOTrainer 的数据传输瓶颈。当前,所有 DataProto 对象都必须通过 RayPPOTrainer 路由,导致整个训练后系统成为单点瓶颈。

verl_dataflow_DataProto

利用 TransferQueue,我们通过以下方式将体验数据传输与元数据分发分离开:

  • DataProto 替换为 BatchMeta(元数据)和 TensorDict(实际数据)结构

  • 通过 BatchMeta 保留 verl 原有的分发/收集逻辑(保持单控制器可调试性)

  • 通过 TransferQueue 的分布式存储单元加速数据传输

verl_dataflow_TransferQueue

您可能需要参考 recipe,其中我们模拟了异步和同步场景下的 verl 用法。

引用

如果您发现此仓库有用,请引用我们的论文:
@article{han2025asyncflow,
  title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
  author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
  journal={arXiv preprint arXiv:2507.01663},
  year={2025}
}