如何添加跨线程持久化(函数式 API)¶
LangGraph 允许您在**不同 线程** 之间持久化数据。例如,您可以将有关用户的信息(他们的姓名或偏好)存储在共享的(跨线程)内存中,并在新线程中(例如,新的对话)重用它们。
当使用 函数式 API 时,您可以通过使用 Store 接口将其设置为存储和检索内存:
-
创建一个
Store
实例 -
将
store
实例传递给entrypoint()
装饰器,并在函数签名中公开store
参数:
在本指南中,我们将展示如何构建和使用一个使用 Store 接口实现的共享内存的工作流。
Note
本指南使用的 Store
API 的支持已在 LangGraph v0.2.32
中添加。
本指南使用的 Store
API 的 index 和 query 参数的支持已在 LangGraph v0.2.54
中添加。
注意
如果您需要为 StateGraph
添加跨线程持久化,请查看此 操作指南。
设置¶
首先,我们安装所需的包并设置我们的 API 密钥
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
为 LangGraph 开发设置 LangSmith
注册 LangSmith,以便快速发现 LangGraph 项目中的问题并提高其性能。LangSmith 允许您使用跟踪数据来调试、测试和监控用 LangGraph 构建的 LLM 应用 — 在此处阅读有关如何开始的更多信息:https://docs.smith.langchain.com
示例:具有长期记忆的简单聊天机器人¶
定义存储¶
在本例中,我们将创建一个能够检索用户偏好信息的流程。我们将通过定义一个 InMemoryStore
来实现——这是一个可以在内存中存储数据并查询这些数据的对象。
使用 Store
接口存储对象时,需要定义两件事:
- 对象的命名空间,一个元组(类似于目录)
- 对象键(类似于文件名)
在我们的示例中,我们将使用 ("memories", <user_id>)
作为命名空间,并为每个新内存使用随机 UUID 作为键。
重要的是,为了确定用户,我们将通过节点函数的 config
关键字参数传递 user_id
。
让我们先定义我们的存储!
API Reference: OpenAIEmbeddings
from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings
in_memory_store = InMemoryStore(
index={
"embed": OpenAIEmbeddings(model="text-embedding-3-small"),
"dims": 1536,
}
)
创建工作流¶
API Reference: ChatAnthropic | RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver
import uuid
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
model = ChatAnthropic(model="claude-3-5-sonnet-latest")
@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
namespace = ("memories", user_id)
last_message = messages[-1]
memories = memory_store.search(namespace, query=str(last_message.content))
info = "\n".join([d.value["data"] for d in memories])
system_msg = f"You are a helpful assistant talking to the user. User info: {info}"
# Store new memories if the user asks the model to remember
if "remember" in last_message.content.lower():
memory = "User name is Bob"
memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})
response = model.invoke([{"role": "system", "content": system_msg}] + messages)
return response
# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
inputs: list[BaseMessage],
*,
previous: list[BaseMessage],
config: RunnableConfig,
store: BaseStore,
):
user_id = config["configurable"]["user_id"]
previous = previous or []
inputs = add_messages(previous, inputs)
response = call_model(inputs, store, user_id).result()
return entrypoint.final(value=response, save=add_messages(inputs, response))
Note
如果您使用 LangGraph Cloud 或 LangGraph Studio,则 不需要 将 store
传递给入口点装饰器 (entrypoint decorator
),因为它会自动完成。
运行工作流!¶
现在,我们在配置中指定用户 ID 并告知模型我们的名字:
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
================================== Ai Message ==================================
Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()