Skip to main content
Open on GitHub

流式传输

流式传输对于提高基于LLM构建的应用程序的响应能力至关重要。通过渐进式地显示输出,即使在完整响应准备好之前,流式传输也能显著改善用户体验 (UX),尤其是在处理 LLM 的延迟时。

概述

LLM生成完整响应通常需要几秒钟的延迟,这在具有多个模型调用的复杂应用程序中会更加明显。幸运的是,LLM 会迭代地生成响应,允许在中间结果生成时显示它们。通过流式传输这些中间输出,LangChain 能够提供更流畅的 LLM 应用 UX,并在其设计的核心提供对流式传输的内置支持。

在本指南中,我们将讨论 LLM 应用程序中的流式传输,并探讨 LangChain 的流式传输 API 如何支持您应用程序中各种组件的实时输出。

在 LLM 应用程序中流式传输什么

在涉及 LLM 的应用程序中,可以流式传输多种类型的数据,以通过减少感知延迟和提高透明度来改善用户体验。这些包括:

1. 流式传输 LLM 输出

最常见也是最重要的流式传输数据是 LLM 本身生成的输出。LLM 通常需要时间来生成完整响应,通过实时流式传输输出,用户可以即时看到部分结果。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式传输管道或工作流进度

除了流式传输 LLM 输出之外,流式传输更复杂的工作流或管道的进度也很有用,这能让用户对应用程序的整体进展有所了解。这可能包括:

  • 在 LangGraph 工作流中: 使用 LangGraph,工作流由节点和边组成,代表不同的步骤。这里的流式传输涉及跟踪图状态的变化,因为各个节点请求更新。这允许更精细地监视工作流中当前活动的节点,从而在工作流逐渐进行到不同阶段时提供关于工作流状态的实时更新。

  • 在 LCEL 管道中: 流式传输 LCEL 管道的更新涉及捕获来自各个子可运行组件的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行组件,从而提供对整个管道进度的实时洞察。

流式传输管道或工作流进度对于为用户提供应用程序在执行过程中所处位置的清晰视图至关重要。

3. 流式传输自定义数据

在某些情况下,您可能需要流式传输自定义数据,这些数据超出了管道或工作流结构提供的信息。这些自定义信息被注入工作流中的特定步骤,无论是工具还是 LangGraph 节点。例如,您可以实时流式传输关于工具正在做什么的更新,或者 LangGraph 节点的进度。这些直接从步骤内部发出的细粒度数据提供了对工作流执行的更详细的洞察,尤其适用于需要更高可见性的复杂流程。

流式传输 API

LangChain 提供两个主要的 API,用于实时流式传输输出。这些 API 支持实现可运行接口的任何组件,包括LLM编译后的 LangGraph 图以及使用LCEL生成的任何可运行组件。

  1. 同步 stream 和异步 astream:用于流式传输由可运行组件(例如聊天模型)生成的输出,或者流式传输由 LangGraph 创建的任何工作流。
  2. 仅异步的 astream_events:使用此 API 可访问由完全使用LCEL构建的 LLM 应用程序中的自定义事件和中间输出。请注意,此 API 可用,但在使用 LangGraph 时不需要。
note

此外,还有一个旧版的异步 astream_log API。不建议在新项目中使用此 API,因为它比其他流式传输 API 更复杂且功能更少。

stream()astream()

stream() 方法返回一个生成器,该生成器会同步地产生输出块。您可以使用 for 循环实时处理每个块。例如,当使用 LLM 时,这允许输出在生成时增量式地流式传输,从而减少用户的等待时间。

stream()astream() 方法生成的块的类型取决于正在流式传输的组件。例如,当从LLM流式传输时,每个组件将是 AIMessageChunk;但是,对于其他组件,块可能不同。

stream() 方法返回一个生成器,该生成器在生成时产生这些块。例如,

for chunk in component.stream(some_input):
# 重要提示:请尽可能高效地处理每个块。
# 在您处理当前块时,上游组件正在等待生成下一个块。例如,当处理 LangGraph 时,
# 图执行会在当前块被处理时暂停。
# 在极端情况下,这甚至可能导致超时(例如,当从具有超时的 API 流式传输 LLM 输出时)。
print(chunk)

异步版本 astream() 的工作方式类似,但它是为非阻塞工作流设计的。您可以在异步代码中使用它来实现相同的实时流式传输行为。

与聊天模型一起使用

当在聊天模型上使用 stream()astream() 时,输出会作为响应由 LLM 生成的 AIMessageChunks 进行流式传输。这允许您在 LLM 的输出被生成时增量地呈现或处理它,这在交互式应用程序或界面中特别有用。

与 LangGraph 一起使用

LangGraph 编译后的图是可运行组件,并支持标准的流式传输 API。

当在 LangGraph 上使用 streamastream 方法时,您可以选择一个或多个流模式,这些模式允许您控制要流式传输的输出类型。可用流模式包括:

  • "values":为每个步骤发出状态的所有值。
  • "updates":仅发出节点名以及节点在每个步骤后返回的更新。
  • "debug":为每个步骤发出调试事件。
  • "messages":将 LLM 消息逐个令牌地发出。
  • "custom":发出使用LangGraph 的 StreamWriter 写入的自定义输出。

更多信息请参阅:

与 LCEL 一起使用

如果您使用LangChain 的表达式语言 (LCEL) 组合多个可运行组件,则 stream()astream() 方法将按照约定流式传输链中最后一个组件的输出。这允许最终处理结果增量式地流式传输。LCEL 会尝试优化管道中的流式传输延迟,以便尽可能快地获得最后一个组件的流式传输结果。

astream_events

tip

使用 astream_events API 来访问由完全使用LCEL构建的 LLM 应用程序中的自定义数据和中间输出。

虽然此 API 也可用于LangGraph,但通常在与 LangGraph 配合使用时不需要,因为 streamastream 方法为 LangGraph 图提供了全面的流式传输功能。

对于使用 LCEL 构建的链,.stream() 方法仅流式传输链中最终组件的输出。这对于某些应用程序来说可能足够了,但当您构建包含多个 LLM 调用组合的更复杂链时,您可能希望与最终输出一起返回链的中间值。例如,当构建文档问答应用程序时,您可能希望在最终生成的同时返回源。

您可以通过使用回调或通过某种方式构建链,使其返回中间 值,例如使用链式.assign()调用,但 LangChain 还提供了一个 .astream_events() 方法,该方法将回调的灵活性与 .stream() 的易用性相结合。调用它时,它会返回一个生成器,该生成器会发出各种类型的事件,您可以根据项目的需求进行过滤和处理。

这是一个小的示例,仅打印包含流式传输的聊天模型输出的事件:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

您可以大致将其看作是回调事件的生成器(尽管格式不同)——您几乎可以在所有 LangChain 组件上使用它!

有关如何使用 .astream_events() 的更详细信息,包括可用事件的列表,请参阅此指南

将自定义数据写入流

要将自定义数据写入流,您需要根据正在使用的组件选择以下方法之一:

  1. LangGraph 的StreamWriter 可用于写入自定义数据,这些数据将在使用 LangGraph 时通过streamastream API 显示。重要提示这是 LangGraph 的一项功能,因此在处理纯 LCEL 时不可用。请参阅如何流式传输自定义数据了解更多信息。
  2. dispatch_events / adispatch_events 可用于写入自定义数据,这些数据将通过 astream_events API 显示。请参阅如何分派自定义回调事件了解更多信息。

"自动流式传输"聊天模型

LangChain 通过在某些情况下自动启用流式传输模式来简化来自聊天模型的流式传输,即使您没有显式调用流式传输方法。当您使用非流式传输的 invoke 方法但仍希望流式传输整个应用程序(包括聊天模型的中间结果)时,这尤其有用。

工作原理

当您在聊天模型上调用 invoke(或 ainvoke)方法时,如果 LangChain 检测到您正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。

在底层,它将让 invoke(或 ainvoke)使用 stream(或 astream)方法来生成其输出。从使用 invoke 的代码来看,调用的结果是相同的;然而,在聊天模型流式传输期间,LangChain 会负责调用 LangChain回调系统中的 on_llm_new_token 事件。这些回调事件 允许 LangGraph 的 stream/astreamastream_events 实时显示聊天模型的输出。

示例:

def node(state):
...
# 以下代码使用 invoke 方法,但 LangChain 会自动
# 在检测到整个
# 应用程序正在流式传输时切换到流式传输模式
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供许多方法的同步 (sync) 和异步 (async) 版本。异步方法通常以 "a" 作为前缀(例如 ainvokeastream)。在编写异步代码时,必须一致地使用这些异步方法以确保非阻塞行为和最佳性能。

如果流式传输的数据未能实时显示,请确保您为工作流使用了正确的异步方法。

请查阅LangChain 中的异步编程指南以了解更多关于使用 LangChain 编写异步代码的信息。

相关资源

请参阅以下操作指南,了解 LangChain 中流式传输的具体示例:

有关将自定义数据写入流的信息,请参阅以下资源: