LangGraph 第9章:构建对话机器人

对话机器人是 LangGraph 最常见的应用场景之一。本章介绍如何构建一个支持多轮对话、上下文管理和流式输出的完整对话机器人。

基本对话机器人

使用 MessagesState

LangGraph 提供了内置的 MessagesState,专门用于对话场景:

from typing import TypedDict
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

# MessagesState 自动包含:
# messages: Annotated[List[BaseMessage], add_messages]

class ChatbotState(MessagesState):
    """可以扩展额外的对话字段"""
    session_id: str
    user_name: str
    metadata: dict

# 初始化 LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

def chat_node(state: ChatbotState):
    """对话节点:接收消息并生成回复"""
    # state["messages"] 自动包含所有历史消息
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
builder = StateGraph(ChatbotState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

app = builder.compile()

执行对话

# 第一轮对话
result = app.invoke({
    "messages": [HumanMessage(content="你好!")],
    "session_id": "session_001",
    "user_name": "Alice",
    "metadata": {}
})

print(result["messages"][-1].content)
# "你好!Alice,有什么我可以帮助你的?"

# 第二轮对话(自动使用历史消息)
result = app.invoke({
    "messages": [HumanMessage(content="我刚才说了什么?")],
    "session_id": "session_001",
    "user_name": "Alice",
    "metadata": {}
})
# LLM 会看到包含第一轮对话的完整消息列表

多轮对话实现

要实现真正的多轮对话,需要将历史消息持久化并传入:

from langgraph.checkpoint.memory import MemorySaver

# 使用内存检查点保存对话历史
memory = MemorySaver()
app = builder.compile(checkpointer=memory)

# 通过 thread_id 标识会话
config = {"configurable": {"thread_id": "user_alice"}}

# 第一轮
result = app.invoke(
    {"messages": [HumanMessage(content="我的名字是 Alice")]},
    config
)

# 第二轮 - 自动恢复历史
result = app.invoke(
    {"messages": [HumanMessage(content="我叫什么名字?")]},
    config
)
print(result["messages"][-1].content)
# "你的名字是 Alice!"

系统提示词管理

在对话中融入系统提示词,控制 AI 的行为:

from langchain_core.messages import SystemMessage

# 方式一:在节点中注入系统消息
def chat_with_system(state: ChatbotState):
    system_msg = SystemMessage(content="你是一个友好的客服助手。请用中文回复,保持礼貌和耐心。")
    # 在每个对话轮次前插入系统消息
    messages = [system_msg] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# 方式二:在状态中存储系统提示词
class ConfigurableChatState(MessagesState):
    system_prompt: str

def configurable_chat(state: ConfigurableChatState):
    system_msg = SystemMessage(content=state.get("system_prompt", "你是默认助手"))
    messages = [system_msg] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

上下文窗口管理

LLM 有上下文窗口限制,需要管理消息列表的长度:

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, trim_messages

def chat_with_window(state: ChatbotState):
    """带上下文窗口管理的对话节点"""
    max_tokens = 4000  # 保留最近的 4000 token

    # 修剪消息列表到指定长度
    trimmed_messages = trim_messages(
        state["messages"],
        token_counter=llm.get_num_tokens,
        max_tokens=max_tokens,
        strategy="last",       # 保留最新的消息
        start_on="human",      # 以人类消息开头
        include_system=True,   # 保留系统消息
    )

    response = llm.invoke(trimmed_messages)
    return {"messages": [response]}

会话状态持久化

使用 MemorySaver(内存级别)

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = builder.compile(checkpointer=memory)

# 每个会话通过 thread_id 隔离
config_alice = {"configurable": {"thread_id": "alice"}}
config_bob = {"configurable": {"thread_id": "bob"}}

# Alice 和 Bob 的对话互不干扰
app.invoke({"messages": [HumanMessage(content="你好")]}, config_alice)
app.invoke({"messages": [HumanMessage(content="嗨")]}, config_bob)

查看历史状态

# 获取指定会话的所有历史状态
history = list(app.get_state_history(config_alice))
for state in history:
    print(state.values["messages"])

流式输出

实时显示 LLM 的生成内容:

# 方式一:流式输出消息
for event in app.stream(
    {"messages": [HumanMessage(content="写一首诗")]},
    config
):
    for node_name, output in event.items():
        if "messages" in output:
            print(output["messages"][-1].content, end="", flush=True)

# 方式二:流式输出 token
async for event in app.astream_events(
    {"messages": [HumanMessage(content="写一首诗")]},
    config,
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)

完整示例:客服机器人

from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.tools import tool

# 定义工具
@tool
def search_knowledge_base(query: str) -> str:
    """搜索知识库"""
    return f"关于'{query}'的知识库结果..."

@tool
def create_ticket(issue: str) -> str:
    """创建工单"""
    return f"工单已创建: {issue}"

# 初始化带工具的 LLM
llm = ChatOpenAI(model="gpt-4")
tools = [search_knowledge_base, create_ticket]
llm_with_tools = llm.bind_tools(tools)

class SupportState(MessagesState):
    ticket_id: str

def support_node(state: SupportState):
    """客服对话节点"""
    system_prompt = SystemMessage(content="你是客服助手。先用知识库回答问题,解决不了再创建工单。")
    messages = [system_prompt] + state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

# 构建
builder = StateGraph(SupportState)
builder.add_node("support", support_node)
builder.add_edge(START, "support")
builder.add_edge("support", END)

app = builder.compile(checkpointer=MemorySaver())

# 测试
config = {"configurable": {"thread_id": "customer_1"}}
response = app.invoke(
    {"messages": [HumanMessage(content="我无法登录账号")]},
    config
)
print(response["messages"][-1].content)

对话机器人架构对比

架构复杂度能力适用场景
单节点 + Memory多轮对话、基本上下文客服问答
节点 + 工具调用知识检索、工单创建智能客服
多节点 + 条件路由意图分类、多流程复杂对话系统
Agent 循环自主推理、多工具调用通用 AI 助手

本章介绍了对话机器人的基础实现。下一章将深入 ReAct Agent 模式,实现更强大的工具调用能力。