LangGraph 第11章:Human-in-the-Loop 人机协作

在某些关键场景中,我们希望人类能够参与到 Agent 的执行过程中,进行审批、确认或提供额外信息。LangGraph 提供了 interrupt 机制来实现人机协作。

Interrupt 机制

interrupt 函数可以在节点执行过程中暂停流程,等待人工输入:

from langgraph.types import interrupt
from typing import TypedDict

class ApprovalState(TypedDict):
    request: str
    approved: bool
    feedback: str

def human_approval_node(state: ApprovalState):
    """暂停执行,等待人工审批"""
    # interrupt 会暂停执行并返回需要人工处理的信息
    result = interrupt({
        "question": "请审批以下请求:",
        "request": state["request"],
        "options": ["approve", "reject"]
    })

    # 人工输入后继续执行
    if result.get("action") == "approve":
        return {"approved": True, "feedback": result.get("comment", "")}
    else:
        return {"approved": False, "feedback": result.get("comment", "")}

interrupt 的工作原理

# interrupt() 调用时会:
# 1. 暂停图的执行
# 2. 保存当前状态到 checkpoint
# 3. 返回中断信息给调用方
# 4. 等待 resume() 调用
# 5. 恢复执行并返回人工输入的数据

审批工作流

执行审批流程

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

# 1. 定义状态
class ApprovalWorkflow(TypedDict):
    request_id: str
    requester: str
    amount: float
    reason: str
    status: str  # pending / approved / rejected
    reviewer_comment: str

# 2. 定义节点
def validate_request(state: ApprovalWorkflow):
    """自动验证请求"""
    if state["amount"] <= 0:
        return {"status": "rejected", "reviewer_comment": "金额无效"}
    if state["amount"] <= 1000:
        # 小额自动通过
        return {"status": "approved", "reviewer_comment": "小额自动审批通过"}
    # 大额需要人工审批
    return {"status": "pending"}

def human_review(state: ApprovalWorkflow):
    """人工审批节点"""
    review_result = interrupt({
        "type": "approval",
        "request_id": state["request_id"],
        "requester": state["requester"],
        "amount": state["amount"],
        "reason": state["reason"]
    })

    if review_result.get("approved"):
        return {
            "status": "approved",
            "reviewer_comment": review_result.get("comment", "审批通过")
        }
    else:
        return {
            "status": "rejected",
            "reviewer_comment": review_result.get("comment", "未说明原因")
        }

def notify_result(state: ApprovalWorkflow):
    """通知审批结果"""
    print(f"请求 {state['request_id']}: {state['status']}")
    print(f"审批意见: {state['reviewer_comment']}")
    return {}

# 3. 路由函数
def approval_router(state: ApprovalWorkflow):
    """根据验证结果路由"""
    if state["status"] == "pending":
        return "human_review"
    else:
        return "notify"

# 4. 构建图
builder = StateGraph(ApprovalWorkflow)
builder.add_node("validate", validate_request)
builder.add_node("human_review", human_review)
builder.add_node("notify", notify_result)

builder.add_edge(START, "validate")
builder.add_conditional_edges("validate", approval_router)
builder.add_edge("human_review", "notify")
builder.add_edge("notify", END)

# 5. 使用 MemorySaver 支持中断
app = builder.compile(checkpointer=MemorySaver())

执行与恢复

import uuid

# 开始一个审批流程
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# 第一次执行(会在 human_review 处暂停)
initial_state = {
    "request_id": "REQ-001",
    "requester": "张三",
    "amount": 5000,
    "reason": "购买开发工具",
    "status": "",
    "reviewer_comment": ""
}

try:
    result = app.invoke(initial_state, config)
except Exception:
    pass  # interrupt 会引发异常暂停

# 检查当前状态
current_state = app.get_state(config)
print(f"当前状态: {current_state.values['status']}")
# 输出: pending

# 查看中断信息
for task in current_state.tasks:
    if task.interrupts:
        for interrupt_info in task.interrupts:
            print(f"等待审批: {interrupt_info.value}")
            # 输出: {'type': 'approval', 'request_id': 'REQ-001', ...}

# 人工审批后恢复执行
result = app.invoke(
    Command(resume={"approved": True, "comment": "同意购买"}),
    config
)

print(f"最终状态: {result['status']}")
# 输出: approved

断点(Breakpoint)

除了在代码中使用 interrupt,还可以在编译时设置断点:

# 方式一:编译时设置断点
app = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["human_review"],  # 在进入 human_review 前暂停
    # interrupt_after=["validate"]      # 或在 validate 执行后暂停
)

# 执行会在进入 human_review 前自动暂停
result = app.invoke(initial_state, config)

# 恢复执行
result = app.invoke(
    Command(resume={"approved": True}),
    config
)

断点类型对比

类型设置方式行为
代码断点interrupt() 函数在节点内部暂停,可携带自定义信息
编译断点interrupt_before进入指定节点前暂停
编译断点interrupt_after指定节点执行后暂停

恢复执行

恢复执行的几种方式:

# 1. 使用 Command.resume() 提供输入
app.invoke(Command(resume=user_input), config)

# 2. 使用 None 跳过(拒绝操作)
app.invoke(Command(resume=None), config)

# 3. 使用 Command.goto() 跳转到指定节点
app.invoke(Command(goto="another_node"), config)

# 4. 同时提供恢复数据和流程控制
app.invoke(
    Command(resume={"action": "approve"}, goto="notify"),
    config
)

完整示例:内容审核系统

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
from typing import TypedDict

class ContentReview(TypedDict):
    content: str
    author: str
    auto_score: float      # 自动审核分数
    flagged: bool          # 是否需要人工审核
    status: str            # pending / approved / rejected
    reviewer_feedback: str

def auto_review(state: ContentReview):
    """自动审核:检查内容质量"""
    content = state["content"]

    # 简单规则检查
    issues = 0
    if len(content) < 10:
        issues += 1
    if "http://" in content or "https://" in content:
        issues += 1

    # 计算分数
    score = max(0, 10 - issues * 3)
    flagged = issues > 0

    return {
        "auto_score": score,
        "flagged": flagged,
        "status": "pending" if flagged else "approved"
    }

def human_review_node(state: ContentReview):
    """人工审核节点"""
    review_data = interrupt({
        "content": state["content"],
        "author": state["author"],
        "auto_score": state["auto_score"],
        "flagged_reasons": ["内容过短" if len(state["content"]) < 10 else "",
                           "包含链接" if "http" in state["content"] else ""]
    })

    decision = review_data.get("decision", "reject")
    feedback = review_data.get("feedback", "")

    if decision == "approve":
        return {"status": "approved", "reviewer_feedback": feedback}
    else:
        return {"status": "rejected", "reviewer_feedback": feedback}

def publish_content(state: ContentReview):
    """发布内容"""
    print(f"内容已发布: {state['content'][:50]}...")
    return {}

def reject_content(state: ContentReview):
    """拒绝内容"""
    print(f"内容被拒绝: {state['reviewer_feedback']}")
    return {}

def review_router(state: ContentReview):
    """审核结果路由"""
    if state["status"] == "approved":
        return "publish"
    else:
        return "reject"

# 构建图
builder = StateGraph(ContentReview)
builder.add_node("auto_review", auto_review)
builder.add_node("human_review", human_review_node)
builder.add_node("publish", publish_content)
builder.add_node("reject", reject_content)

builder.add_edge(START, "auto_review")
builder.add_conditional_edges(
    "auto_review",
    lambda s: "human_review" if s["flagged"] else "publish"
)
builder.add_edge("human_review", "publish")
builder.add_edge("publish", END)
builder.add_edge("reject", END)

# 编译,在 human_review 前设置断点
app = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["human_review"]
)

最佳实践

  1. 明确告知用户:中断信息应该清晰说明需要人类做什么
  2. 提供选项:给用户明确的选择,而不是开放式的输入
  3. 设置超时:长时间未恢复执行应有超时处理机制
  4. 记录审计日志:所有人工操作都应该被记录
  5. 合理使用断点:只在确实需要人类判断的地方设置断点

Human-in-the-Loop 让 LangGraph 应用既保留了 AI 的高效,又确保了关键节点的安全可控。下一章将介绍持久化内存机制,实现会话的保存与恢复。