Skip to main content
Open In ColabOpen on GitHub

如何从工具流式传输事件

如果您拥有调用 聊天模型检索器 或其他 可运行对象工具,您可能希望访问这些可运行对象的 内部事件 或为其配置其他属性。本指南将向您展示如何正确手动传递参数,以便使用 astream_events() 方法执行此操作。

兼容性

如果您在 python<=3.10 中运行异步 代码,LangChain 无法自动将配置(包括 astream_events() 所需的回调)传播到子可运行对象。这是您可能无法看到从自定义可运行对象或工具发出事件的常见原因。

如果您运行的是 python<=3.10,则需要在异步环境中手动将 RunnableConfig 对象传播到子可运行对象。有关如何手动传播配置的示例,请参见下面 bar RunnableLambda 的实现。

如果您运行的是 python>=3.11RunnableConfig 将在异步环境中自动传播到子可运行对象。但是,如果您的代码可能在较旧的 Python 版本中运行,手动传播 RunnableConfig 仍然是一个好主意。

本指南还需要 langchain-core>=0.2.16

假设您有一个自定义工具,它调用一个链,该链通过提示聊天模型返回仅 10 个单词来缩短其输入,然后反转输出。首先,以一种朴素的方式定义它:

pip install -qU "langchain[google-genai]"
import getpass
import os

if not os.environ.get("GOOGLE_API_KEY"):
os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter API key for Google Gemini: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool


@tool
async def special_summarization_tool(long_text: str) -> str:
"""A tool that summarizes input text using advanced techniques."""
prompt = ChatPromptTemplate.from_template(
"You are an expert writer. Summarize the following text in 10 words or less:\n\n{long_text}"
)

def reverse(x: str):
return x[::-1]

chain = prompt | model | StrOutputParser() | reverse
summary = await chain.ainvoke({"long_text": long_text})
return summary

直接调用该工具即可正常工作:

LONG_TEXT = """
NARRATOR:
(Black screen with text; The sound of buzzing bees can be heard)
According to all known laws of aviation, there is no way a bee should be able to fly. Its wings are too small to get its fat little body off the ground. The bee, of course, flies anyway because bees don't care what humans think is impossible.
BARRY BENSON:
(Barry is picking out a shirt)
Yellow, black. Yellow, black. Yellow, black. Yellow, black. Ooh, black and yellow! Let's shake it up a little.
JANET BENSON:
Barry! Breakfast is ready!
BARRY:
Coming! Hang on a second.
"""

await special_summarization_tool.ainvoke({"long_text": LONG_TEXT})
'.yad noitaudarg rof tiftuo sesoohc yrraB ;scisyhp seifed eeB'

但如果你想访问聊天模型的原始输出,而不是完整的工具,你可能会尝试使用 astream_events() 方法并查找 on_chat_model_end 事件。以下是会发生的情况:

stream = special_summarization_tool.astream_events({"long_text": LONG_TEXT})

async for event in stream:
if event["event"] == "on_chat_model_end":
# Never triggers in python<=3.10!
print(event)

您会注意到(除非您在 python>=3.11 中运行本指南),子运行中没有发出聊天模型事件!

这是因为上面的示例没有将工具的配置对象传递给内部链。要解决此问题,请重新定义您的工具以接受一个类型为 RunnableConfig 的特殊参数(有关更多详细信息,请参阅本指南)。执行内部链时,您还需要将该参数传递进去:

from langchain_core.runnables import RunnableConfig


@tool
async def special_summarization_tool_with_config(
long_text: str, config: RunnableConfig
) -> str:
"""A tool that summarizes input text using advanced techniques."""
prompt = ChatPromptTemplate.from_template(
"You are an expert writer. Summarize the following text in 10 words or less:\n\n{long_text}"
)

def reverse(x: str):
return x[::-1]

chain = prompt | model | StrOutputParser() | reverse
# Pass the "config" object as an argument to any executed runnables
summary = await chain.ainvoke({"long_text": long_text}, config=config)
return summary
API Reference:RunnableConfig

现在试试用你新工具进行的 astream_events() 调用,看看效果如何:

stream = special_summarization_tool_with_config.astream_events({"long_text": LONG_TEXT})

async for event in stream:
if event["event"] == "on_chat_model_end":
print(event)
{'event': 'on_chat_model_end', 'data': {'output': AIMessage(content='Bee defies physics; Barry chooses outfit for graduation day.', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-337ac14e-8da8-4c6d-a69f-1573f93b651e', usage_metadata={'input_tokens': 182, 'output_tokens': 19, 'total_tokens': 201, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}}), 'input': {'messages': [[HumanMessage(content="You are an expert writer. Summarize the following text in 10 words or less:\n\n\nNARRATOR:\n(Black screen with text; The sound of buzzing bees can be heard)\nAccording to all known laws of aviation, there is no way a bee should be able to fly. Its wings are too small to get its fat little body off the ground. The bee, of course, flies anyway because bees don't care what humans think is impossible.\nBARRY BENSON:\n(Barry is picking out a shirt)\nYellow, black. Yellow, black. Yellow, black. Yellow, black. Ooh, black and yellow! Let's shake it up a little.\nJANET BENSON:\nBarry! Breakfast is ready!\nBARRY:\nComing! Hang on a second.\n", additional_kwargs={}, response_metadata={})]]}}, 'run_id': '337ac14e-8da8-4c6d-a69f-1573f93b651e', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['225beaa6-af73-4c91-b2d3-1afbbb88d53e']}

太棒了!这次有一个事件被发射了。

对于流式处理,astream_events() 会自动调用链中的内部 runnable,如果可能的话,并启用流式处理,所以如果你想获取聊天模型生成时生成的 token 流,你只需过滤以查找 on_chat_model_stream 事件,而无需其他更改:

stream = special_summarization_tool_with_config.astream_events({"long_text": LONG_TEXT})

async for event in stream:
if event["event"] == "on_chat_model_stream":
print(event)
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', usage_metadata={'input_tokens': 182, 'output_tokens': 2, 'total_tokens': 184, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='Bee', additional_kwargs={}, response_metadata={}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5')}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' defies physics;', additional_kwargs={}, response_metadata={}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5')}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' Barry chooses outfit for', additional_kwargs={}, response_metadata={}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5')}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' graduation day.', additional_kwargs={}, response_metadata={}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5')}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', usage_metadata={'input_tokens': 0, 'output_tokens': 17, 'total_tokens': 17, 'input_token_details': {}})}, 'run_id': 'f5e049f7-4e98-4236-87ab-8cd1ce85a2d5', 'name': 'ChatAnthropic', 'tags': ['seq:step:2'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-20240620', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['51858043-b301-4b76-8abb-56218e405283']}

后续步骤

你现在已经了解了如何在工具内部流式传输事件。接下来,请查看以下指南,了解有关使用工具的更多信息:

你还可以查看一些更具体的工具调用用法: