【LangGraph新手村系列】(2)自定义状态与归约器:让 LangGraph 记住更多东西

简介: 从 MessagesState 扩展到 TypedDict 自定义状态,用 Annotated 声明字段归约策略。messages 挂载 add_messages 实现追加合并,其他字段默认覆盖。节点函数可读取自定义字段,让 Agent 记住用户偏好与业务元数据。

第二章 自定义状态与归约器:让 LangGraph 记住更多东西

"MessagesState 只关心消息列表,但真实业务里 Agent 需要记住用户的偏好、身份、进度。自定义 State 让图里的'公共黑板'变得更大、更智能。"

一、问题

第一章里我们用 MessagesState 作为图的状态模式,它内置了一个 messages 字段和追加归约器,能很好地解决"对话历史怎么传递"的问题。但真实场景里,Agent 需要记住的东西远不止消息列表:

  • 用户偏好:有人喜欢摄氏度,有人喜欢华氏度。每次调用都重复说明很冗余,最好让 Agent"记在脑子里"。
  • 会话元数据:当前处于哪个步骤、已经尝试过哪些工具、用户的身份标识。
  • 业务字段:电商场景里的购物车内容、客服场景里的工单编号。

MessagesState 只有一个 messages 字段,放不下这些额外信息。如果你强行把偏好塞进 HumanMessage 的文本里,模型可能会把它当成普通对话内容处理,既不准确也不优雅。

另一个隐蔽的问题是:节点函数返回增量更新时,LangGraph 怎么合并新旧状态?

第一章里 call_model 返回 {"messages": [response]},看起来只返回了一条消息。LangGraph 之所以能自动追加而不是覆盖,是因为 MessagesState 内部预置了 add_messages 这个归约器。但如果你自定义了字段,这个合并逻辑就需要你自己声明——否则新值会直接覆盖旧值,可能不是你想要的效果。

本章的解决方案是:自定义 TypedDict 作为状态模式,用 Annotated 显式声明每个字段的归约策略

二、解决方案

我们要做三件事:

  1. 扩展状态模式:不再使用内置的 MessagesState,而是自己定义一个 TypedDict,加上 messagestemperature_unit 两个字段。
  2. 声明归约策略messages 字段需要追加合并,所以给它挂上 add_messages 归约器;temperature_unit 是字符串,新值直接覆盖旧值即可,不需要特殊归约器。
  3. 节点函数读取自定义字段:在 call_model 里读取 temperature_unit,根据用户偏好动态修改发给模型的消息。

状态流转示意图如下:

+--------+      +------------+      +------------------+
| START  | ---> |   agent    | ---> | should_continue  |
|        |      | (模型调用)  |      |   (条件判断)      |
+--------+      +------+-----+      +---------+--------+
                       ^                      |
                       |                      |
                       |        +-------------+-------------+
                       |        | 返回 "tools" |  返回 END    |
                       |        |              |              |
                       |        v              v              v
                       |   +--------+    +--------+
                       |   | tools  |    |  END   |
                       |   |(工具)  |    +--------+
                       |   +---+----+
                       |       |
                       +-------+
                       (回到 agent)

图中的"公共黑板" State 现在变成:
{
    "messages": [msg1, msg2, msg3, ...],      ← 追加归约
    "temperature_unit": "摄氏度" | "华氏度"    ← 覆盖归约
}

执行流程和第一章一样,仍然是 agent -> tools -> agent 的循环,但 agent 节点现在会先看一眼黑板上的 temperature_unit,再决定要不要在消息开头插入一句"请把温度转成华氏度"。

三、工作原理

1. 从 MessagesState 到自定义 State

MessagesState 本质上是 LangGraph 预定义的一个 TypedDict

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

它只定义了一个字段 messages,类型是 list[AnyMessage],归约器是 add_messages。当你写 StateGraph(MessagesState) 时,LangGraph 会把这个结构作为图的"状态合同":每个节点接收的 state 参数、返回的字典键,都必须和这个合同匹配。

一旦业务需要扩展字段,你就必须自己重新定义这个合同。

2. TypedDict:自定义状态模式

from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages

class State(TypedDict):
    """
    自定义状态:消息历史 + 温度单位偏好
    """
    messages: Annotated[list[BaseMessage], add_messages]
    temperature_unit: str

TypedDict 是 Python 标准库里的一个特殊字典类型。和普通 dict 不同,它允许你给键标注固定的类型,IDE 能据此提供自动补全和类型检查。

State 在这里的作用是状态契约

  • 它规定了图中流转的数据必须包含 messagestemperature_unit 两个键;
  • 节点函数 call_model(state: State) 的参数类型提示,让 IDE 知道 state 里能取出哪些字段;
  • StateGraph(State) 在编译时校验:节点返回的字典键必须是 messagestemperature_unit,返回其他键会报错。

注意 temperature_unit: str 没有写默认值。这意味着当你第一次调用 app.invoke() 时,要么在输入里显式传 "temperature_unit": "摄氏度",要么在节点函数里用 state.get('temperature_unit', '摄氏度') 做兜底。否则它的值会是 None

3. Annotatedadd_messages:声明归约器

messages: Annotated[list[BaseMessage], add_messages]

这行代码看起来复杂,其实是"一纸合同的两个条款":

部分 作用 给谁看
list[BaseMessage] 类型声明:这是一个消息列表 Python 类型检查器、IDE
add_messages 归约函数:新列表追加到旧列表末尾 LangGraph 运行时

Annotated 是 Python typing 模块提供的元数据包装器。它的语法是 Annotated[类型, 元数据1, 元数据2, ...]。类型检查器只关心第一个参数(类型),后面的元数据在运行时才可能被框架读取。

LangGraph 的设计是:在编译 StateGraph 时,检查每个字段的 Annotated 元数据。如果第二个参数是一个 callable,就把它注册为该字段的归约器。

运行时发生了什么?假设当前 messages 已有两条消息:

# 旧值
state["messages"] = [HumanMessage(content="你好"), AIMessage(content="...")]

# 节点返回的新值(增量更新)
return {
   "messages": [AIMessage(content="上海30度")]}

如果没有归约器,LangGraph 会用默认策略:直接替换。最终 messages 只剩 [AIMessage(content="上海30度")],历史全丢。

因为注册了 add_messages,LangGraph 会执行:

new_messages = add_messages(old_messages, returned_messages)
# 结果:[HumanMessage, AIMessage, AIMessage]

add_messages 的内部逻辑并不复杂:它把旧列表和新列表拼接起来,同时做去重(防止同一条消息被重复插入)。你不需要自己手写合并逻辑。

4. 默认归约器:覆盖行为

temperature_unit: str

这个字段没有Annotated 包装,因此 LangGraph 对它使用默认归约策略:新值直接覆盖旧值。

这正是我们想要的:

  • 第一次传入 "摄氏度"temperature_unit 变成 "摄氏度"
  • 第二次不传这个字段 → 旧值保留(检查点机制负责恢复);
  • 第三次传入 "华氏度" → 旧值被覆盖为 "华氏度"

如果你想改变默认行为,也可以给字符串字段显式声明归约器。例如用 operator.add 实现字符串拼接:

from operator import add

class State(TypedDict):
    notes: Annotated[str, add]  # 新字符串拼接到旧字符串后面

但在大多数业务场景里,覆盖行为是更合理的选择。

5. 节点函数读取自定义字段

from langchain_core.messages import SystemMessage

def call_model(state: State):
    messages = state['messages']
    unit = state.get('temperature_unit', '摄氏度')

    modified_messages = list(messages)
    if unit == '华氏度':
        modified_messages.insert(0, SystemMessage(content="请将温度转换为华氏度"))

    response = model.invoke(modified_messages)
    return {
   "messages": [response]}

和第一章的 call_model 相比,这里多了两步:

  1. 读取自定义字段state.get('temperature_unit', '摄氏度') 从状态黑板上取出用户的温度单位偏好。如果之前没有设置过,默认用摄氏度。
  2. 动态构造输入:如果用户偏好华氏度,就在消息列表最前面插入一条 SystemMessage,告诉模型"请把温度转成华氏度"。SystemMessage 是对话中的系统指令角色,模型会把它当作全局约束来遵循,而不会把它当成普通对话内容。

modified_messages = list(messages) 这一步很重要:因为 messages 来自状态,LangGraph 内部可能持有它的引用。直接 messages.insert(0, ...)原地修改原列表,可能导致不可预期的副作用。先 list() 复制一份,再修改副本,是安全的做法。

节点函数的返回值仍然是 {"messages": [response]}。注意这里没有返回 temperature_unit——因为本轮调用没有改变这个字段,LangGraph 会保持它的旧值(从检查点恢复或上一轮遗留)。只有当你显式返回 {"temperature_unit": "华氏度"} 时,它才会被更新。

6. 构建图:StateGraph(State)

workflow = StateGraph(State)

和第一章的 StateGraph(MessagesState) 相比,唯一的变化是传入了我们自定义的 State 类。这告诉 LangGraph:

  • 图中所有节点接收的 state 参数,必须符合 State 的类型定义;
  • 节点返回的字典键,必须是 messagestemperature_unit
  • messages 字段使用 add_messages 归约,temperature_unit 使用默认覆盖归约。

其余代码——注册节点、添加边、设置检查点——和第一章完全一致。

7. 检查点与自定义字段的持久化

checkpointer = InMemorySaver()
app = workflow.compile(checkpointer=checkpointer)

InMemorySaver 的工作机制和第一章相同,但它保存的内容变多了。每个检查点现在不仅包含 messages 列表,还包含 temperature_unit 的值。

检查点内部的数据结构类似这样:

{
   
    1: [  # thread_id = 1
        {
   
            "messages": [HumanMessage(...), AIMessage(...), ...],
            "temperature_unit": "摄氏度"
        },  # checkpoint_0(第一轮结束)
        {
   
            "messages": [..., HumanMessage("北京呢?"), AIMessage(...)],
            "temperature_unit": "摄氏度"
        },  # checkpoint_1(第二轮结束)
        {
   
            "messages": [..., HumanMessage("上海天气如何?"), AIMessage(...)],
            "temperature_unit": "华氏度"
        },  # checkpoint_2(第三轮结束)
    ]
}

关键观察:

  • 第二轮只传了 {"messages": [HumanMessage("北京呢?")]},没有传 temperature_unit。但 InMemorySaver 在运行前已经把 checkpoint_0 里的 "摄氏度" 恢复到状态中了,所以 call_model 读到的仍然是 "摄氏度"
  • 第三轮显式传了 {"temperature_unit": "华氏度"}。这个新值会和检查点恢复的旧值做归约——因为 temperature_unit 没有特殊归约器,默认是"新覆盖旧",所以最终变成 "华氏度",并被存入 checkpoint_2

8. 三次调用实验

我们用同一个 thread_id=1 连续调用三次,观察自定义字段的行为:

第一次调用

final_state = app.invoke(
    {
   
        "messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
        "temperature_unit": "摄氏度"
    },
    config={
   "configurable": {
   "thread_id": 1}}
)
  • InMemorySaver 查询 thread_id=1 → 无记录 → 从零开始。
  • 初始状态:messages=[上海天气请求], temperature_unit=摄氏度
  • 运行:agent 调用模型,模型决定调用 search 工具 → tools 执行 → 回到 agent → 模型组织答案。
  • 最终:messages 里有 4 条消息(请求、工具调用、工具结果、最终回答),temperature_unit=摄氏度
  • InMemorySaver 存入 checkpoint_0

第二次调用

final_state = app.invoke(
    {
   "messages": [HumanMessage(content="北京呢?")]},
    config={
   "configurable": {
   "thread_id": 1}}
)
  • InMemorySaver 查询 thread_id=1 → 有 checkpoint_0 → 恢复状态。
  • 初始状态:messages=上一轮的4条消息 + 新的"北京呢?", temperature_unit=摄氏度
  • 注意:我们没有传 temperature_unit,但它从检查点恢复后是 "摄氏度",不会被清空。
  • 运行:模型看到历史里有"上海",现在又有"北京呢?",推断出要查北京天气 → 调用 search → 回答。
  • 最终:temperature_unit 仍然是 "摄氏度"

第三次调用

final_state = app.invoke(
    {
   
        "messages": [HumanMessage(content="上海天气如何?")],
        "temperature_unit": "华氏度"  # 新值覆盖旧值
    },
    config={
   "configurable": {
   "thread_id": 1}}
)
  • 恢复 checkpoint_1,初始 temperature_unit=摄氏度
  • 传入新值 "华氏度" → 默认归约器执行覆盖 → temperature_unit 变成 "华氏度"
  • call_model 读取到 unit=华氏度,在消息列表前插入一条详细指令,要求模型重新计算并标注°F。
  • 模型输出中会出现转换后的华氏度数值(如 30°C = 86°F)。
  • InMemorySaver 存入 checkpoint_2,其中 temperature_unit=华氏度

四、核心组件一览

组件 类 / 函数 / 常量 作用
自定义状态模式 TypedDict 声明图中流转的数据结构,可扩展任意业务字段
归约器声明 Annotated[T, reducer] 把类型和合并策略绑定在一起,告诉 LangGraph 怎么合并新旧值
消息追加归约器 add_messages 内置归约函数,将新消息列表追加到旧列表,自动去重
默认归约策略 (无) 新值直接覆盖旧值,适用于字符串、数值等非列表字段
系统消息 SystemMessage 对话中的指令角色,模型会将其作为全局约束遵循
状态读取 state.get(key, default) 安全地读取自定义字段,提供默认值防止 None
图构建器 StateGraph(State) 传入自定义 TypedDict,编译时校验节点返回的字段
检查点 InMemorySaver 保存完整 State(包括自定义字段),实现跨轮次恢复
消息复制 list(messages) 复制消息列表,避免原地修改状态引用

五、试一试

确保项目根目录的 .env 已配置好 API Key:

OPENAI_API_KEY=sk-...
# 如果使用第三方兼容服务(如 Moonshot)
OPENAI_BASE_URL=https://api.moonshot.cn/v1
MODEL_ID=kimi-k2.5

执行脚本:

cd demo-01
python langgraph-02.py

你应该能看到三段输出:

  1. 第一段:上海天气,温度单位是 °C(因为传入了 "摄氏度")。
  2. 第二段:北京天气,温度单位仍然是 °C(检查点恢复了旧值,我们没有传新偏好)。
  3. 第三段:上海天气,输出中出现 30°C = 86°F(因为传入 "华氏度" 覆盖了旧值,call_model 插入了强制转换指令)。

尝试修改实验来加深理解:

  1. 不传 temperature_unit 的第一次调用:去掉第一次调用里的 "temperature_unit" 字段,观察 call_model 里的 state.get('temperature_unit', '摄氏度') 兜底逻辑是否生效。
  2. 换一个 thread_id:第三次调用改成 thread_id=2,观察 temperature_unit 是否回到默认值——因为新 thread 没有检查点记录。
  3. 去掉 Annotated[..., add_messages]:把 messages 字段的定义改成 messages: list[BaseMessage],重新运行。你会发现消息历史每次都被覆盖,模型完全失忆。

完整代码

from dotenv import load_dotenv
load_dotenv(override=True)

import os
from typing import Literal, Annotated
from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages import BaseMessage, SystemMessage
from langgraph.graph import MessagesState, START, END, StateGraph
from typing import TypedDict


class State(TypedDict):
    """
    自定义状态:消息历史 + 温度单位偏好
    """
    messages: Annotated[list[BaseMessage], add_messages]
    temperature_unit: str


@tool
def search(query: str) -> str:
    """模拟一个搜索工具"""
    if "上海" in query.lower() or "Shanghai" in query.lower():
        return "现在30℃,有雾"
    return "现在温度35℃"


# 将工具放入列表
tools = [search]

# 创建工具节点
tool_node = ToolNode(tools)

# 初始化模型和工具,定义并绑定工具到模型
model = ChatOpenAI(
    model=os.environ.get("MODEL_ID", "gpt-4o"),
    base_url=os.environ.get("OPENAI_BASE_URL"),
    temperature=0
).bind_tools(tools)


# 定义函数,决定是否要继续执行
def should_continue(state: State) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]

    # 如果 LLM 调用了工具,则转到 Tools 节点
    if last_message.tool_calls:
        return "tools"
    # 否则停止
    return END


# 定义调用模型的函数
def call_model(state: State):
    messages = state['messages']
    unit = state.get('temperature_unit', '摄氏度')

    # 复制列表避免原地修改状态的引用
    modified_messages = list(messages)
    if unit == '华氏度':
        modified_messages.insert(0, SystemMessage(
            content="请在回答中将所有温度数值从摄氏度转换为华氏度,重新计算并标注°F。"
                    "例如:30°C = 86°F,35°C = 95°F。不要直接引用工具返回的摄氏度数值。"
        ))

    response = model.invoke(modified_messages)
    # 返回增量更新,LangGraph 会自动归并到现有状态中
    return {
   "messages": [response]}


# 用自定义状态初始化图
workflow = StateGraph(State)

# 定义图节点
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

# 定义入口点和图边
workflow.add_edge(START, "agent")

# 添加条件边
workflow.add_conditional_edges(
    "agent",
    should_continue
)

# 添加从 tools 到 agent 的普通边
workflow.add_edge("tools", "agent")

# 初始化内存以在图运行之间持久化状态
checkpointer = InMemorySaver()

# 编译图
app = workflow.compile(checkpointer=checkpointer)


# === 第一次调用:设定偏好为摄氏度 ===
print("=== 第一次调用:设定偏好为摄氏度 ===")
final_state = app.invoke(
    {
   
        "messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
        "temperature_unit": "摄氏度"
    },
    config={
   "configurable": {
   "thread_id": 1}}
)
print(final_state["messages"][-1].content)
print(f"[检查状态] temperature_unit = {final_state.get('temperature_unit')}\n")

# === 第二次调用:只传消息,不传 temperature_unit ===
print("=== 第二次调用:只传消息,不传 temperature_unit ===")
print("预期:temperature_unit 保持 '摄氏度',检查点恢复的旧值被保留")
final_state = app.invoke(
    {
   "messages": [HumanMessage(content="北京呢?")]},
    config={
   "configurable": {
   "thread_id": 1}}
)
print(final_state['messages'][-1].content)
print(f"[检查状态] temperature_unit = {final_state.get('temperature_unit')}\n")

# === 第三次调用:把偏好改为华氏度 ===
print("=== 第三次调用:把偏好改为华氏度 ===")
print("预期:temperature_unit 被覆盖为 '华氏度',回答中应出现 °F")
final_state = app.invoke(
    {
   
        "messages": [HumanMessage(content="上海天气如何?")],
        "temperature_unit": "华氏度"  # 新值覆盖旧值
    },
    config={
   "configurable": {
   "thread_id": 1}}
)
print(final_state['messages'][-1].content)
print(f"[检查状态] temperature_unit = {final_state.get('temperature_unit')}")


# 将生成的图片保存到文件夹
graph_png = app.get_graph().draw_mermaid_png()

with open("langgraph_02.png", "wb") as f:
    f.write(graph_png)

六、其他

其他内置归约器

除了 add_messages,LangGraph 和标准库还提供了一些常用归约器:

归约器 来源 作用
add_messages langgraph.graph.message 消息列表追加 + 去重
add operator 列表或字符串拼接
sum operator 数值累加
自定义函数 你自己写 任意合并逻辑,如字典深度合并、取最大值等

自定义归约器是一个普通的二元函数:

def merge_dicts(old: dict, new: dict) -> dict:
    """字典深度合并:新字典的键覆盖旧字典,但嵌套字典递归合并"""
    result = old.copy()
    for key, value in new.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    metadata: Annotated[dict, merge_dicts]  # 使用自定义归约器

多个自定义字段的注意事项

当你的 State 里有很多字段时,建议遵循一个原则:只有列表类字段需要显式声明追加归约器;标量字段(字符串、数字、布尔值)默认覆盖行为通常是正确的。

如果你给一个字符串字段错误地挂上了 add

nickname: Annotated[str, add]

那么第一次传入 "Alice",第二次传入 "Bob",结果会变成 "AliceBob"——这通常不是你想要的。

可视化对比

运行脚本后会生成 langgraph_02.png。打开它你会发现,图的拓扑结构和第一章几乎一模一样:仍然是 agent -> tools -> agent 的循环。区别只在于"黑板上写的内容"变多了——从只有 messages,变成了 messages + temperature_unit

这也体现了 LangGraph 的设计哲学:图的骨架(节点和边)负责控制流,图的血液(State)负责数据流。扩展血液不需要改动骨架。

目录
相关文章
|
2天前
|
人工智能 API 开发工具
Claude Code国内安装:2026最新保姆教程(附cc-switch配置)
Claude Code是我目前最推荐的AI编程工具,没有之一。 它可能不是最简单的,但绝对是上限最高的。一旦跑通安装、接上模型、定好规范,你会发现很多原本需要几小时的工作,现在几分钟就能搞定。 这套方案的核心优势就三个字:可控性。你不用依赖任何不稳定服务,所有组件都在自己手里。模型效果不好?换一个。框架更新了?自己决定升不升。 这才是AI时代开发者该有的姿势——不是被动等喂饭,而是主动搭建自己的生产力基础设施。 希望这篇保姆教程,能帮你顺利上车。做出你自己的作品。
Claude Code国内安装:2026最新保姆教程(附cc-switch配置)
|
9天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
3841 21
|
5天前
|
人工智能 JSON BI
DeepSeek V4 来了!超越 Claude Sonnet 4.5,赶紧对接 Claude Code 体验一把
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro 的真实体验与避坑记录 本文记录我将 Claude Code 对接 DeepSeek 最新模型(V4Pro)后的真实体验,测试了 Skills 自动化查询和积木报表 AI 建表两个场景——有惊喜,也踩
2432 8
|
4天前
|
人工智能 缓存 BI
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro,跑完 Skills —— OA 审批、大屏、报表、部署 5 大实战场景后的真实体验 ![](https://oscimg.oschina.net/oscnet/up608d34aeb6bafc47f
2061 4
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
|
21天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
18962 60
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
2天前
|
SQL 人工智能 弹性计算
阿里云发布 Agentic NDR,威胁检测与响应进入智能体时代
欢迎前往阿里云云防火墙控制台体验!
1169 2

热门文章

最新文章