LangGraph 第14章:多 Agent 系统
为什么需要多 Agent 系统
在实际应用中,一个 Agent 往往难以胜任所有任务。比如构建一个文章写作助手:研究资料需要搜索能力,撰写文章需要写作能力,质量检查需要评审能力。如果用一个 Agent 包揽所有工作,提示词会变得极其臃肿,维护困难,且单个 LLM 调用很难在所有维度上都表现优秀。
多 Agent 系统的核心思路是 "分而治之":将复杂任务拆解为多个子任务,每个子任务由专门的 Agent 负责,通过协作机制将它们组合起来。LangGraph 提供了原生的多 Agent 支持,核心模式包括 Supervisor(主管调度)和 Subgraph(子图封装)。
Supervisor 模式:主管调度架构
Supervisor 模式是最常用的多 Agent 架构。它的工作方式如下:
- Supervisor Agent:一个"主管"LLM,负责分析任务、决定下一步调用哪个专家 Agent
- Worker Agents:多个"专家"Agent,各自擅长特定领域
- 循环执行:Supervisor 不断调度 Worker,直到任务完成
架构示意图
+-----------+
| Supervisor |
+-----+-----+
|
+-------------+-------------+
| | |
+------v------+ +---v----+ +------v------+
| Research | | Writer | | Reviewer |
| Agent | | Agent | | Agent |
+------+------+ +--------+ +------+------+
| |
+---------->终<------------+
判
代码实现
import operator
from typing import Annotated, TypedDict, List, Literal
from langgraph.graph import StateGraph, MessagesState
from langgraph.types import Command
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
# 定义状态
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
# 定义专家 Agent 节点
def research_agent(state: AgentState):
"""研究资料 Agent:负责收集和整理信息"""
# 实际项目中这里会调用搜索工具
return {
"messages": [("system", "研究完成,已收集相关资料。")],
"next_agent": "supervisor"
}
def writing_agent(state: AgentState):
"""写作 Agent:负责撰写文章"""
return {
"messages": [("system", "文章初稿完成。")],
"next_agent": "supervisor"
}
def review_agent(state: AgentState):
"""评审 Agent:负责质量检查"""
return {
"messages": [("system", "评审完成,文章质量合格。")],
"next_agent": "supervisor"
}
# Supervisor 节点:决定下一步调用哪个 Agent
def supervisor_node(state: AgentState):
llm = ChatOpenAI(model="gpt-4")
# 构建提示词,让 LLM 选择下一步
prompt = f"""你是主管 Agent,负责协调以下专家完成任务:
1. research_agent:负责搜索和收集资料
2. writing_agent:负责撰写文章内容
3. review_agent:负责评审和修改
当前状态:{state["messages"]}
请分析当前进度,选择下一步应该调用哪个 Agent。
如果任务已经完成,输出 "FINISH"。
"""
response = llm.invoke(prompt)
content = response.content.strip()
if "FINISH" in content:
return Command(goto="__end__")
elif "research" in content:
return Command(goto="research_agent")
elif "writing" in content:
return Command(goto="writing_agent")
elif "review" in content:
return Command(goto="review_agent")
else:
return Command(goto="__end__")
# 构建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("research_agent", research_agent)
workflow.add_node("writing_agent", writing_agent)
workflow.add_node("review_agent", review_agent)
# 设置入口点和边
workflow.set_entry_point("supervisor")
workflow.add_edge("research_agent", "supervisor")
workflow.add_edge("writing_agent", "supervisor")
workflow.add_edge("review_agent", "supervisor")
# 编译
app = workflow.compile(checkpointer=MemorySaver())
Subgraph:子图封装与复用
Subgraph 是将子工作流封装为可复用的组件。当某个 Agent 内部也需要复杂的工作流时,Subgraph 就派上用场了。
子图定义
from langgraph.graph import StateGraph
# 定义一个子图:资料检索 + 摘要
class ResearchSubState(TypedDict):
query: str
results: list
summary: str
def search_node(state: ResearchSubState):
# 模拟搜索
return {"results": ["结果1", "结果2", "结果3"]}
def summarize_node(state: ResearchSubState):
# 生成摘要
return {"summary": f"基于{len(state['results'])}条结果的摘要"}
# 构建子图
research_subgraph = StateGraph(ResearchSubState)
research_subgraph.add_node("search", search_node)
research_subgraph.add_node("summarize", summarize_node)
research_subgraph.set_entry_point("search")
research_subgraph.add_edge("search", "summarize")
research_subgraph.add_edge("summarize", "__end__")
# 编译子图
compiled_subgraph = research_subgraph.compile()
将子图接入主图
# 在主图中将子图作为一个节点添加
main_graph = StateGraph(AgentState)
main_graph.add_node("research", compiled_subgraph)
main_graph.add_node("writing", writing_agent)
main_graph.add_edge("research", "writing")
main_graph.add_edge("writing", "__end__")
main_graph.set_entry_point("research")
使用 Subgraph 的好处:
- 封装性:子图内部的状态和逻辑对外不可见,降低了复杂度
- 复用性:同一个子图可以在多个主图中重复使用
- 可测试性:每个子图可以独立测试和调试
- 清晰的结构:主图只关心子图的输入输出,不关心内部实现
Agent 间通信模式
多 Agent 系统中,通信方式直接影响系统的效率和可靠性。
消息传递模式
通过 state 中的 messages 列表传递信息。每个 Agent 读取前序 Agent 的输出,追加自己的结果:
# Agent 读取前序信息
previous_work = [m for m in state["messages"] if m.type == "system"]
latest = previous_work[-1] if previous_work else None
共享状态模式
所有 Agent 读写同一个 State 对象,适用于需要频繁共享中间结果的场景:
class SharedState(TypedDict):
input_text: str # 原始输入
research_data: dict # 研究资料
draft: str # 草稿
review_comments: List[str] # 评审意见
final_output: str # 最终输出
iteration_count: int # 迭代次数(防止死循环)
Command 指令模式
Supervisor 通过 Command(goto=...) 指令控制流程走向,这是 LangGraph 中 Agent 间通信的标准方式。
完整示例:文章写作多 Agent 协作
from typing import TypedDict, List, Annotated
import operator
from langgraph.graph import StateGraph
from langgraph.types import Command
from langchain_openai import ChatOpenAI
# 1. 定义状态
class WritingState(TypedDict):
topic: str
research: str
outline: str
draft: str
review: str
final: str
iteration: int
messages: Annotated[List[str], operator.add]
# 2. 定义 Agent 节点
def researcher(state: WritingState):
"""研究 Agent"""
topic = state["topic"]
# 实际项目中会调用搜索 API 或工具
research = f"关于{topic}的研究资料:收集了最新进展..."
return {"research": research, "messages": [f"研究完成:{topic}"]}
def outliner(state: WritingState):
"""大纲 Agent"""
research = state["research"]
outline = f"""
大纲:
1. 引言 - 背景介绍
2. 主体 - 核心内容展开
3. 案例分析
4. 总结与展望
"""
return {"outline": outline, "messages": ["大纲完成"]}
def writer(state: WritingState):
"""写作 Agent"""
outline = state["outline"]
research = state["research"]
draft = f"基于大纲和研究的全文撰写..."
return {"draft": draft, "messages": ["初稿完成"]}
def reviewer(state: WritingState):
"""评审 Agent"""
draft = state["draft"]
review = "评审意见:1. 结构可以更紧凑 2. 需要更多例证"
return {"review": review, "messages": ["评审完成"]}
def finalizer(state: WritingState):
"""终审 Agent - 决定是否需要迭代"""
iteration = state.get("iteration", 0)
review = state.get("review", "")
if iteration < 2 and "需要修改" in review:
# 需要改进:回到写作节点
return Command(
goto="writer",
update={"iteration": iteration + 1, "messages": ["进入下一轮迭代"]}
)
else:
return {
"final": "最终定稿版本...",
"messages": ["任务完成"]
}
# 3. 构建图
builder = StateGraph(WritingState)
builder.add_node("researcher", researcher)
builder.add_node("outliner", outliner)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)
builder.add_node("finalizer", finalizer)
# 4. 连接
builder.set_entry_point("researcher")
builder.add_edge("researcher", "outliner")
builder.add_edge("outliner", "writer")
builder.add_edge("writer", "reviewer")
builder.add_edge("reviewer", "finalizer")
# finalizer 根据条件决定下一步
# 5. 编译运行
graph = builder.compile()
result = graph.invoke({
"topic": "AI Agent 的发展趋势",
"iteration": 0
})
执行流程可视化
以上示例的执行流程如下:
用户输入主题
|
v
[Researcher] --> [Outliner] --> [Writer] --> [Reviewer] --> [Finalizer]
| | | | |
| | | | +-----+-----+
| | | | | |
| | | | 需要修改 任务完成
| | | | | |
| | +<------------+--------+ |
| | | (迭代重写) |
| | | v
| | | [END]
多 Agent 设计原则
- 职责单一:每个 Agent 只做一件事,做到极致。不要把搜索和写作放在同一个 Agent 中。
- 输入输出明确:每个 Agent 的输入和输出应该有清晰的接口定义,便于独立测试。
- 防死循环:始终设置迭代次数上限(如
iteration_count),避免 Agent 陷入无限循环。 - 错误隔离:一个 Agent 的失败不应该导致整个系统崩溃。使用 try-except 包裹每个 Agent 的执行逻辑。
- 日志完善:每个 Agent 执行时记录关键信息,便于问题排查。
多 Agent 系统是 LangGraph 的核心优势之一。通过合理地拆分职责、设计通信机制,可以构建出强大而稳定的 AI 工作流系统。下一章我们将介绍 LangGraph Studio 的可视化调试工具,帮助你更直观地理解和调试这些复杂的工作流。