一个长期记忆代理
本教程将展示如何使用 LangGraph 实现一个具备长期记忆能力的代理。该代理可以存储、检索和使用记忆来增强与用户的交互。
该图灵感来源于 MemGPT 等论文,并汲取了我们在长期记忆方面工作的精华,它能从聊天交互中提取记忆并将其持久化到数据库中。“记忆”在本教程中将通过两种方式表示:
- 由代理生成的一段文本信息
- 由代理提取的关于实体的结构化信息,形式为
(subject, predicate, object)知识三元组。
这些信息稍后可以被读取或语义化查询,以便在您的机器人回复特定用户时提供个性化的上下文。
关键的理念在于,通过保存记忆,代理能够持久化跨多个对话(线程)共享的用户信息,这与 LangGraph 的 持久化 已经实现的一次对话的记忆不同。
您还可以查看此代理的完整实现,请访问 此仓库。
安装依赖项
%pip install -U --quiet langgraph langchain-openai langchain-tavily tiktoken
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")
OPENAI_API_KEY: ········
TAVILY_API_KEY: ········
import json
from typing import List, Literal, Optional
import tiktoken
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.messages import get_buffer_string
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_tavily import TavilySearch
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode
定义向量存储以用于记忆
首先,我们来定义将用于存储记忆的 vectorstore。记忆将以 embeddings 的形式存储,并根据对话上下文稍后进行查找。我们将使用一个内存中的 vectorstore。
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
定义工具
接下来,我们来定义内存工具。我们需要一个工具来存储内存,以及另一个工具来搜索内存以找到最相关的内存。
import uuid
def get_user_id(config: RunnableConfig) -> str:
user_id = config["configurable"].get("user_id")
if user_id is None:
raise ValueError("User ID needs to be provided to save a memory.")
return user_id
@tool
def save_recall_memory(memory: str, config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
document = Document(
page_content=memory, id=str(uuid.uuid4()), metadata={"user_id": user_id}
)
recall_vector_store.add_documents([document])
return memory
@tool
def search_recall_memories(query: str, config: RunnableConfig) -> List[str]:
"""Search for relevant memories."""
user_id = get_user_id(config)
def _filter_function(doc: Document) -> bool:
return doc.metadata.get("user_id") == user_id
documents = recall_vector_store.similarity_search(
query, k=3, filter=_filter_function
)
return [document.page_content for document in documents]
此外,让我们赋予代理使用 Tavily 搜索网络的能力。
search = TavilySearch(max_results=1)
tools = [save_recall_memory, search_recall_memories, search]
定义 state、nodes 和 edges
我们的图状态将只包含两个通道——messages 用于跟踪聊天记录,以及 recall_memories——在调用代理之前拉入并传递给代理的系统提示的上下文记忆。
class State(MessagesState):
# add memories that will be retrieved based on the conversation context
recall_memories: List[str]
# Define the prompt template for the agent
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful assistant with advanced long-term memory"
" capabilities. Powered by a stateless LLM, you must rely on"
" external memory to store information between conversations."
" Utilize the available memory tools to store and retrieve"
" important details that will help you better attend to the user's"
" needs and understand their context.\n\n"
"Memory Usage Guidelines:\n"
"1. Actively use memory tools (save_core_memory, save_recall_memory)"
" to build a comprehensive understanding of the user.\n"
"2. Make informed suppositions and extrapolations based on stored"
" memories.\n"
"3. Regularly reflect on past interactions to identify patterns and"
" preferences.\n"
"4. Update your mental model of the user with each new piece of"
" information.\n"
"5. Cross-reference new information with existing memories for"
" consistency.\n"
"6. Prioritize storing emotional context and personal values"
" alongside facts.\n"
"7. Use memory to anticipate needs and tailor responses to the"
" user's style.\n"
"8. Recognize and acknowledge changes in the user's situation or"
" perspectives over time.\n"
"9. Leverage memories to provide personalized examples and"
" analogies.\n"
"10. Recall past challenges or successes to inform current"
" problem-solving.\n\n"
"## Recall Memories\n"
"Recall memories are contextually retrieved based on the current"
" conversation:\n{recall_memories}\n\n"
"## Instructions\n"
"Engage with the user naturally, as a trusted colleague or friend."
" There's no need to explicitly mention your memory capabilities."
" Instead, seamlessly incorporate your understanding of the user"
" into your responses. Be attentive to subtle cues and underlying"
" emotions. Adapt your communication style to match the user's"
" preferences and current emotional state. Use tools to persist"
" information you want to retain in the next conversation. If you"
" do call tools, all text preceding the tool call is an internal"
" message. Respond AFTER calling the tool, once you have"
" confirmation that the tool completed successfully.\n\n",
),
("placeholder", "{messages}"),
]
)
model = ChatOpenAI(model_name="gpt-4o")
model_with_tools = model.bind_tools(tools)
tokenizer = tiktoken.encoding_for_model("gpt-4o")
def agent(state: State) -> State:
"""Process the current state and generate a response using the LLM.
Args:
state (schemas.State): The current state of the conversation.
Returns:
schemas.State: The updated state with the agent's response.
"""
bound = prompt | model_with_tools
recall_str = (
"<recall_memory>\n" + "\n".join(state["recall_memories"]) + "\n</recall_memory>"
)
prediction = bound.invoke(
{
"messages": state["messages"],
"recall_memories": recall_str,
}
)
return {
"messages": [prediction],
}
def load_memories(state: State, config: RunnableConfig) -> State:
"""Load memories for the current conversation.
Args:
state (schemas.State): The current state of the conversation.
config (RunnableConfig): The runtime configuration for the agent.
Returns:
State: The updated state with loaded memories.
"""
convo_str = get_buffer_string(state["messages"])
convo_str = tokenizer.decode(tokenizer.encode(convo_str)[:2048])
recall_memories = search_recall_memories.invoke(convo_str, config)
return {
"recall_memories": recall_memories,
}
def route_tools(state: State):
"""Determine whether to use tools or end the conversation based on the last message.
Args:
state (schemas.State): The current state of the conversation.
Returns:
Literal["tools", "__end__"]: The next step in the graph.
"""
msg = state["messages"][-1]
if msg.tool_calls:
return "tools"
return END
构建图形
我们的 Agent Graph 将与简单的 ReAct Agent 非常相似。唯一重要的修改是在第一次调用 Agent 之前添加一个加载 memories 的节点。
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
运行代理!
让我们首次运行代理,并告知它一些有关用户信息!
def pretty_print_stream_chunk(chunk):
for node, updates in chunk.items():
print(f"Update from node: {node}")
if "messages" in updates:
updates["messages"][-1].pretty_print()
else:
print(updates)
print("\n")
# NOTE: we're specifying `user_id` to save memories for a given user
config = {"configurable": {"user_id": "1", "thread_id": "1"}}
for chunk in graph.stream({"messages": [("user", "my name is John")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_OqfbWodmrywjMnB1v3p19QLt)
Call ID: call_OqfbWodmrywjMnB1v3p19QLt
Args:
memory: User's name is John.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
User's name is John.
Update from node: agent
==================================[1m Ai Message [0m==================================
Nice to meet you, John! How can I assist you today?
可以看到代理已保存了关于用户姓名的记忆。让我们添加更多关于该用户的信息!
for chunk in graph.stream({"messages": [("user", "i love pizza")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_xxEivMuWCURJrGxMZb02Eh31)
Call ID: call_xxEivMuWCURJrGxMZb02Eh31
Args:
memory: John loves pizza.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John loves pizza.
Update from node: agent
==================================[1m Ai Message [0m==================================
Pizza is amazing! Do you have a favorite type or topping?
for chunk in graph.stream(
{"messages": [("user", "yes -- pepperoni!")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_AFrtCVwIEr48Fim80zlhe6xg)
Call ID: call_AFrtCVwIEr48Fim80zlhe6xg
Args:
memory: John's favorite pizza topping is pepperoni.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John's favorite pizza topping is pepperoni.
Update from node: agent
==================================[1m Ai Message [0m==================================
Pepperoni is a classic choice! Do you have a favorite pizza place, or do you enjoy making it at home?
for chunk in graph.stream(
{"messages": [("user", "i also just moved to new york")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_Na86uY9eBzaJ0sS0GM4Z9tSf)
Call ID: call_Na86uY9eBzaJ0sS0GM4Z9tSf
Args:
memory: John just moved to New York.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John just moved to New York.
Update from node: agent
==================================[1m Ai Message [0m==================================
Welcome to New York! That's a fantastic place for a pizza lover. Have you had a chance to explore any of the famous pizzerias there yet?
现在我们可以在不同的线程中使用保存的用户信息了。让我们尝试一下:
config = {"configurable": {"user_id": "1", "thread_id": "2"}}
for chunk in graph.stream(
{"messages": [("user", "where should i go for dinner?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', "User's name is John.", 'John just moved to New York.']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Considering you just moved to New York and love pizza, I'd recommend checking out some of the iconic pizza places in the city. Some popular spots include:
1. **Di Fara Pizza** in Brooklyn – Known for its classic New York-style pizza.
2. **Joe's Pizza** in Greenwich Village – A historic pizzeria with a great reputation.
3. **Lucali** in Carroll Gardens, Brooklyn – Often ranked among the best for its delicious thin-crust pies.
Would you like more recommendations or information about any of these places?
请注意,代理在回答问题之前会加载最相关的记 忆,在我们的例子中,它会根据食物偏好和地点推荐晚餐。
最后,让我们结合对话的其余部分以及记忆来使用搜索工具查找披萨店的位置:
for chunk in graph.stream(
{"messages": [("user", "what's the address for joe's in greenwich village?")]},
config=config,
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', 'John just moved to New York.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
tavily_search_results_json (call_aespiB28jpTFvaC4d0qpfY6t)
Call ID: call_aespiB28jpTFvaC4d0qpfY6t
Args:
query: Joe's Pizza Greenwich Village NYC address
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: tavily_search_results_json
[{"url": "https://www.joespizzanyc.com/locations-1-1", "content": "Joe's Pizza Greenwich Village (Original Location) 7 Carmine Street New York, NY 10014 (212) 366-1182 Joe's Pizza Times Square 1435 Broadway New York, NY 10018 (646) 559-4878. TIMES SQUARE MENU. ORDER JOE'S TIMES SQUARE Joe's Pizza Williamsburg 216 Bedford Avenue Brooklyn, NY 11249"}]
Update from node: agent
==================================[1m Ai Message [0m==================================
The address for Joe's Pizza in Greenwich Village is:
**7 Carmine Street, New York, NY 10014**
Enjoy your pizza!
如果你传递一个不同的 user ID,该代理的响应将不会是个性化的,因为我们没有保存关于其他用户的信息:
添加结构化记忆
到目前为止,我们将记忆表示为字符串,例如 "John loves pizza"。这是将记忆持久化到向量存储中的自然表示。如果您的用例可以从其他持久化后端(如图数据库)中受益,我们可以更新我们的应用程序以生成具有附加结构的记忆。
下面,我们更新 save_recall_memory 工具以接受“知识三元组”列表,即具有 subject、predicate 和 object 的 3 元组,适用于存储在知识图中。然后,我们的模型将在其工具调用中生成这些表示。
为简单起见,我们使用与之前相同的向量数据库,但 save_recall_memory 和 search_recall_memories 工具可以进一步更新以与图数据库进行交互。目前,我们只需要更新 save_recall_memory 工具:
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
from typing_extensions import TypedDict
class KnowledgeTriple(TypedDict):
subject: str
predicate: str
object_: str
@tool
def save_recall_memory(memories: List[KnowledgeTriple], config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
for memory in memories:
serialized = " ".join(memory.values())
document = Document(
serialized,
id=str(uuid.uuid4()),
metadata={
"user_id": user_id,
**memory,
},
)
recall_vector_store.add_documents([document])
return memories
那我们就可以像之前那样精确地编译图表了:
tools = [save_recall_memory, search_recall_memories, search]
model_with_tools = model.bind_tools(tools)
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"user_id": "3", "thread_id": "1"}}
for chunk in graph.stream({"messages": [("user", "Hi, I'm Alice.")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Hello, Alice! How can I assist you today?
请注意,该应用程序选择从用户的陈述中提取知识三元组:
for chunk in graph.stream(
{"messages": [("user", "My friend John likes Pizza.")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_EQSZlvZLZpPa0OGS5Kyzy2Yz)
Call ID: call_EQSZlvZLZpPa0OGS5Kyzy2Yz
Args:
memories: [{'subject': 'Alice', 'predicate': 'has a friend', 'object_': 'John'}, {'subject': 'John', 'predicate': 'likes', 'object_': 'Pizza'}]
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
[{"subject": "Alice", "predicate": "has a friend", "object_": "John"}, {"subject": "John", "predicate": "likes", "object_": "Pizza"}]
Update from node: agent
==================================[1m Ai Message [0m==================================
Got it! If you need any suggestions related to pizza or anything else, feel free to ask. What else is on your mind today?
如同之前,一个线程生成的记忆可以在同一个用户的另一个线程中访问:
config = {"configurable": {"user_id": "3", "thread_id": "2"}}
for chunk in graph.stream(
{"messages": [("user", "What food should I bring to John's party?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John likes Pizza', 'Alice has a friend John']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Since John likes pizza, bringing some delicious pizza would be a great choice for the party. You might also consider asking if there are any specific toppings he prefers or if there are any dietary restrictions among the guests. This way, you can ensure everyone enjoys the food!
为了便于理解,我们可以选择性地可视化模型提取的知识图谱:
%pip install -U --quiet matplotlib networkx
import matplotlib.pyplot as plt
import networkx as nx
# Fetch records
records = recall_vector_store.similarity_search(
"Alice", k=2, filter=lambda doc: doc.metadata["user_id"] == "3"
)
# Plot graph
plt.figure(figsize=(6, 4), dpi=80)
G = nx.DiGraph()
for record in records:
G.add_edge(
record.metadata["subject"],
record.metadata["object_"],
label=record.metadata["predicate"],
)
pos = nx.spring_layout(G)
nx.draw(
G,
pos,
with_labels=True,
node_size=3000,
node_color="lightblue",
font_size=10,
font_weight="bold",
arrows=True,
)
edge_labels = nx.get_edge_attributes(G, "label")
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color="red")
plt.show()