LangGraph 第12章:持久化内存
持久化是构建生产级 LLM 应用的基础能力。LangGraph 通过 Checkpointer 机制实现状态的保存、恢复和历史回溯。
Checkpointer 接口
Checkpointer 是 LangGraph 的状态持久化抽象,核心接口如下:
from langgraph.checkpoint.base import BaseCheckpointSaver
# Checkpointer 的核心功能:
# 1. put(config, checkpoint, metadata): 保存状态快照
# 2. get(config): 获取指定配置的最新状态
# 3. list(config): 列出所有历史状态
# 4. put_writes(config, writes): 保存中间写入
LangGraph 提供了多种 Checkpointer 实现:
| 实现 | 存储位置 | 适用场景 |
|---|---|---|
MemorySaver | 内存 | 开发测试、演示 |
SqliteSaver | SQLite 文件 | 单机生产环境 |
AsyncSqliteSaver | SQLite(异步) | 异步应用 |
PostgresSaver | PostgreSQL | 分布式生产环境 |
MongoDBSaver | MongoDB | 文档数据库场景 |
MemorySaver:内存级检查点
MemorySaver 将状态保存在内存中,应用重启后丢失。适合开发和测试:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
# 创建带持久化的应用
memory = MemorySaver()
app = builder.compile(checkpointer=memory)
# 通过 thread_id 隔离不同会话
config_1 = {"configurable": {"thread_id": "session_1"}}
config_2 = {"configurable": {"thread_id": "session_2"}}
# 两个会话互相独立
app.invoke({"messages": [("human", "你好")]}, config_1)
app.invoke({"messages": [("human", "Hello")]}, config_2)
SQLite 持久化
SqliteSaver(同步)
from langgraph.checkpoint.sqlite import SqliteSaver
# 使用内存 SQLite(临时存储)
checkpointer = SqliteSaver.from_conn_string(":memory:")
# 使用文件 SQLite(持久存储)
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# 在编译时传入
app = builder.compile(checkpointer=checkpointer)
AsyncSqliteSaver(异步)
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
import asyncio
# 异步 SQLite 持久化
async def main():
checkpointer = AsyncSqliteSaver.from_conn_string("async_checkpoints.db")
app = builder.compile(checkpointer=checkpointer)
result = await app.ainvoke(
{"messages": [("human", "你好")]},
{"configurable": {"thread_id": "async_session"}}
)
asyncio.run(main())
查看历史会话
Checkpointer 提供了完整的会话历史查询能力:
# 获取当前状态
current = app.get_state(config)
print(f"当前状态: {current.values}")
# 获取所有历史状态
history = list(app.get_state_history(config))
print(f"共有 {len(history)} 个历史状态")
# 遍历历史(从新到旧)
for i, state in enumerate(history):
print(f"[{i}] 状态: {state.values}")
print(f" 下一个节点: {state.next}")
# 恢复到指定状态
parent_config = history[-1].config # 恢复到初始状态
app.invoke(
{"messages": [("human", "从历史状态继续")]},
parent_config
)
跨会话的状态管理
# 列出所有会话
all_configs = []
for state in app.get_state_history(config):
if state.config["configurable"]["thread_id"] not in all_configs:
all_configs.append(state.config["configurable"]["thread_id"])
print(f"共有 {len(all_configs)} 个不同的会话")
会话恢复
恢复已有会话
# 假设 thread_id 为 "user_123" 的会话已存在
config = {"configurable": {"thread_id": "user_123"}}
# 检查会话是否存在
try:
state = app.get_state(config)
print(f"会话存在,当前状态: {state.values}")
except ValueError:
print("会话不存在")
state = None
# 继续已有会话
if state:
result = app.invoke(
{"messages": [("human", "继续之前的对话")]},
config
)
分支执行(时间旅行)
Checkpointer 支持从任意历史节点分支执行:
# 1. 获取历史状态列表
history = list(app.get_state_history(config))
# 2. 选择想回溯的状态(比如倒数第3个状态)
target_state = history[2]
# 3. 从该状态分支执行
branch_config = {
"configurable": {
"thread_id": "branch_session",
"checkpoint_id": target_state.config["configurable"]["checkpoint_id"]
}
}
# 4. 从这个历史节点继续执行(创造新的分支)
result = app.invoke(
{"messages": [("human", "在这个时间点做不同的选择")]},
branch_config
)
时间线:
checkpoint_1 --> checkpoint_2 --> checkpoint_3 (当前)
|
+--> 分支_1 (不同选择)
通过回溯到 checkpoint_2 可以创建新分支:
checkpoint_1 --> checkpoint_2 --> 分支_2 (另一条路径)
完整示例:带持久化的客服系统
from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# 1. 定义工具
@tool
def check_order_status(order_id: str) -> str:
"""查询订单状态"""
return f"订单 {order_id} 状态: 已发货, 预计明天到达"
@tool
def get_return_policy() -> str:
"""获取退货政策"""
return "30天内无理由退货,请保留原包装"
tools = [check_order_status, get_return_policy]
# 2. 创建带持久化的 ReAct Agent
llm = ChatOpenAI(model="gpt-4")
checkpointer = SqliteSaver.from_conn_string("customer_service.db")
agent = create_react_agent(
llm=llm,
tools=tools,
state_modifier="你是一个客服助手,可以查询订单和退货政策。",
checkpointer=checkpointer
)
# 3. 模拟多个用户的对话
def customer_conversation(user_id: str, messages: list):
"""模拟一个用户的对话流程"""
config = {"configurable": {"thread_id": user_id}}
for msg in messages:
result = agent.invoke({"messages": [("human", msg)]}, config)
print(f"[{user_id}] 用户: {msg}")
print(f"[{user_id]} 客服: {result['messages'][-1].content}\n")
# 用户 A 的对话
customer_conversation("alice", [
"我的订单 12345 到哪里了?",
"如果我不满意可以退货吗?"
])
# 用户 B 的对话
customer_conversation("bob", [
"你们的退货政策是什么?"
])
# 4. 查看所有历史会话
print("=== 会话历史 ===")
for config in [
{"configurable": {"thread_id": "alice"}},
{"configurable": {"thread_id": "bob"}}
]:
history = list(agent.get_state_history(config))
print(f"用户 {config['configurable']['thread_id']}: {len(history)} 次状态变更")
for i, state in enumerate(history):
last_msg = state.values["messages"][-1] if state.values["messages"] else ""
print(f" 步骤 {i}: {str(last_msg.content)[:50]}...")
Checkpointer 选择指南
| 选择 | 场景 | 优势 | 劣势 |
|---|---|---|---|
| MemorySaver | 开发、测试、演示 | 无需配置、零依赖 | 重启丢失、内存占用 |
| SqliteSaver | 单机生产、小型应用 | 持久化、文件可迁移 | 并发性能有限 |
| AsyncSqliteSaver | 异步应用 | 非阻塞 IO | 需要 async 运行时 |
| PostgresSaver | 分布式、高并发 | 强一致性、高可用 | 需要部署 PostgreSQL |
| MongoDBSaver | 文档数据场景 | 灵活的模式 | 需要部署 MongoDB |
最佳实践
- 选择合适的 Checkpointer:开发用 MemorySaver,生产用 SqliteSaver 或 PostgresSaver
- 合理使用 thread_id:每个独立对话使用不同的 thread_id
- 定期清理历史:长时间运行的应用需要清理旧的历史状态
- 敏感数据脱敏:存储在 checkpoint 中的数据应进行脱敏处理
- 监控大小:定期检查 checkpoint 存储大小,避免无限增长
持久化是 LangGraph 应用从原型走向生产的关键能力。通过 Checkpointer,我们可以实现会话的持久保存、历史回溯和时间旅行调试,为用户提供更加连贯和可靠的体验。