Skip to content

持久化

LangGraph 内置了持久化层,通过 checkpointer 实现。当你使用 checkpointer 编译图时,checkpointer 会在每个超级步(super-step)保存图状态的 checkpoint。这些检查点被保存在一个 thread 中,可以在图执行后访问。由于 threads 允许在执行后访问图的状态,因此实现了诸如人工干预(human-in-the-loop)、记忆(memory)、时间旅行(time travel)和容错(fault-tolerance)等强大功能。下面我们将详细讨论这些概念。

Checkpoints

LangGraph API 自动处理检查点

在使用 LangGraph API 时,你无需手动实现或配置 checkpointer。API 会在后台为你处理所有的持久化基础架构。

Threads (线程)

Thread 是由 checkpointer 为每个保存的检查点分配的唯一 ID 或线程标识符。它包含了一系列运行(runs)的累积状态。当运行被执行时,助手的底层图的状态将被持久化到该线程。

当使用 checkpointer 调用图时,你**必须**在 configconfigurable 部分指定一个 thread_id

{"configurable": {"thread_id": "1"}}

可以检索线程的当前状态和历史状态。要持久化状态,必须在执行运行之前创建线程。LangGraph Platform API 提供了多个用于创建和管理线程及线程状态的端点。更多详情请参阅API 参考文档

Checkpoints (检查点)

线程在特定时间点的状态称为检查点。检查点是保存在每个超级步的图状态快照,并由 StateSnapshot 对象表示,该对象具有以下关键属性:

  • config: 与此检查点关联的配置。
  • metadata: 与此检查点关联的元数据。
  • values: 在此时间点的状态通道(channel)的值。
  • next: 一个元组,包含图中接下来要执行的节点名称。
  • tasks: 一个 PregelTask 对象元组,包含有关接下来要执行的任务的信息。如果该步骤之前已被尝试过,则会包含错误信息。如果图在节点内部动态中断,tasks 将包含与中断关联的附加数据。

检查点会被持久化,并可用于稍后恢复线程的状态。

让我们看看当我们按如下方式调用一个简单图时会保存哪些检查点:_content

API Reference: StateGraph | START | END | InMemorySaver

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

运行图后,我们期望看到正好 4 个检查点:

  • 一个空检查点,START 作为下一个要执行的节点。
  • 一个包含用户输入 {'foo': '', 'bar': []} 的检查点,node_a 作为下一个要执行的节点。
  • 一个包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,node_b 作为下一个要执行的节点。
  • 一个包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有下一个要执行的节点。

请注意,由于我们为 bar 通道设置了累加器(reducer),因此 bar 通道的值包含了两个节点的输出。

Get state (获取状态)

在与已保存的图状态交互时,你**必须**指定一个线程标识符。你可以通过调用 graph.get_state(config) 来查看图的*最新*状态。这将返回一个 StateSnapshot 对象,该对象对应于在配置中提供的线程 ID 的最新检查点,或者如果提供了检查点 ID,则对应于该线程的检查点。

# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

在我们的示例中,get_state 的输出将如下所示:

StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

Get state history (获取状态历史)

你可以通过调用 graph.get_state_history(config) 来获取给定线程的完整图执行历史。这将返回与配置中提供的线程 ID 相关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最近的检查点/StateSnapshot 是列表中的第一个。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

在我们的示例中,get_state_history 的输出将如下所示:

[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.19946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

State

Replay (重放)

也可以回放之前的图执行。如果我们使用 thread_idcheckpoint_id 来调用图,那么我们将*重放*在与 checkpoint_id 对应的检查点之前的先前执行步骤,并且只执行检查点之后的步骤。

  • thread_id 是线程的 ID。
  • checkpoint_id 是引用线程内特定检查点的标识符。

在调用图时,你必须将这些作为配置的 configurable 部分传递:

config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)

重要的是,LangGraph 会知道某个特定步骤是否先前已被执行。如果是,LangGraph 会简单地*重放*图中的该特定步骤,而不是重新执行它,但这仅适用于提供 checkpoint_id 之前的步骤。checkpoint_id 之后的所有步骤都将执行(即,一个新的分支),即使它们之前已被执行过。有关重放的更多信息,请参阅此操作指南

Replay

Update state (更新状态)

除了从特定 checkpoints 重放图之外,我们还可以*编辑*图状态。我们通过使用 graph.update_state() 来实现这一点。此方法接受三个不同的参数:

config

配置应包含 thread_id,指定要更新的线程。仅传递 thread_id 时,我们会更新(或分支)当前状态。可选地,如果包含 checkpoint_id 字段,则分支选定的检查点。

values

这些是要用于更新状态的值。请注意,此更新被视为与从节点发出的任何更新完全相同。这意味着这些值将被传递给累加器(reducer)函数(如果为图状态中的某些通道定义了它们)。这意味着 update_state **不会**自动覆盖每个通道的值,而只会覆盖没有累加器的通道。让我们通过一个例子来了解。

假设你已使用以下架构定义了图的状态(请参阅上面的完整示例):

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

现在假设图的当前状态是:

{"foo": 1, "bar": ["a"]}

如果你如下更新状态:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

那么新的图状态将是:

{"foo": 2, "bar": ["a", "b"]}

foo 键(通道)已完全更改(因为没有为该通道指定累加器,所以 update_state 覆盖了它)。但是,为 bar 键指定了累加器,因此它将 "b" 追加到 bar 的状态。

as_node

调用 update_state 时,我们可以选择性指定的最后一项是 as_node。如果提供了 as_node,则更新将被视为来自节点 as_node。如果未提供 as_node,则它将设置为更新状态的最后一个节点,除非存在歧义。之所以如此重要,是因为下一步的执行取决于最后更新状态的节点,因此这可以用来控制哪个节点先执行。有关分支状态的更多信息,请参阅此操作指南

Update

Memory Store (内存存储)

Model of shared state

State schema 指定了一组在图执行过程中填充的键。如上所述,状态可以由 checkpointer 在每个图步骤中保存到线程,从而实现状态持久化。

但是,如果我们希望保留一些信息*跨线程*共享呢?考虑一个聊天机器人,我们希望在与该用户的*所有*聊天对话(线程)中保留关于该用户的特定信息(例如,用户的偏好)!

仅使用 checkpointer,我们无法跨线程共享信息。这就需要 Store 接口。作为示例,我们可以定义一个 InMemoryStore 来存储跨线程的用户信息。我们像以前一样编译图,并使用 checkpointer 和我们新的 in_memory_store 变量。

LangGraph API 自动处理 Store

在使用 LangGraph API 时,你无需手动实现或配置 store。API 会在后台为你处理所有的存储基础架构。

Basic Usage (基本用法)

首先,我们将在不使用 LangGraph 的情况下单独展示这一点。

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

内存是按 tuple 命名空间的,在本例中是 (<user_id>, "memories")。名称空间可以是任意长度,代表任何内容,不一定特定于用户。

user_id = "1"
namespace_for_memory = (user_id, "memories")

我们使用 store.put 方法将内存保存到我们名称空间中的 Store 中。这样做时,我们指定名称空间(如上定义)以及内存的键值对:键只是内存的唯一标识符(memory_id),值(一个字典)是内存本身。

memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)

我们可以使用 store.search 方法来读取我们名称空间中的内存,该方法将返回给定用户的ョ所有内存作为列表。最近的内存是列表中的最后一个。

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

每种内存类型都是一个 Python 类(Item),具有某些属性。我们可以通过如上所述的 .dict 转换将其作为字典访问。 它具有的属性是:

  • value: 此内存的值(本身是一个字典)。
  • key: 此名称空间中此内存的唯一密钥。
  • namespace: 一个字符串列表,是此内存类型的名称空间。
  • created_at: 创建此内存的时间戳。
  • updated_at: 更新此内存的时间戳。

除了简单的检索之外,Store 还支持语义搜索,允许你根据含义而不是精确匹配来查找内存。要启用此功能,请使用嵌入模型配置 Store:

API Reference: init_embeddings

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)

现在搜索时,你可以使用自然语言查询来查找相关的内存:

# 查找与食物偏好相关的内存
# (这可以在将内存放入 Store 后完成)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # 返回前 3 个匹配项
)

你可以通过配置 fields 参数或在存储内存时指定 index 参数来控制要嵌入的内存部分:

# 使用特定字段进行嵌入存储
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # 仅嵌入 "food_preferences" 字段
)

# 不进行嵌入存储(仍可检索,但无法搜索)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

Using in LangGraph (在 LangGraph 中使用)

将所有这些都准备好后,我们在 LangGraph 中使用 in_memory_storein_memory_store 与 checkpointer 协同工作:checkpointer 将状态保存到线程(如上所述),而 in_memory_store 允许我们存储任意信息以供*跨*线程访问。我们像这样同时使用 checkpointer 和 in_memory_store 来编译图。

API Reference: InMemorySaver

from langgraph.checkpoint.memory import InMemorySaver

# 我们需要这个,因为我们要启用线程(对话)
checkpointer = InMemorySaver()

# ... 定义图结构 ...

# 使用 checkpointer 和 store 编译图
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

我们像以前一样,使用 thread_id 调用图,同时也使用 user_id,我们将如上所示使用 user_id 为我们的内存设置名称空间。

# 调用图
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# 首先,我们只是向 AI 打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)

我们可以通过将 store: BaseStoreconfig: RunnableConfig 作为节点参数来访问*任何节点*中的 in_memory_storeuser_id。以下是我们如何在节点中使用语义搜索来查找相关内存的方法:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # 从 config 中获取 user id
    user_id = config["configurable"]["user_id"]

    # 为内存设置名称空间
    namespace = (user_id, "memories")

    # ... 分析对话并创建新内存

    # 创建新的内存 ID
    memory_id = str(uuid.uuid4())

    # 我们创建一个新内存
    store.put(namespace, memory_id, {"memory": memory})

如上所示,我们还可以在任何节点访问 Store 并使用 store.search 方法来获取内存。请记住,内存以对象列表的形式返回,可以转换为字典。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

我们可以访问内存并在模型调用中使用它们。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # 从 config 中获取 user id
    user_id = config["configurable"]["user_id"]

    # 为内存设置名称空间
    namespace = (user_id, "memories")

    # 基于最近一条消息进行搜索
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... 在模型调用中使用内存

如果我们创建一个新线程,只要 user_id 相同,我们仍然可以访问相同的内存。

# 调用图
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 让我们再次打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)

当我们在 LangGraph Platform 上运行时,无论是本地(例如,在 LangGraph Studio 中)还是使用 LangGraph Platform,默认情况下基础 Store 都可用,无需在图编译时指定。但是,要启用语义搜索,你**确实**需要配置 langgraph.json 文件中的索引设置。例如:

{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}

有关更多详细信息和配置选项,请参阅部署指南

Checkpointer libraries (Checkpointer 库)

在底层,检查点由符合 BaseCheckpointSaver 接口的 checkpointer 对象提供支持。LangGraph 提供了多个 checkpointer 实现,所有这些实现都通过独立的可安装库实现:

  • langgraph-checkpoint:checkpointer saver(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)的基础接口。包括用于实验的内存 checkpointer 实现(InMemorySaver)。LangGraph 自带 langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库(SqliteSaver / AsyncSqliteSaver)的 LangGraph checkpointer 实现。非常适合实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:一个高级 checkpointer,使用 Postgres 数据库(PostgresSaver / AsyncPostgresSaver),用于 LangGraph Platform。非常适合在生产环境中使用。需要单独安装。

Checkpointer interface (Checkpointer 接口)

每个 checkpointer 都符合 BaseCheckpointSaver 接口并实现以下方法:

  • .put - 存储具有其配置和元数据的检查点。
  • .put_writes- 存储与检查点关联的中间写入(即待处理写入)。
  • .get_tuple - 使用给定的配置(thread_idcheckpoint_id)获取检查点元组。这用于填充 graph.get_state() 中的 StateSnapshot
  • .list - 列出与给定配置和过滤条件匹配的检查点。这用于填充 graph.get_state_history() 中的状态历史。

如果 checkpointer 与异步图执行一起使用(即通过 .ainvoke.astream.abatch 执行图),则将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。

Note

要异步运行图,可以使用 InMemorySaver,或 Sqlite/Postgres checkpointer 的异步版本——AsyncSqliteSaver / AsyncPostgresSaver checkpointers。

Serializer (序列化器)

当 checkpointer 保存图状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。 langgraph_checkpoint 定义了用于实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),该实现处理各种类型,包括 LangChain 和 LangGraph 的原始类型、日期时间、枚举等。

使用 pickle 进行序列化

默认序列化器 JsonPlusSerializer 在后台使用 ormsgpack 和 JSON,这不适用于所有类型的对象。

如果你希望为我们 msgpack 编码尚不支持的对象(例如 Pandas DataFrames)回退到 pickle, 你可以使用 JsonPlusSerializerpickle_fallback 参数:

API Reference: MemorySaver | JsonPlusSerializer

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... 定义图结构 ...
graph.compile(
    checkpointer=MemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

Encryption (加密)

Checkpointers 可以选择性地加密所有持久化的状态。要启用此功能,请将 EncryptedSerializer 的实例传递给任何 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器的最简单方法是通过 from_pycryptodome_aes,它从 LANGGRAPH_AES_KEY环境变量读取 AES 密钥(或接受一个 key 参数):

API Reference: SqliteSaver

import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # 读取 LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)

API Reference: PostgresSaver

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()

在 LangGraph Platform 上运行时,只要 LANGGRAPH_AES_KEY 存在,加密就会自动启用,因此你只需提供环境变量。通过实现 CipherProtocol 并将其提供给 EncryptedSerializer,可以使用其他加密方案。

Capabilities (能力)

Human-in-the-loop (人工干预)

首先,checkpointers 通过允许人工检查、中断和批准图步骤来促进人工干预工作流工作流。这些工作流需要 checkpointers,因为人工需要能够随时查看图的状态,并且在人工对状态进行任何更新后图必须能够恢复执行。有关示例,请参阅操作指南

Memory (记忆)

其次,checkpointers 允许在交互之间进行"记忆"。在进行重复的人工交互(例如对话)时,任何后续消息都可以发送到该线程,该线程将保留其先前交互的记忆。有关使用 checkpointers 添加和管理对话记忆的信息,请参阅添加记忆

Time Travel (时间旅行)

第三,checkpointers 允许"时间旅行",使用户能够回放先前的图执行以审查和/或调试特定的图步骤。此外,checkpointers 可以将图状态在任意检查点分支,以探索备选轨迹。

Fault-tolerance (容错)

最后,检查点还提供容错和错误恢复:如果一个或多个节点在一个给定的超级步失败,你可以从上次成功的步骤恢复你的图。此外,当图节点在给定超级步的执行过程中失败时,LangGraph 会保存该超级步中其他成功完成的节点产生的待处理检查点写入,以便每当我们从该超级步恢复图执行时,都不会重新运行成功的节点。

Pending writes (待处理写入)

此外,当图节点在给定超级步的执行过程中失败时,LangGraph 会保存该超级步中其他成功完成的节点产生的待处理检查点写入,以便每当我们从该超级步恢复图执行时,都不会重新运行成功的节点。