LangGraph 第12章:持久化内存

持久化是构建生产级 LLM 应用的基础能力。LangGraph 通过 Checkpointer 机制实现状态的保存、恢复和历史回溯。

Checkpointer 接口

Checkpointer 是 LangGraph 的状态持久化抽象,核心接口如下:

from langgraph.checkpoint.base import BaseCheckpointSaver

# Checkpointer 的核心功能:
# 1. put(config, checkpoint, metadata): 保存状态快照
# 2. get(config): 获取指定配置的最新状态
# 3. list(config): 列出所有历史状态
# 4. put_writes(config, writes): 保存中间写入

LangGraph 提供了多种 Checkpointer 实现:

实现存储位置适用场景
MemorySaver内存开发测试、演示
SqliteSaverSQLite 文件单机生产环境
AsyncSqliteSaverSQLite(异步)异步应用
PostgresSaverPostgreSQL分布式生产环境
MongoDBSaverMongoDB文档数据库场景

MemorySaver:内存级检查点

MemorySaver 将状态保存在内存中,应用重启后丢失。适合开发和测试:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI

# 创建带持久化的应用
memory = MemorySaver()
app = builder.compile(checkpointer=memory)

# 通过 thread_id 隔离不同会话
config_1 = {"configurable": {"thread_id": "session_1"}}
config_2 = {"configurable": {"thread_id": "session_2"}}

# 两个会话互相独立
app.invoke({"messages": [("human", "你好")]}, config_1)
app.invoke({"messages": [("human", "Hello")]}, config_2)

SQLite 持久化

SqliteSaver(同步)

from langgraph.checkpoint.sqlite import SqliteSaver

# 使用内存 SQLite(临时存储)
checkpointer = SqliteSaver.from_conn_string(":memory:")

# 使用文件 SQLite(持久存储)
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

# 在编译时传入
app = builder.compile(checkpointer=checkpointer)

AsyncSqliteSaver(异步)

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
import asyncio

# 异步 SQLite 持久化
async def main():
    checkpointer = AsyncSqliteSaver.from_conn_string("async_checkpoints.db")
    app = builder.compile(checkpointer=checkpointer)

    result = await app.ainvoke(
        {"messages": [("human", "你好")]},
        {"configurable": {"thread_id": "async_session"}}
    )

asyncio.run(main())

查看历史会话

Checkpointer 提供了完整的会话历史查询能力:

# 获取当前状态
current = app.get_state(config)
print(f"当前状态: {current.values}")

# 获取所有历史状态
history = list(app.get_state_history(config))
print(f"共有 {len(history)} 个历史状态")

# 遍历历史(从新到旧)
for i, state in enumerate(history):
    print(f"[{i}] 状态: {state.values}")
    print(f"    下一个节点: {state.next}")

# 恢复到指定状态
parent_config = history[-1].config  # 恢复到初始状态
app.invoke(
    {"messages": [("human", "从历史状态继续")]},
    parent_config
)

跨会话的状态管理

# 列出所有会话
all_configs = []
for state in app.get_state_history(config):
    if state.config["configurable"]["thread_id"] not in all_configs:
        all_configs.append(state.config["configurable"]["thread_id"])

print(f"共有 {len(all_configs)} 个不同的会话")

会话恢复

恢复已有会话

# 假设 thread_id 为 "user_123" 的会话已存在
config = {"configurable": {"thread_id": "user_123"}}

# 检查会话是否存在
try:
    state = app.get_state(config)
    print(f"会话存在,当前状态: {state.values}")
except ValueError:
    print("会话不存在")
    state = None

# 继续已有会话
if state:
    result = app.invoke(
        {"messages": [("human", "继续之前的对话")]},
        config
    )

分支执行(时间旅行)

Checkpointer 支持从任意历史节点分支执行:

# 1. 获取历史状态列表
history = list(app.get_state_history(config))

# 2. 选择想回溯的状态(比如倒数第3个状态)
target_state = history[2]

# 3. 从该状态分支执行
branch_config = {
    "configurable": {
        "thread_id": "branch_session",
        "checkpoint_id": target_state.config["configurable"]["checkpoint_id"]
    }
}

# 4. 从这个历史节点继续执行(创造新的分支)
result = app.invoke(
    {"messages": [("human", "在这个时间点做不同的选择")]},
    branch_config
)
时间线:
         checkpoint_1 --> checkpoint_2 --> checkpoint_3 (当前)
                                              |
                                              +--> 分支_1 (不同选择)

通过回溯到 checkpoint_2 可以创建新分支:
         checkpoint_1 --> checkpoint_2 --> 分支_2 (另一条路径)

完整示例:带持久化的客服系统

from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# 1. 定义工具
@tool
def check_order_status(order_id: str) -> str:
    """查询订单状态"""
    return f"订单 {order_id} 状态: 已发货, 预计明天到达"

@tool
def get_return_policy() -> str:
    """获取退货政策"""
    return "30天内无理由退货,请保留原包装"

tools = [check_order_status, get_return_policy]

# 2. 创建带持久化的 ReAct Agent
llm = ChatOpenAI(model="gpt-4")
checkpointer = SqliteSaver.from_conn_string("customer_service.db")

agent = create_react_agent(
    llm=llm,
    tools=tools,
    state_modifier="你是一个客服助手,可以查询订单和退货政策。",
    checkpointer=checkpointer
)

# 3. 模拟多个用户的对话
def customer_conversation(user_id: str, messages: list):
    """模拟一个用户的对话流程"""
    config = {"configurable": {"thread_id": user_id}}
    for msg in messages:
        result = agent.invoke({"messages": [("human", msg)]}, config)
        print(f"[{user_id}] 用户: {msg}")
        print(f"[{user_id]} 客服: {result['messages'][-1].content}\n")

# 用户 A 的对话
customer_conversation("alice", [
    "我的订单 12345 到哪里了?",
    "如果我不满意可以退货吗?"
])

# 用户 B 的对话
customer_conversation("bob", [
    "你们的退货政策是什么?"
])

# 4. 查看所有历史会话
print("=== 会话历史 ===")
for config in [
    {"configurable": {"thread_id": "alice"}},
    {"configurable": {"thread_id": "bob"}}
]:
    history = list(agent.get_state_history(config))
    print(f"用户 {config['configurable']['thread_id']}: {len(history)} 次状态变更")
    for i, state in enumerate(history):
        last_msg = state.values["messages"][-1] if state.values["messages"] else ""
        print(f"  步骤 {i}: {str(last_msg.content)[:50]}...")

Checkpointer 选择指南

选择场景优势劣势
MemorySaver开发、测试、演示无需配置、零依赖重启丢失、内存占用
SqliteSaver单机生产、小型应用持久化、文件可迁移并发性能有限
AsyncSqliteSaver异步应用非阻塞 IO需要 async 运行时
PostgresSaver分布式、高并发强一致性、高可用需要部署 PostgreSQL
MongoDBSaver文档数据场景灵活的模式需要部署 MongoDB

最佳实践

  1. 选择合适的 Checkpointer:开发用 MemorySaver,生产用 SqliteSaver 或 PostgresSaver
  2. 合理使用 thread_id:每个独立对话使用不同的 thread_id
  3. 定期清理历史:长时间运行的应用需要清理旧的历史状态
  4. 敏感数据脱敏:存储在 checkpoint 中的数据应进行脱敏处理
  5. 监控大小:定期检查 checkpoint 存储大小,避免无限增长

持久化是 LangGraph 应用从原型走向生产的关键能力。通过 Checkpointer,我们可以实现会话的持久保存、历史回溯和时间旅行调试,为用户提供更加连贯和可靠的体验。