第三章 PostgreSQL 持久化检查点:让状态跨越进程与重启
"内存里的检查点就像写在沙滩上的字——浪一打就消失。PostgreSQL 才是真正的保险箱。"
一、问题
前两章我们一直用 InMemorySaver 做检查点。它很方便:几行代码就能让 Agent 拥有跨轮次记忆。但当你想把它搬到生产环境时,几个尖锐的问题立刻浮现:
- 进程重启即失忆:
InMemorySaver把状态存在 Python 字典里,服务一重启或部署新版本,所有会话记忆全部清零。 - 无法横向扩展:如果服务有多个实例(负载均衡),
InMemorySaver各自为政,用户的对话历史被切成了碎片。 - 无法持久化审计:生产系统通常需要留存会话记录,用于故障排查、合规审计或数据分析。内存里的字典既查不了,也导不出。
- 容量受限:用户越多、对话越长,
InMemorySaver占用的内存线性增长,最终撑爆进程。
核心矛盾:前两章解决了"图内状态怎么流转"的问题,但没有解决"图外状态怎么保存"的问题。我们需要把检查点从内存搬到数据库。
LangGraph 为此提供了多种持久化检查点实现。本章选择 PostgreSQL——原因很直接:它是生产环境最常见的选择,成熟稳定,JSONB 原生支持让状态序列化非常自然。
二、解决方案
我们要做四件事:
- 引入 PostgreSQL 连接:用
psycopg(PostgreSQL 的 Python 驱动)建立数据库连接。 - 自动准备数据库:如果目标数据库不存在,先连到默认的
postgres库自动创建它。 - 替换检查点实现:把
InMemorySaver换成PostgresSaver,在连接上调用setup()自动建表。 - 保持业务逻辑不变:图的定义、节点的逻辑、状态的归约——完全复用第二章的代码,只有检查点的存储介质变了。
+--------+ +------------+ +------------------+
| START | ---> | agent | ---> | should_continue |
| | | (模型调用) | | (条件判断) |
+--------+ +------+-----+ +---------+--------+
^ |
| |
| +-------------+-------------+
| | 返回 "tools" | 返回 END |
| | | |
| v v v
| +--------+ +--------+
| | tools | | END |
| |(工具) | +--------+
| +---+----+
| |
+-------+
(回到 agent)
[外部仓库]
PostgreSQL 数据库
- checkpoints
- checkpoint_writes
- checkpoint_blobs
- checkpoint_migrations
图的结构没有任何变化,只是右下角的"外部仓库"从内存字典换成了 PostgreSQL 数据库。
三、工作原理
1. 从 InMemorySaver 到 PostgresSaver
InMemorySaver 和 PostgresSaver 都实现了同一个接口:BaseCheckpointSaver。这意味着它们对图的业务代码完全透明——只要你在 workflow.compile(checkpointer=...) 里传入的实例实现了这个接口,图就能正常读写检查点。
# 第二章:内存检查点
checkpointer = InMemorySaver()
app = workflow.compile(checkpointer=checkpointer)
# 第三章:PostgreSQL 检查点(只换一行)
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup()
app = workflow.compile(checkpointer=checkpointer)
app.invoke(...) 的调用方式、config={"configurable": {"thread_id": "chapter-1"}} 的传参方式,完全不需要改。
2. 依赖安装与环境配置
PostgresSaver 来自 langgraph-checkpoint-postgres 包,psycopg 是 PostgreSQL 的 Python 3 驱动:
uv add psycopg langgraph-checkpoint-postgres
确认 .env 中有 PostgreSQL 连接字符串:
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo
连接字符串的结构:
postgresql:// postgres : 密码 @ localhost : 5432 / langgraph_demo
| | | | | | |
协议 用户名 密码 主机 端口 数据库名
3. 自动创建数据库
生产脚本不能假设数据库已经存在。我们在代码中加入一段初始化逻辑:
import psycopg
from urllib.parse import urlparse
DB_URI = os.environ.get("POSTGRES_URI")
parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"
with psycopg.connect(base_uri, autocommit=True) as init_conn:
try:
init_conn.execute(f"CREATE DATABASE {db_name};")
except psycopg.errors.DuplicateDatabase:
pass # 数据库已存在,忽略
这里有两个细节:
autocommit=True:CREATE DATABASE不能在事务块中运行。开启autocommit让这条命令独立执行。try/except DuplicateDatabase:CREATE DATABASE没有IF NOT EXISTS语法,用异常捕获处理"已存在"的情况。
4. setup():自动建表
PostgresSaver 把检查点存储所需的表结构内建在代码里。调用 setup() 时,它会检查 checkpoint_migrations 表中的版本号,如果数据库还是空的,就执行一系列 CREATE TABLE 和 CREATE INDEX 语句。
一共会创建 4 张表:
| 表名 | 作用 |
|---|---|
checkpoints |
快照主表:每个会话在关键节点完成后的"完整状态照片" |
checkpoint_writes |
增量表:记录"哪个节点修改了哪个字段" |
checkpoint_blobs |
大对象表:存储消息列表等体积大的数据 |
checkpoint_migrations |
迁移记录:框架内部追踪表结构版本 |
checkpoints——快照主表
CREATE TABLE checkpoints (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT DEFAULT '',
checkpoint_id TEXT NOT NULL,
parent_checkpoint_id TEXT,
type TEXT,
checkpoint JSONB,
metadata JSONB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
thread_id:会话标识,同一个thread_id的所有检查点构成一条时间线。checkpoint_id:快照的唯一 ID(UUID),每次图运行结束生成一个新快照。parent_checkpoint_id:指向上一个快照的 ID。这个字段让检查点形成链表——你可以像 Git 回退一样,回到任意历史状态。checkpoint:JSONB 格式的压缩状态摘要。
checkpoint_writes——增量记录表
记录"本轮运行中,每个节点对状态做了什么修改":
CREATE TABLE checkpoint_writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL, -- 产生写入的节点
idx INT NOT NULL,
channel TEXT, -- 修改了哪个状态字段
type TEXT,
blob BYTEA, -- 实际写入的数据
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
LangGraph 不是每次存完整状态,而是记录增量。读取时取父快照 + 所有 writes = 当前完整状态,比全量序列化高效得多。
checkpoint_blobs——大对象分离表
消息列表可能很长,直接塞进 checkpoint_writes 会导致单行数据过大。这张表把大对象拆出来单独存储。
checkpoint_migrations——迁移版本表
框架内部使用,记录当前数据库 schema 的版本号,setup() 据此判断是否需要执行新的建表/改表脚本。
5. 为什么需要 autocommit=True
setup() 内部执行了 CREATE INDEX CONCURRENTLY 语句,这个命令不能在事务块中运行。如果 psycopg 连接默认开启事务模式,就会报错:
psycopg.errors.ActiveSqlTransaction: CREATE INDEX CONCURRENTLY 无法在事物块中运行
所以建立主连接时也需要 autocommit=True。
6. get_tuple():读取检查点
如果想验证数据是否真的写入了数据库,可以调用 checkpointer.get_tuple(config):
checkpoint_tuple = checkpointer.get_tuple(
{
"configurable": {
"thread_id": "chapter-1"}}
)
if checkpoint_tuple:
config, checkpoint, metadata = checkpoint_tuple[0], checkpoint_tuple[1], checkpoint_tuple[2]
print(f"最新检查点 ID: {checkpoint['id']}")
print(f"步骤数: {metadata.get('step')}, 来源: {metadata.get('source')}")
msgs = checkpoint.get('channel_values', {
}).get('messages', [])
print(f"已存储消息数: {len(msgs)}")
注意返回值是一个元组,不是列表:
| 索引 | 内容 | 含义 |
|---|---|---|
cp[0] |
config 字典 |
包含 thread_id、checkpoint_id 等配置信息 |
cp[1] |
checkpoint 字典 |
核心状态数据,含 channel_values |
cp[2] |
metadata 字典 |
运行时元数据,如 step、source |
cp[3] |
parent_config |
父检查点的配置(可选) |
cp[4] |
pending_writes |
待写入队列 |
7. 运行流程:持久化是如何发生的
当你调用 app.invoke(...) 时,LangGraph + PostgresSaver 的协作流程如下:
- 查档:用
thread_id查询checkpoints表,找到最新快照。如果没有,从零开始。 - 恢复:读取快照的
checkpointJSONB,反序列化出messages、temperature_unit等字段,注入到图的初始状态中。 - 运行:图开始执行,各节点返回增量写入内存状态。
- 存盘:运行结束后,LangGraph 把最终内存状态序列化,插入一条新的
checkpoints记录。各节点的增量写入被记录到checkpoint_writes。 - 下一位玩家:下一次
invoke()从这张新快照继续。
整个过程中,你的业务代码完全感知不到数据库的存在。这是存储层与业务层的彻底解耦。
四、核心组件一览
| 组件 | 类 / 函数 / 常量 | 作用 |
|---|---|---|
| PostgreSQL 驱动 | psycopg |
Python 3 的 PostgreSQL 连接库 |
| 连接字符串解析 | urllib.parse.urlparse |
从 POSTGRES_URI 中提取数据库名 |
| 自动建库 | CREATE DATABASE + DuplicateDatabase |
连接系统库 postgres,静默创建目标库 |
| 持久化检查点 | PostgresSaver |
将检查点存入 PostgreSQL,替代 InMemorySaver |
| 建表初始化 | PostgresSaver.setup() |
自动创建四张检查点相关表 |
| 自动提交 | autocommit=True |
绕过事务限制,允许索引并发创建 |
| 检查点读取 | get_tuple(config) |
从数据库读取最新快照元组 |
| 状态归约 | add_messages / 默认覆盖 |
与第二章完全一致,只是存储介质换成了数据库 |
五、试一试
前置条件
- 安装并启动 PostgreSQL(Windows 可用 EDB 安装包)。
- 记住安装时设置的
postgres用户密码。 - 确认
.env已配置:
OPENAI_API_KEY=sk-...
OPENAI_BASE_URL=https://api.moonshot.cn/v1 # 如果使用第三方服务
MODEL_ID=kimi-k2.5
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo
执行脚本
cd demo-01
python langgraph-03.py
你应该能看到和第二章相同的业务输出,但多了检查点验证信息:
=== 验证PostgreSQL检查点是否持久化写入 ===
最新检查点 ID: 1f142370-...
步骤数: 58, 来源: loop
已存储消息数: 48
验证持久化的方法
脚本多次运行:关掉终端,重新运行。步骤数会累加,因为每次运行都在同一个
thread_id上追加。换成thread_id="chapter-2",步骤数回到初始值。直接查数据库:
SELECT thread_id, checkpoint_id, metadata->>'step' AS step
FROM checkpoints
WHERE thread_id = 'chapter-1'
ORDER BY metadata->>'step' DESC
LIMIT 5;
- 跨进程验证:开两个终端,分别运行同一个脚本,使用同一个
thread_id。两个进程都能正确读取和写入同一组检查点。
完整代码
完整代码位于 demo-01/langgraph-03.py,与第二章相比只替换了检查点初始化部分。关键差异如下:
# ========== 第3章新增:PostgreSQL 持久化检查点 ==========
import psycopg
from urllib.parse import urlparse
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = os.environ.get("POSTGRES_URI")
# 自动创建数据库(如果不存在)
parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"
with psycopg.connect(base_uri, autocommit=True) as init_conn:
try:
init_conn.execute(f"CREATE DATABASE {db_name};")
except psycopg.errors.DuplicateDatabase:
pass # 数据库已存在,忽略
# 替换 InMemorySaver 为 PostgresSaver
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup() # 自动创建 checkpoints/writes/blobs/migrations 表
app = workflow.compile(checkpointer=checkpointer)
# ========== 验证检查点 ==========
checkpoint_tuple = checkpointer.get_tuple(
{
"configurable": {
"thread_id": "chapter-1"}}
)
if checkpoint_tuple:
config, checkpoint, metadata = checkpoint_tuple[0], checkpoint_tuple[1], checkpoint_tuple[2]
print(f"最新检查点 ID: {checkpoint['id']}")
print(f"步骤数: {metadata.get('step')}, 来源: {metadata.get('source')}")
msgs = checkpoint.get('channel_values', {
}).get('messages', [])
print(f"已存储消息数: {len(msgs)}")
其余代码(State、search 工具、call_model、should_continue、图构建、三次调用实验)与第二章完全一致。
六、其他
为什么选择 PostgreSQL?
LangGraph 的检查点存储有多种实现:
| 实现 | 适用场景 | 特点 |
|---|---|---|
InMemorySaver |
本地开发、快速原型 | 零配置,重启丢失 |
SqliteSaver |
单机部署、轻量应用 | 文件级数据库,不需要独立服务 |
PostgresSaver |
生产环境、多实例部署 | 网络数据库,支持并发读写 |
RedisSaver |
高频写入、缓存优先 | 纯内存 + 持久化,速度极快 |
选择 PostgreSQL 的理由:
- 成熟稳定: decades 的生产验证,运维生态丰富。
- JSONB 原生支持:状态序列化/反序列化无需额外转换。
- 并发安全:多进程、多实例同时读写同一个数据库,不会有数据竞争。
- 可观测性强:用 SQL 就能直接查询、分析、导出会话记录。
生产注意事项
- 连接池:示例代码使用单个连接。生产环境应使用连接池(如
psycopg_pool),避免每次请求都新建连接。 - 定期清理:
checkpoints表会持续增长,需要设计清理策略(如只保留最近 N 天的快照)。 - 备份:PostgreSQL 的
pg_dump可以直接把检查点数据备份出来,灾难恢复非常方便。 - 索引优化:
setup()已经创建了必要的索引,但如果你的thread_id前缀模式有规律,可能需要额外的复合索引。