第五章 时间旅行:浏览历史、分叉时间线与修改过去
"如果你能回到对话的任意时刻,从那里重新开始——AI 的'撤销键'比 Ctrl+Z 强大得多。"
一、问题
前四章我们建立了一个完整的单 Agent 系统:它能在 ReAct 循环中调用工具,记住用户偏好,把状态持久化到 PostgreSQL,甚至在关键节点停下来等人类确认。但真实业务中,一个经常被忽视的场景是:
- "我刚才不该允许那个工具调用的":用户在 HIL 中断时点了同意,事后发现工具返回了错误数据。能不能回到中断前的状态,重新做决定?
- "能不能从第 3 轮对话开始,换一种方式回答":用户想测试不同的 System Prompt,但不想重跑前面 2 轮。
- "帮我查一下,Agent 在调用那个工具之前,状态是什么样的":debug 时需要回溯完整的中间快照,而不只是最终输出。
- "给某个历史检查点塞一条指令,让它带着新指令继续跑":通过事后注入状态来改变后续执行路径。
核心矛盾:前四章我们一直在"向前看"——状态只能沿时间线单向增长。但生产级 Agent 需要时间旅行(Time Travel)能力:回到任意历史快照,查看它、从它分叉、甚至修改它。
LangGraph 把这三个能力叫做:浏览历史(get_state_history)、分叉(Fork)、修改状态(update_state)。
二、解决方案
三件事:
- 浏览历史:用
get_state_history(config)拉出一个会话的全部检查点快照,像翻 Git log 一样逐条查看。 - 分叉执行:选择一个历史快照,用它的值作为新
thread_id的起始状态,跑出一条独立的新时间线。 - 修改过去:用
update_state(config, values)在任何检查点位置插入新状态,然后从被修改的位置继续运行。
[原始时间线]
thread_id="chapter-5"
check0 → check1 → check2 → check3 → check4
↑ ↑
get_state_history() get_state()
[分叉时间线]
thread_id="chapter-5-fork-1"
check3'的值 ──→ 新 check0 → 新 check1 → ...
[修改时间线]
thread_id="chapter-5"
check0 → check1 → check2 → check3 → [注入 SystemMessage] → check_new
三、工作原理
1. 浏览历史:get_state_history()
history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")
for i, snapshot in enumerate(history):
print(f"[{i}] checkpoint_id: {snapshot.config['configurable']['checkpoint_id']}")
msgs = snapshot.values.get("messages", [])
if msgs:
last = msgs[-1]
print(f" 最后消息: {last.type} -> {str(last.content)[:40]}...")
print(f" metadata: step={snapshot.metadata.get('step')}, source={snapshot.metadata.get('source')}")
get_state_history() 返回一个生成器,按时间倒序(最新在前)列出指定 thread_id 下的所有检查点。每个元素是一个 StateSnapshot,包含三个属性:
| 属性 | 含义 |
|---|---|
snapshot.config |
该检查点的完整 config(含 thread_id、checkpoint_id) |
snapshot.values |
该检查点时的完整状态值(messages、temperature_unit 等) |
snapshot.metadata |
元数据:step(累计步数)、source("loop" 或 "input") |
注意 list() 把生成器转成列表——因为生成器是惰性的,不转列表无法获取总数,也无法随机访问。
2. 什么是"快照点"?
LangGraph 在每次完成一个 super-step 时自动创建检查点。在 ReAct 循环中,一次 agent → tools → agent 的往返通常产生两个快照:
step 1: HumanMessage 输入后 → checkpoint_0
step 2: agent 调用模型后 → checkpoint_1
step 3: tools 执行后 → checkpoint_2
step 4: agent 再次调用模型后 → checkpoint_3
step 5: should_continue 返回 END → 无新快照(运行结束)
所以一次完整的"提问→回答"可能产生 4-5 个检查点。get_state_history() 会把它们全部捞出来,从最新到最旧排列。
3. 分叉:从历史快照创建新时间线
target = history[3] # 选择第 3 个历史检查点
fork_thread_id = f"{thread_id}-fork-1"
fork_config = {
"configurable": {
"thread_id": fork_thread_id}}
# 用旧状态的值,在新 thread 上启动
for chunk in app.stream(target.values, fork_config, stream_mode="values"):
pass
final = app.get_state(fork_config)
print(final.values["messages"][-1].content)
这里的关键是:不传新的 messages 输入,而是直接把历史快照的 target.values 作为初始状态。
当你传 target.values 给 stream() 时,LangGraph 会把这个完整状态(含历史消息列表)当作起点,在新 thread_id 下继续运行。模型会看到历史里所有消息,然后决定下一步做什么。
分叉和原始时间线完全隔离:
- 原始
thread_id的数据不受任何影响。 - 新
fork_thread_id有自己独立的检查点序列。
这在实验和 debug 中极其有用——你可以同时测试 5 种不同的后续分支,互不干扰。
4. 修改历史状态:update_state()
target = history[3]
# update_state 返回新 config,包含了新的 checkpoint_id
new_config = app.update_state(
target.config,
{
"messages": [
SystemMessage(content="【时间旅行插入】请用诗歌格式回答")
]
}
)
# 从新检查点继续执行
for chunk in app.stream(None, new_config, stream_mode="values"):
pass
update_state() 做三件事:
- 在指定检查点之后创建一个新的检查点(相当于在原时间线上插入了一个"修改记录")。
- 把
values参数里的内容按状态归约规则合并进去。 - 返回一个新的
config,其中的checkpoint_id指向新创建的那个检查点。
注意:
- 不传
as_node参数:默认行为等同于as_node="__start__",即把修改后的状态当作新起始状态。 - 传入
target.config(含 checkpoint_id):告诉 LangGraph 在哪个检查点的位置插入修改。 - 传入
None作为 stream 的输入:因为new_config已经指定了 checkpoint_id,图会从该检查点继续运行,不需要新的用户输入。
5. 三者关系的完整流程
thread_id = "chapter-5"
config = {
"configurable": {
"thread_id": thread_id}}
# 1. 发起第一次对话
inputs = {
"messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
"temperature_unit": "摄氏度"
}
for chunk in app.stream(inputs, config, stream_mode="values"):
pass
# 2. 浏览历史
history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")
# 3. 分叉:用第 4 个历史快照在新 thread 上重跑
target = history[3]
fork_config = {
"configurable": {
"thread_id": f"{thread_id}-fork-1"}}
for chunk in app.stream(target.values, fork_config, stream_mode="values"):
pass
# 4. 修改历史:在原 thread 的第 4 个检查点插入一条 SystemMessage
new_config = app.update_state(
target.config,
{
"messages": [SystemMessage(content="【时间旅行插入】请用诗歌格式回答")]}
)
# 5. 从修改处继续
for chunk in app.stream(None, new_config, stream_mode="values"):
pass
6. 与第四章的关键区别
第四章的 interrupt_before 是暂停当前执行——图还没有完成,你在路的中间停下。本章的时间旅行是事后回溯——图已经跑完了,你回到历史上的某个点,从那里重新出发。
| 维度 | 第四章 HIL | 第五章 Time Travel |
|---|---|---|
| 时机 | 执行过程中暂停 | 执行完成后回溯 |
| 机制 | interrupt_before + Command(resume=True) |
get_state_history + update_state / 分叉 |
| 用途 | 实时审查工具调用 | Debug、A/B 测试、事后修正 |
| 影响 | 暂停当前执行流 | 创建新检查点或新时间线 |
| 是否需要用户交互 | 是(input() 等待) |
否(程序自动操作) |
四、核心组件一览
| 组件 | 类 / 函数 / 参数 | 作用 |
|---|---|---|
| 历史浏览 | app.get_state_history(config) |
返回指定 thread 的所有检查点快照,按时间倒序 |
| 快照对象 | StateSnapshot |
包含 config、values、metadata 三个属性 |
| 快照配置 | snapshot.config |
含 thread_id 和 checkpoint_id,唯一标识一个快照 |
| 快照值 | snapshot.values |
该快照时的完整状态字典,可直接作为新 thread 的输入 |
| 快照元数据 | snapshot.metadata |
step(累计步数)、source("loop" / "input") |
| 分叉执行 | app.stream(old_state.values, new_config) |
用历史状态值在新 thread 上启动独立执行 |
| 修改状态 | app.update_state(config, values) |
在指定检查点后插入新检查点,返回新 config |
| 从检查点恢复 | app.stream(None, new_config) |
new_config 含 checkpoint_id,图从该点继续运行 |
五、试一试
前置条件
.env 配置(同第四章):
OPENAI_API_KEY=sk-...
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo
执行脚本
cd demo-01
python langgraph-05.py
预期输出:
共 5 个检查点
[0] checkpoint_id: 1f142370-...
最后消息: ai -> 根据查询结果,上海目前的天气...
metadata: step=5, source=loop
[1] checkpoint_id: ...
最后消息: tool -> 现在30℃,有雾...
metadata: step=4, source=loop
...
=== 分叉后的最终回答 ===
根据查询结果,上海目前的天气...
=== 修改历史后的回答 ===
(模型用诗歌格式回答)
六、完整代码
与第四章相比,图定义、工具、模型、检查点全部复用,只有执行逻辑从"HIL 中断恢复"变成了"时间旅行三连操作"。
from dotenv import load_dotenv
load_dotenv(override=True)
import os
import psycopg
from typing import TypedDict
from urllib.parse import urlparse
from typing import Literal, Annotated
from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_core.messages import BaseMessage, SystemMessage
from langgraph.graph import MessagesState, START, END, StateGraph
# 本章不再需要 Command(第四章用于中断恢复)
# from langgraph.types import Command
# ========== 1. 自定义状态(与第二章相同) ==========
class State(TypedDict):
"""
自定义状态:消息历史 + 温度单位偏好
"""
messages: Annotated[list[BaseMessage], add_messages]
# add_messages 归约器:新消息追加到旧列表,自动去重
temperature_unit: str
# 默认归约器(覆盖):用户偏好,如 "摄氏度" 或 "华氏度"
# ========== 2. 工具与模型(与第一章相同) ==========
@tool
def search(query: str) -> str:
"""模拟一个搜索工具:查询城市天气"""
if "上海" in query.lower() or "Shanghai" in query.lower():
return "现在30℃,有雾"
return "现在温度35℃"
tools = [search]
tool_node = ToolNode(tools)
model = ChatOpenAI(
model=os.environ.get("MODEL_ID", "gpt-4o"),
base_url=os.environ.get("OPENAI_BASE_URL"),
temperature=0
).bind_tools(tools)
# ========== 3. 节点与条件边(与第二章相同) ==========
def should_continue(state: State) -> Literal["tools", END]:
"""子图内部的条件路由:模型是否想调用工具?"""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return END
def call_model(state: State):
"""agent 节点:读取温度偏好,构造消息列表,调用模型"""
messages = state["messages"]
unit = state.get("temperature_unit", "摄氏度")
modified_messages = list(messages) # 复制,避免原地修改 state 引用
if unit == "华氏度":
modified_messages.insert(0, SystemMessage(
content="请在回答中将所有温度数值从摄氏度转换为华氏度,"
"重新计算并标注°F。"
"例如:30°C = 86°F,35°C = 95°F。"
"不要直接引用工具返回的摄氏度数值。"
))
response = model.invoke(modified_messages)
return {
"messages": [response]}
# ========== 4. 构建图(与第二章相同) ==========
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
# ========== 5. PostgreSQL 持久化检查点(与第三章相同) ==========
DB_URI = os.environ.get("POSTGRES_URI")
# 自动创建数据库(如果不存在)
parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"
with psycopg.connect(base_uri, autocommit=True) as init_conn:
try:
init_conn.execute(f"CREATE DATABASE {db_name};")
except psycopg.errors.DuplicateDatabase:
pass # 数据库已存在,忽略
# 创建连接并初始化检查点
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup()
# ========== 6. 编译图 ==========
# 注意:与第四章不同,这里不传 interrupt_before
# 本章的"时间旅行"是事后回溯,不是执行过程中暂停
app = workflow.compile(checkpointer=checkpointer)
# ========== 7. 第一次对话:查询上海天气 ==========
thread_id = "chapter-5"
config = {
"configurable": {
"thread_id": thread_id}}
inputs = {
"messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
"temperature_unit": "摄氏度"
}
# 用 stream 跑完(本章不需要 interrupt,但 stream 仍然是更灵活的方式)
for chunk in app.stream(inputs, config, stream_mode="values"):
pass # 遍历驱动生成器,图自动跑完
# ========== 8. 时间旅行操作一:浏览历史 ==========
history = list(app.get_state_history(config))
print(f"共 {len(history)} 个检查点")
for i, snapshot in enumerate(history):
print(f"\n[{i}] checkpoint_id: {snapshot.config['configurable']['checkpoint_id']}")
msgs = snapshot.values.get("messages", [])
if msgs:
last = msgs[-1]
content = last.content[:40] if hasattr(last, "content") else str(last)[:40]
print(f" 最后消息: {last.type if hasattr(last, 'type') else 'msg'} -> {content}...")
print(f" metadata: step={snapshot.metadata.get('step')}, source={snapshot.metadata.get('source')}")
# ========== 9. 时间旅行操作二:分叉 ==========
if len(history) >= 4:
target = history[3] # 选择第 4 个历史检查点(HumanMessage 输入后的快照)
fork_thread_id = f"{thread_id}-fork-1"
fork_config = {
"configurable": {
"thread_id": fork_thread_id}}
# 用旧状态的值,在新 thread 上启动——不传新输入,直接复用历史状态
for chunk in app.stream(target.values, fork_config, stream_mode="values"):
pass
# 查新 thread 的最新状态(不带 checkpoint_id,取最新)
final = app.get_state(fork_config)
print("\n=== 分叉后的最终回答 ===")
if "messages" in final.values and final.values["messages"]:
print(final.values["messages"][-1].content)
else:
print("分叉后没有消息")
# ========== 10. 时间旅行操作三:修改历史状态 ==========
if len(history) >= 4:
target = history[3]
# update_state 在指定检查点后创建新检查点,返回新 config
new_config = app.update_state(
target.config, # 含 checkpoint_id,告诉 LangGraph 在哪里插入
{
"messages": [
SystemMessage(content="【时间旅行插入】请用诗歌格式回答")
# 不传 as_node → 默认行为等同于 as_node="__start__"
# 即把修改后的状态当作新的起始状态
]
}
)
# 从新检查点继续执行
# stream 的输入为 None:new_config 已含 checkpoint_id,图从该点继续
for chunk in app.stream(None, new_config, stream_mode="values"):
pass
# 查同一 thread 的最新状态(不带 checkpoint_id,自动取最新)
final_config = {
"configurable": {
"thread_id": thread_id}}
final2 = app.get_state(final_config)
print("\n=== 修改历史后的回答 ===")
print(final2.values["messages"][-1].content)
七、其他
三种时间旅行模式的对比
| 模式 | 代码操作 | 对数据库的影响 | 适用场景 |
|---|---|---|---|
| 只读浏览 | get_state_history() |
无修改 | Debug、审计、数据分析 |
| 分叉新线 | stream(old.values, new_config) |
新 thread 创建新检查点序列 | A/B 测试、实验不同参数 |
| 修改过去 | update_state() + stream(None, new_config) |
原 thread 追加新检查点 | 事后修正、注入指令重新执行 |
分叉的实际意义
分叉不仅是 debug 工具,也是实验工具。假设你的 Agent 有 3 种不同的 System Prompt:
prompts = ["请用幽默风格回答", "请用专业术语回答", "请用诗歌格式回答"]
for i, prompt in enumerate(prompts):
fork_config = {
"configurable": {
"thread_id": f"test-{i}"}}
fork_values = {
**checkpoint.values, "messages": [SystemMessage(content=prompt)]}
for chunk in app.stream(fork_values, fork_config, stream_mode="values"):
pass
三个分支并行测试,互不干扰,最后对比哪个效果最好。
update_state() 的 as_node 参数
默认情况下,update_state 的行为等同于 as_node="__start__"——修改后的状态成为新的起点,图从 START 边重新出发。如果你指定 as_node="agent",则相当于模拟 agent 节点的输出,图会从 agent 出发走条件边。
# 模拟 agent 输出了 tool_calls,让图直接跳到 tools 节点
new_config = app.update_state(
target.config,
{
"messages": [AIMessage(content="", tool_calls=[...])]},
as_node="agent"
)
这在需要"跳过某些步骤"的 debug 场景中很有用。
与第四章的衔接
第四章教会 Agent 在关键节点"停下来",第五章教会你事后"回到过去"。两者结合才是完整的可控 Agent:
- 事前控制:
interrupt_before/interrupt_after→ 运行过程中暂停,实时审查。 - 事后控制:
get_state_history/fork/update_state→ 运行完成后回溯,事后修正。
掌握了这两套机制,你对 Agent 的执行就有了全生命周期的控制权——无论是进行时还是完成时,你都能介入。