【LangGraph新手村系列】(5)时间旅行:浏览历史、分叉时间线与修改过去

简介: 解决"状态只能单向增长"的问题。用get_state_history()像Git log一样浏览全部历史检查点,用旧快照的values作为新thread输入实现时间线分叉,用update_state()在任意检查点位置注入新状态后继续执行。三种操作赋予Agent全生命周期的事后控制能力。

第五章 时间旅行:浏览历史、分叉时间线与修改过去

"如果你能回到对话的任意时刻,从那里重新开始——AI 的'撤销键'比 Ctrl+Z 强大得多。"

一、问题

前四章我们建立了一个完整的单 Agent 系统:它能在 ReAct 循环中调用工具,记住用户偏好,把状态持久化到 PostgreSQL,甚至在关键节点停下来等人类确认。但真实业务中,一个经常被忽视的场景是:

  • "我刚才不该允许那个工具调用的":用户在 HIL 中断时点了同意,事后发现工具返回了错误数据。能不能回到中断前的状态,重新做决定?
  • "能不能从第 3 轮对话开始,换一种方式回答":用户想测试不同的 System Prompt,但不想重跑前面 2 轮。
  • "帮我查一下,Agent 在调用那个工具之前,状态是什么样的":debug 时需要回溯完整的中间快照,而不只是最终输出。
  • "给某个历史检查点塞一条指令,让它带着新指令继续跑":通过事后注入状态来改变后续执行路径。

核心矛盾:前四章我们一直在"向前看"——状态只能沿时间线单向增长。但生产级 Agent 需要时间旅行(Time Travel)能力:回到任意历史快照,查看它、从它分叉、甚至修改它。

LangGraph 把这三个能力叫做:浏览历史(get_state_history)分叉(Fork)修改状态(update_state)

二、解决方案

三件事:

  1. 浏览历史:用 get_state_history(config) 拉出一个会话的全部检查点快照,像翻 Git log 一样逐条查看。
  2. 分叉执行:选择一个历史快照,用它的值作为新 thread_id 的起始状态,跑出一条独立的新时间线。
  3. 修改过去:用 update_state(config, values) 在任何检查点位置插入新状态,然后从被修改的位置继续运行。
[原始时间线]
thread_id="chapter-5"
     check0 → check1 → check2 → check3 → check4
                         ↑                ↑
                    get_state_history()   get_state()

[分叉时间线]
thread_id="chapter-5-fork-1"
     check3'的值 ──→ 新 check0 → 新 check1 → ...

[修改时间线]
thread_id="chapter-5"
     check0 → check1 → check2 → check3 → [注入 SystemMessage] → check_new

三、工作原理

1. 浏览历史:get_state_history()

history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")

for i, snapshot in enumerate(history):
    print(f"[{i}] checkpoint_id: {snapshot.config['configurable']['checkpoint_id']}")
    msgs = snapshot.values.get("messages", [])
    if msgs:
        last = msgs[-1]
        print(f"    最后消息: {last.type} -> {str(last.content)[:40]}...")
    print(f"    metadata: step={snapshot.metadata.get('step')}, source={snapshot.metadata.get('source')}")

get_state_history() 返回一个生成器,按时间倒序(最新在前)列出指定 thread_id 下的所有检查点。每个元素是一个 StateSnapshot,包含三个属性:

属性 含义
snapshot.config 该检查点的完整 config(含 thread_idcheckpoint_id
snapshot.values 该检查点时的完整状态值(messages、temperature_unit 等)
snapshot.metadata 元数据:step(累计步数)、source("loop" 或 "input")

注意 list() 把生成器转成列表——因为生成器是惰性的,不转列表无法获取总数,也无法随机访问。

2. 什么是"快照点"?

LangGraph 在每次完成一个 super-step 时自动创建检查点。在 ReAct 循环中,一次 agent → tools → agent 的往返通常产生两个快照:

step 1: HumanMessage 输入后         → checkpoint_0
step 2: agent 调用模型后            → checkpoint_1
step 3: tools 执行后               → checkpoint_2
step 4: agent 再次调用模型后        → checkpoint_3
step 5: should_continue 返回 END   → 无新快照(运行结束)

所以一次完整的"提问→回答"可能产生 4-5 个检查点。get_state_history() 会把它们全部捞出来,从最新到最旧排列。

3. 分叉:从历史快照创建新时间线

target = history[3]  # 选择第 3 个历史检查点
fork_thread_id = f"{thread_id}-fork-1"
fork_config = {
   "configurable": {
   "thread_id": fork_thread_id}}

# 用旧状态的值,在新 thread 上启动
for chunk in app.stream(target.values, fork_config, stream_mode="values"):
    pass

final = app.get_state(fork_config)
print(final.values["messages"][-1].content)

这里的关键是:不传新的 messages 输入,而是直接把历史快照的 target.values 作为初始状态

当你传 target.valuesstream() 时,LangGraph 会把这个完整状态(含历史消息列表)当作起点,在新 thread_id 下继续运行。模型会看到历史里所有消息,然后决定下一步做什么。

分叉和原始时间线完全隔离:

  • 原始 thread_id 的数据不受任何影响。
  • fork_thread_id 有自己独立的检查点序列。

这在实验和 debug 中极其有用——你可以同时测试 5 种不同的后续分支,互不干扰。

4. 修改历史状态:update_state()

target = history[3]

# update_state 返回新 config,包含了新的 checkpoint_id
new_config = app.update_state(
    target.config,
    {
   
        "messages": [
            SystemMessage(content="【时间旅行插入】请用诗歌格式回答")
        ]
    }
)

# 从新检查点继续执行
for chunk in app.stream(None, new_config, stream_mode="values"):
    pass

update_state() 做三件事:

  1. 在指定检查点之后创建一个新的检查点(相当于在原时间线上插入了一个"修改记录")。
  2. values 参数里的内容按状态归约规则合并进去。
  3. 返回一个新的 config,其中的 checkpoint_id 指向新创建的那个检查点。

注意:

  • 不传 as_node 参数:默认行为等同于 as_node="__start__",即把修改后的状态当作新起始状态。
  • 传入 target.config(含 checkpoint_id):告诉 LangGraph 在哪个检查点的位置插入修改。
  • 传入 None 作为 stream 的输入:因为 new_config 已经指定了 checkpoint_id,图会从该检查点继续运行,不需要新的用户输入。

5. 三者关系的完整流程

thread_id = "chapter-5"
config = {
   "configurable": {
   "thread_id": thread_id}}

# 1. 发起第一次对话
inputs = {
   
    "messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
    "temperature_unit": "摄氏度"
}
for chunk in app.stream(inputs, config, stream_mode="values"):
    pass

# 2. 浏览历史
history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")

# 3. 分叉:用第 4 个历史快照在新 thread 上重跑
target = history[3]
fork_config = {
   "configurable": {
   "thread_id": f"{thread_id}-fork-1"}}
for chunk in app.stream(target.values, fork_config, stream_mode="values"):
    pass

# 4. 修改历史:在原 thread 的第 4 个检查点插入一条 SystemMessage
new_config = app.update_state(
    target.config,
    {
   "messages": [SystemMessage(content="【时间旅行插入】请用诗歌格式回答")]}
)

# 5. 从修改处继续
for chunk in app.stream(None, new_config, stream_mode="values"):
    pass

6. 与第四章的关键区别

第四章的 interrupt_before暂停当前执行——图还没有完成,你在路的中间停下。本章的时间旅行是事后回溯——图已经跑完了,你回到历史上的某个点,从那里重新出发。

维度 第四章 HIL 第五章 Time Travel
时机 执行过程中暂停 执行完成后回溯
机制 interrupt_before + Command(resume=True) get_state_history + update_state / 分叉
用途 实时审查工具调用 Debug、A/B 测试、事后修正
影响 暂停当前执行流 创建新检查点或新时间线
是否需要用户交互 是(input() 等待) 否(程序自动操作)

四、核心组件一览

组件 类 / 函数 / 参数 作用
历史浏览 app.get_state_history(config) 返回指定 thread 的所有检查点快照,按时间倒序
快照对象 StateSnapshot 包含 configvaluesmetadata 三个属性
快照配置 snapshot.config thread_idcheckpoint_id,唯一标识一个快照
快照值 snapshot.values 该快照时的完整状态字典,可直接作为新 thread 的输入
快照元数据 snapshot.metadata step(累计步数)、source("loop" / "input")
分叉执行 app.stream(old_state.values, new_config) 用历史状态值在新 thread 上启动独立执行
修改状态 app.update_state(config, values) 在指定检查点后插入新检查点,返回新 config
从检查点恢复 app.stream(None, new_config) new_config 含 checkpoint_id,图从该点继续运行

五、试一试

前置条件

.env 配置(同第四章):

OPENAI_API_KEY=sk-...
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo

执行脚本

cd demo-01
python langgraph-05.py

预期输出:

共 5 个检查点

[0] checkpoint_id: 1f142370-...
    最后消息: ai -> 根据查询结果,上海目前的天气...
    metadata: step=5, source=loop

[1] checkpoint_id: ...
    最后消息: tool -> 现在30℃,有雾...
    metadata: step=4, source=loop

...

=== 分叉后的最终回答 ===
根据查询结果,上海目前的天气...

=== 修改历史后的回答 ===
(模型用诗歌格式回答)

六、完整代码

与第四章相比,图定义、工具、模型、检查点全部复用,只有执行逻辑从"HIL 中断恢复"变成了"时间旅行三连操作"。

from dotenv import load_dotenv
load_dotenv(override=True)

import os
import psycopg
from typing import TypedDict
from urllib.parse import urlparse
from typing import Literal, Annotated
from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_core.messages import BaseMessage, SystemMessage
from langgraph.graph import MessagesState, START, END, StateGraph

# 本章不再需要 Command(第四章用于中断恢复)
# from langgraph.types import Command

# ========== 1. 自定义状态(与第二章相同) ==========

class State(TypedDict):
    """
    自定义状态:消息历史 + 温度单位偏好
    """
    messages: Annotated[list[BaseMessage], add_messages]
    # add_messages 归约器:新消息追加到旧列表,自动去重
    temperature_unit: str
    # 默认归约器(覆盖):用户偏好,如 "摄氏度" 或 "华氏度"


# ========== 2. 工具与模型(与第一章相同) ==========

@tool
def search(query: str) -> str:
    """模拟一个搜索工具:查询城市天气"""
    if "上海" in query.lower() or "Shanghai" in query.lower():
        return "现在30℃,有雾"
    return "现在温度35℃"


tools = [search]
tool_node = ToolNode(tools)

model = ChatOpenAI(
    model=os.environ.get("MODEL_ID", "gpt-4o"),
    base_url=os.environ.get("OPENAI_BASE_URL"),
    temperature=0
).bind_tools(tools)


# ========== 3. 节点与条件边(与第二章相同) ==========

def should_continue(state: State) -> Literal["tools", END]:
    """子图内部的条件路由:模型是否想调用工具?"""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END


def call_model(state: State):
    """agent 节点:读取温度偏好,构造消息列表,调用模型"""
    messages = state["messages"]
    unit = state.get("temperature_unit", "摄氏度")

    modified_messages = list(messages)  # 复制,避免原地修改 state 引用
    if unit == "华氏度":
        modified_messages.insert(0, SystemMessage(
            content="请在回答中将所有温度数值从摄氏度转换为华氏度,"
                    "重新计算并标注°F。"
                    "例如:30°C = 86°F,35°C = 95°F。"
                    "不要直接引用工具返回的摄氏度数值。"
        ))

    response = model.invoke(modified_messages)
    return {
   "messages": [response]}


# ========== 4. 构建图(与第二章相同) ==========

workflow = StateGraph(State)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")


# ========== 5. PostgreSQL 持久化检查点(与第三章相同) ==========

DB_URI = os.environ.get("POSTGRES_URI")

# 自动创建数据库(如果不存在)
parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"
with psycopg.connect(base_uri, autocommit=True) as init_conn:
    try:
        init_conn.execute(f"CREATE DATABASE {db_name};")
    except psycopg.errors.DuplicateDatabase:
        pass  # 数据库已存在,忽略

# 创建连接并初始化检查点
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup()


# ========== 6. 编译图 ==========

# 注意:与第四章不同,这里不传 interrupt_before
# 本章的"时间旅行"是事后回溯,不是执行过程中暂停
app = workflow.compile(checkpointer=checkpointer)


# ========== 7. 第一次对话:查询上海天气 ==========

thread_id = "chapter-5"
config = {
   "configurable": {
   "thread_id": thread_id}}

inputs = {
   
    "messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
    "temperature_unit": "摄氏度"
}

# 用 stream 跑完(本章不需要 interrupt,但 stream 仍然是更灵活的方式)
for chunk in app.stream(inputs, config, stream_mode="values"):
    pass  # 遍历驱动生成器,图自动跑完


# ========== 8. 时间旅行操作一:浏览历史 ==========

history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")

for i, snapshot in enumerate(history):
    print(f"\n[{i}] checkpoint_id: {snapshot.config['configurable']['checkpoint_id']}")
    msgs = snapshot.values.get("messages", [])
    if msgs:
        last = msgs[-1]
        content = last.content[:40] if hasattr(last, "content") else str(last)[:40]
        print(f"    最后消息: {last.type if hasattr(last, 'type') else 'msg'} -> {content}...")
    print(f"    metadata: step={snapshot.metadata.get('step')}, source={snapshot.metadata.get('source')}")


# ========== 9. 时间旅行操作二:分叉 ==========

if len(history) >= 4:
    target = history[3]  # 选择第 4 个历史检查点(HumanMessage 输入后的快照)
    fork_thread_id = f"{thread_id}-fork-1"
    fork_config = {
   "configurable": {
   "thread_id": fork_thread_id}}

    # 用旧状态的值,在新 thread 上启动——不传新输入,直接复用历史状态
    for chunk in app.stream(target.values, fork_config, stream_mode="values"):
        pass

    # 查新 thread 的最新状态(不带 checkpoint_id,取最新)
    final = app.get_state(fork_config)
    print("\n=== 分叉后的最终回答 ===")
    if "messages" in final.values and final.values["messages"]:
        print(final.values["messages"][-1].content)
    else:
        print("分叉后没有消息")


# ========== 10. 时间旅行操作三:修改历史状态 ==========

if len(history) >= 4:
    target = history[3]

    # update_state 在指定检查点后创建新检查点,返回新 config
    new_config = app.update_state(
        target.config,  # 含 checkpoint_id,告诉 LangGraph 在哪里插入
        {
   
            "messages": [
                SystemMessage(content="【时间旅行插入】请用诗歌格式回答")
                # 不传 as_node → 默认行为等同于 as_node="__start__"
                # 即把修改后的状态当作新的起始状态
            ]
        }
    )

    # 从新检查点继续执行
    # stream 的输入为 None:new_config 已含 checkpoint_id,图从该点继续
    for chunk in app.stream(None, new_config, stream_mode="values"):
        pass

    # 查同一 thread 的最新状态(不带 checkpoint_id,自动取最新)
    final_config = {
   "configurable": {
   "thread_id": thread_id}}
    final2 = app.get_state(final_config)
    print("\n=== 修改历史后的回答 ===")
    print(final2.values["messages"][-1].content)

七、其他

三种时间旅行模式的对比

模式 代码操作 对数据库的影响 适用场景
只读浏览 get_state_history() 无修改 Debug、审计、数据分析
分叉新线 stream(old.values, new_config) 新 thread 创建新检查点序列 A/B 测试、实验不同参数
修改过去 update_state() + stream(None, new_config) 原 thread 追加新检查点 事后修正、注入指令重新执行

分叉的实际意义

分叉不仅是 debug 工具,也是实验工具。假设你的 Agent 有 3 种不同的 System Prompt:

prompts = ["请用幽默风格回答", "请用专业术语回答", "请用诗歌格式回答"]

for i, prompt in enumerate(prompts):
    fork_config = {
   "configurable": {
   "thread_id": f"test-{i}"}}
    fork_values = {
   **checkpoint.values, "messages": [SystemMessage(content=prompt)]}
    for chunk in app.stream(fork_values, fork_config, stream_mode="values"):
        pass

三个分支并行测试,互不干扰,最后对比哪个效果最好。

update_state()as_node 参数

默认情况下,update_state 的行为等同于 as_node="__start__"——修改后的状态成为新的起点,图从 START 边重新出发。如果你指定 as_node="agent",则相当于模拟 agent 节点的输出,图会从 agent 出发走条件边。

# 模拟 agent 输出了 tool_calls,让图直接跳到 tools 节点
new_config = app.update_state(
    target.config,
    {
   "messages": [AIMessage(content="", tool_calls=[...])]},
    as_node="agent"
)

这在需要"跳过某些步骤"的 debug 场景中很有用。

与第四章的衔接

第四章教会 Agent 在关键节点"停下来",第五章教会你事后"回到过去"。两者结合才是完整的可控 Agent:

  • 事前控制interrupt_before / interrupt_after → 运行过程中暂停,实时审查。
  • 事后控制get_state_history / fork / update_state → 运行完成后回溯,事后修正。

掌握了这两套机制,你对 Agent 的执行就有了全生命周期的控制权——无论是进行时还是完成时,你都能介入。

目录
相关文章
|
8天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23426 8
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
17天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
6409 25
|
12天前
|
人工智能 缓存 BI
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro,跑完 Skills —— OA 审批、大屏、报表、部署 5 大实战场景后的真实体验 ![](https://oscimg.oschina.net/oscnet/up608d34aeb6bafc47f
4140 13
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
|
13天前
|
人工智能 JSON BI
DeepSeek V4 来了!超越 Claude Sonnet 4.5,赶紧对接 Claude Code 体验一把
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro 的真实体验与避坑记录 本文记录我将 Claude Code 对接 DeepSeek 最新模型(V4Pro)后的真实体验,测试了 Skills 自动化查询和积木报表 AI 建表两个场景——有惊喜,也踩
4957 13
|
29天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
23191 65
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)