LangGraph 第11章:Human-in-the-Loop 人机协作
在某些关键场景中,我们希望人类能够参与到 Agent 的执行过程中,进行审批、确认或提供额外信息。LangGraph 提供了 interrupt 机制来实现人机协作。
Interrupt 机制
interrupt 函数可以在节点执行过程中暂停流程,等待人工输入:
from langgraph.types import interrupt
from typing import TypedDict
class ApprovalState(TypedDict):
request: str
approved: bool
feedback: str
def human_approval_node(state: ApprovalState):
"""暂停执行,等待人工审批"""
# interrupt 会暂停执行并返回需要人工处理的信息
result = interrupt({
"question": "请审批以下请求:",
"request": state["request"],
"options": ["approve", "reject"]
})
# 人工输入后继续执行
if result.get("action") == "approve":
return {"approved": True, "feedback": result.get("comment", "")}
else:
return {"approved": False, "feedback": result.get("comment", "")}
interrupt 的工作原理
# interrupt() 调用时会:
# 1. 暂停图的执行
# 2. 保存当前状态到 checkpoint
# 3. 返回中断信息给调用方
# 4. 等待 resume() 调用
# 5. 恢复执行并返回人工输入的数据
审批工作流
执行审批流程
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
# 1. 定义状态
class ApprovalWorkflow(TypedDict):
request_id: str
requester: str
amount: float
reason: str
status: str # pending / approved / rejected
reviewer_comment: str
# 2. 定义节点
def validate_request(state: ApprovalWorkflow):
"""自动验证请求"""
if state["amount"] <= 0:
return {"status": "rejected", "reviewer_comment": "金额无效"}
if state["amount"] <= 1000:
# 小额自动通过
return {"status": "approved", "reviewer_comment": "小额自动审批通过"}
# 大额需要人工审批
return {"status": "pending"}
def human_review(state: ApprovalWorkflow):
"""人工审批节点"""
review_result = interrupt({
"type": "approval",
"request_id": state["request_id"],
"requester": state["requester"],
"amount": state["amount"],
"reason": state["reason"]
})
if review_result.get("approved"):
return {
"status": "approved",
"reviewer_comment": review_result.get("comment", "审批通过")
}
else:
return {
"status": "rejected",
"reviewer_comment": review_result.get("comment", "未说明原因")
}
def notify_result(state: ApprovalWorkflow):
"""通知审批结果"""
print(f"请求 {state['request_id']}: {state['status']}")
print(f"审批意见: {state['reviewer_comment']}")
return {}
# 3. 路由函数
def approval_router(state: ApprovalWorkflow):
"""根据验证结果路由"""
if state["status"] == "pending":
return "human_review"
else:
return "notify"
# 4. 构建图
builder = StateGraph(ApprovalWorkflow)
builder.add_node("validate", validate_request)
builder.add_node("human_review", human_review)
builder.add_node("notify", notify_result)
builder.add_edge(START, "validate")
builder.add_conditional_edges("validate", approval_router)
builder.add_edge("human_review", "notify")
builder.add_edge("notify", END)
# 5. 使用 MemorySaver 支持中断
app = builder.compile(checkpointer=MemorySaver())
执行与恢复
import uuid
# 开始一个审批流程
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
# 第一次执行(会在 human_review 处暂停)
initial_state = {
"request_id": "REQ-001",
"requester": "张三",
"amount": 5000,
"reason": "购买开发工具",
"status": "",
"reviewer_comment": ""
}
try:
result = app.invoke(initial_state, config)
except Exception:
pass # interrupt 会引发异常暂停
# 检查当前状态
current_state = app.get_state(config)
print(f"当前状态: {current_state.values['status']}")
# 输出: pending
# 查看中断信息
for task in current_state.tasks:
if task.interrupts:
for interrupt_info in task.interrupts:
print(f"等待审批: {interrupt_info.value}")
# 输出: {'type': 'approval', 'request_id': 'REQ-001', ...}
# 人工审批后恢复执行
result = app.invoke(
Command(resume={"approved": True, "comment": "同意购买"}),
config
)
print(f"最终状态: {result['status']}")
# 输出: approved
断点(Breakpoint)
除了在代码中使用 interrupt,还可以在编译时设置断点:
# 方式一:编译时设置断点
app = builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["human_review"], # 在进入 human_review 前暂停
# interrupt_after=["validate"] # 或在 validate 执行后暂停
)
# 执行会在进入 human_review 前自动暂停
result = app.invoke(initial_state, config)
# 恢复执行
result = app.invoke(
Command(resume={"approved": True}),
config
)
断点类型对比
| 类型 | 设置方式 | 行为 |
|---|---|---|
| 代码断点 | interrupt() 函数 | 在节点内部暂停,可携带自定义信息 |
| 编译断点 | interrupt_before | 进入指定节点前暂停 |
| 编译断点 | interrupt_after | 指定节点执行后暂停 |
恢复执行
恢复执行的几种方式:
# 1. 使用 Command.resume() 提供输入
app.invoke(Command(resume=user_input), config)
# 2. 使用 None 跳过(拒绝操作)
app.invoke(Command(resume=None), config)
# 3. 使用 Command.goto() 跳转到指定节点
app.invoke(Command(goto="another_node"), config)
# 4. 同时提供恢复数据和流程控制
app.invoke(
Command(resume={"action": "approve"}, goto="notify"),
config
)
完整示例:内容审核系统
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
from typing import TypedDict
class ContentReview(TypedDict):
content: str
author: str
auto_score: float # 自动审核分数
flagged: bool # 是否需要人工审核
status: str # pending / approved / rejected
reviewer_feedback: str
def auto_review(state: ContentReview):
"""自动审核:检查内容质量"""
content = state["content"]
# 简单规则检查
issues = 0
if len(content) < 10:
issues += 1
if "http://" in content or "https://" in content:
issues += 1
# 计算分数
score = max(0, 10 - issues * 3)
flagged = issues > 0
return {
"auto_score": score,
"flagged": flagged,
"status": "pending" if flagged else "approved"
}
def human_review_node(state: ContentReview):
"""人工审核节点"""
review_data = interrupt({
"content": state["content"],
"author": state["author"],
"auto_score": state["auto_score"],
"flagged_reasons": ["内容过短" if len(state["content"]) < 10 else "",
"包含链接" if "http" in state["content"] else ""]
})
decision = review_data.get("decision", "reject")
feedback = review_data.get("feedback", "")
if decision == "approve":
return {"status": "approved", "reviewer_feedback": feedback}
else:
return {"status": "rejected", "reviewer_feedback": feedback}
def publish_content(state: ContentReview):
"""发布内容"""
print(f"内容已发布: {state['content'][:50]}...")
return {}
def reject_content(state: ContentReview):
"""拒绝内容"""
print(f"内容被拒绝: {state['reviewer_feedback']}")
return {}
def review_router(state: ContentReview):
"""审核结果路由"""
if state["status"] == "approved":
return "publish"
else:
return "reject"
# 构建图
builder = StateGraph(ContentReview)
builder.add_node("auto_review", auto_review)
builder.add_node("human_review", human_review_node)
builder.add_node("publish", publish_content)
builder.add_node("reject", reject_content)
builder.add_edge(START, "auto_review")
builder.add_conditional_edges(
"auto_review",
lambda s: "human_review" if s["flagged"] else "publish"
)
builder.add_edge("human_review", "publish")
builder.add_edge("publish", END)
builder.add_edge("reject", END)
# 编译,在 human_review 前设置断点
app = builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["human_review"]
)
最佳实践
- 明确告知用户:中断信息应该清晰说明需要人类做什么
- 提供选项:给用户明确的选择,而不是开放式的输入
- 设置超时:长时间未恢复执行应有超时处理机制
- 记录审计日志:所有人工操作都应该被记录
- 合理使用断点:只在确实需要人类判断的地方设置断点
Human-in-the-Loop 让 LangGraph 应用既保留了 AI 的高效,又确保了关键节点的安全可控。下一章将介绍持久化内存机制,实现会话的保存与恢复。