【LangGraph新手村系列】(3)PostgreSQL 持久化检查点:让状态跨越进程与重启

简介: 将内存检查点替换为 PostgreSQL 持久化存储。PostgresSaver 自动建库建表,四张核心表分别存储快照、增量写入、大对象和迁移版本。autocommit=True 绕过事务限制,让状态跨越进程重启,支持多实例共享。

第三章 PostgreSQL 持久化检查点:让状态跨越进程与重启

"内存里的检查点就像写在沙滩上的字——浪一打就消失。PostgreSQL 才是真正的保险箱。"

一、问题

前两章我们一直用 InMemorySaver 做检查点。它很方便:几行代码就能让 Agent 拥有跨轮次记忆。但当你想把它搬到生产环境时,几个尖锐的问题立刻浮现:

  • 进程重启即失忆InMemorySaver 把状态存在 Python 字典里,服务一重启或部署新版本,所有会话记忆全部清零。
  • 无法横向扩展:如果服务有多个实例(负载均衡),InMemorySaver 各自为政,用户的对话历史被切成了碎片。
  • 无法持久化审计:生产系统通常需要留存会话记录,用于故障排查、合规审计或数据分析。内存里的字典既查不了,也导不出。
  • 容量受限:用户越多、对话越长,InMemorySaver 占用的内存线性增长,最终撑爆进程。

核心矛盾:前两章解决了"图内状态怎么流转"的问题,但没有解决"图外状态怎么保存"的问题。我们需要把检查点从内存搬到数据库

LangGraph 为此提供了多种持久化检查点实现。本章选择 PostgreSQL——原因很直接:它是生产环境最常见的选择,成熟稳定,JSONB 原生支持让状态序列化非常自然。

二、解决方案

我们要做四件事:

  1. 引入 PostgreSQL 连接:用 psycopg(PostgreSQL 的 Python 驱动)建立数据库连接。
  2. 自动准备数据库:如果目标数据库不存在,先连到默认的 postgres 库自动创建它。
  3. 替换检查点实现:把 InMemorySaver 换成 PostgresSaver,在连接上调用 setup() 自动建表。
  4. 保持业务逻辑不变:图的定义、节点的逻辑、状态的归约——完全复用第二章的代码,只有检查点的存储介质变了。
+--------+      +------------+      +------------------+
| START  | ---> |   agent    | ---> | should_continue  |
|        |      | (模型调用)  |      |   (条件判断)      |
+--------+      +------+-----+      +---------+--------+
                       ^                      |
                       |                      |
                       |        +-------------+-------------+
                       |        | 返回 "tools" |  返回 END    |
                       |        |              |              |
                       |        v              v              v
                       |   +--------+    +--------+
                       |   | tools  |    |  END   |
                       |   |(工具)  |    +--------+
                       |   +---+----+
                       |       |
                       +-------+
                       (回到 agent)

[外部仓库]
PostgreSQL 数据库
  - checkpoints
  - checkpoint_writes
  - checkpoint_blobs
  - checkpoint_migrations

图的结构没有任何变化,只是右下角的"外部仓库"从内存字典换成了 PostgreSQL 数据库。

三、工作原理

1. 从 InMemorySaverPostgresSaver

InMemorySaverPostgresSaver 都实现了同一个接口:BaseCheckpointSaver。这意味着它们对图的业务代码完全透明——只要你在 workflow.compile(checkpointer=...) 里传入的实例实现了这个接口,图就能正常读写检查点。

# 第二章:内存检查点
checkpointer = InMemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# 第三章:PostgreSQL 检查点(只换一行)
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup()
app = workflow.compile(checkpointer=checkpointer)

app.invoke(...) 的调用方式、config={"configurable": {"thread_id": "chapter-1"}} 的传参方式,完全不需要改。

2. 依赖安装与环境配置

PostgresSaver 来自 langgraph-checkpoint-postgres 包,psycopg 是 PostgreSQL 的 Python 3 驱动:

uv add psycopg langgraph-checkpoint-postgres

确认 .env 中有 PostgreSQL 连接字符串:

POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo

连接字符串的结构:

postgresql://  postgres  :  密码      @  localhost  :  5432  /  langgraph_demo
     |            |         |        |      |        |         |
  协议        用户名      密码      主机     端口     数据库名

3. 自动创建数据库

生产脚本不能假设数据库已经存在。我们在代码中加入一段初始化逻辑:

import psycopg
from urllib.parse import urlparse

DB_URI = os.environ.get("POSTGRES_URI")

parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"

with psycopg.connect(base_uri, autocommit=True) as init_conn:
    try:
        init_conn.execute(f"CREATE DATABASE {db_name};")
    except psycopg.errors.DuplicateDatabase:
        pass  # 数据库已存在,忽略

这里有两个细节:

  1. autocommit=TrueCREATE DATABASE 不能在事务块中运行。开启 autocommit 让这条命令独立执行。
  2. try/except DuplicateDatabaseCREATE DATABASE 没有 IF NOT EXISTS 语法,用异常捕获处理"已存在"的情况。

4. setup():自动建表

PostgresSaver 把检查点存储所需的表结构内建在代码里。调用 setup() 时,它会检查 checkpoint_migrations 表中的版本号,如果数据库还是空的,就执行一系列 CREATE TABLECREATE INDEX 语句。

一共会创建 4 张表

表名 作用
checkpoints 快照主表:每个会话在关键节点完成后的"完整状态照片"
checkpoint_writes 增量表:记录"哪个节点修改了哪个字段"
checkpoint_blobs 大对象表:存储消息列表等体积大的数据
checkpoint_migrations 迁移记录:框架内部追踪表结构版本

checkpoints——快照主表

CREATE TABLE checkpoints (
    thread_id         TEXT NOT NULL,
    checkpoint_ns     TEXT DEFAULT '',
    checkpoint_id     TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type              TEXT,
    checkpoint        JSONB,
    metadata          JSONB,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
  • thread_id:会话标识,同一个 thread_id 的所有检查点构成一条时间线。
  • checkpoint_id:快照的唯一 ID(UUID),每次图运行结束生成一个新快照。
  • parent_checkpoint_id:指向上一个快照的 ID。这个字段让检查点形成链表——你可以像 Git 回退一样,回到任意历史状态。
  • checkpoint:JSONB 格式的压缩状态摘要。

checkpoint_writes——增量记录表

记录"本轮运行中,每个节点对状态做了什么修改":

CREATE TABLE checkpoint_writes (
    thread_id      TEXT NOT NULL,
    checkpoint_ns  TEXT DEFAULT '',
    checkpoint_id  TEXT NOT NULL,
    task_id        TEXT NOT NULL,     -- 产生写入的节点
    idx            INT NOT NULL,
    channel        TEXT,              -- 修改了哪个状态字段
    type           TEXT,
    blob           BYTEA,             -- 实际写入的数据
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);

LangGraph 不是每次存完整状态,而是记录增量。读取时取父快照 + 所有 writes = 当前完整状态,比全量序列化高效得多。

checkpoint_blobs——大对象分离表

消息列表可能很长,直接塞进 checkpoint_writes 会导致单行数据过大。这张表把大对象拆出来单独存储。

checkpoint_migrations——迁移版本表

框架内部使用,记录当前数据库 schema 的版本号,setup() 据此判断是否需要执行新的建表/改表脚本。

5. 为什么需要 autocommit=True

setup() 内部执行了 CREATE INDEX CONCURRENTLY 语句,这个命令不能在事务块中运行。如果 psycopg 连接默认开启事务模式,就会报错:

psycopg.errors.ActiveSqlTransaction: CREATE INDEX CONCURRENTLY 无法在事物块中运行

所以建立主连接时也需要 autocommit=True

6. get_tuple():读取检查点

如果想验证数据是否真的写入了数据库,可以调用 checkpointer.get_tuple(config)

checkpoint_tuple = checkpointer.get_tuple(
    {
   "configurable": {
   "thread_id": "chapter-1"}}
)

if checkpoint_tuple:
    config, checkpoint, metadata = checkpoint_tuple[0], checkpoint_tuple[1], checkpoint_tuple[2]
    print(f"最新检查点 ID: {checkpoint['id']}")
    print(f"步骤数: {metadata.get('step')}, 来源: {metadata.get('source')}")
    msgs = checkpoint.get('channel_values', {
   }).get('messages', [])
    print(f"已存储消息数: {len(msgs)}")

注意返回值是一个元组,不是列表:

索引 内容 含义
cp[0] config 字典 包含 thread_idcheckpoint_id 等配置信息
cp[1] checkpoint 字典 核心状态数据,含 channel_values
cp[2] metadata 字典 运行时元数据,如 stepsource
cp[3] parent_config 父检查点的配置(可选)
cp[4] pending_writes 待写入队列

7. 运行流程:持久化是如何发生的

当你调用 app.invoke(...) 时,LangGraph + PostgresSaver 的协作流程如下:

  1. 查档:用 thread_id 查询 checkpoints 表,找到最新快照。如果没有,从零开始。
  2. 恢复:读取快照的 checkpoint JSONB,反序列化出 messagestemperature_unit 等字段,注入到图的初始状态中。
  3. 运行:图开始执行,各节点返回增量写入内存状态。
  4. 存盘:运行结束后,LangGraph 把最终内存状态序列化,插入一条新的 checkpoints 记录。各节点的增量写入被记录到 checkpoint_writes
  5. 下一位玩家:下一次 invoke() 从这张新快照继续。

整个过程中,你的业务代码完全感知不到数据库的存在。这是存储层与业务层的彻底解耦

四、核心组件一览

组件 类 / 函数 / 常量 作用
PostgreSQL 驱动 psycopg Python 3 的 PostgreSQL 连接库
连接字符串解析 urllib.parse.urlparse POSTGRES_URI 中提取数据库名
自动建库 CREATE DATABASE + DuplicateDatabase 连接系统库 postgres,静默创建目标库
持久化检查点 PostgresSaver 将检查点存入 PostgreSQL,替代 InMemorySaver
建表初始化 PostgresSaver.setup() 自动创建四张检查点相关表
自动提交 autocommit=True 绕过事务限制,允许索引并发创建
检查点读取 get_tuple(config) 从数据库读取最新快照元组
状态归约 add_messages / 默认覆盖 与第二章完全一致,只是存储介质换成了数据库

五、试一试

前置条件

  1. 安装并启动 PostgreSQL(Windows 可用 EDB 安装包)。
  2. 记住安装时设置的 postgres 用户密码。
  3. 确认 .env 已配置:
OPENAI_API_KEY=sk-...
OPENAI_BASE_URL=https://api.moonshot.cn/v1  # 如果使用第三方服务
MODEL_ID=kimi-k2.5
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo

执行脚本

cd demo-01
python langgraph-03.py

你应该能看到和第二章相同的业务输出,但多了检查点验证信息:

=== 验证PostgreSQL检查点是否持久化写入 ===
最新检查点 ID: 1f142370-...
步骤数: 58, 来源: loop
已存储消息数: 48

验证持久化的方法

  1. 脚本多次运行:关掉终端,重新运行。步骤数会累加,因为每次运行都在同一个 thread_id 上追加。换成 thread_id="chapter-2",步骤数回到初始值。

  2. 直接查数据库

SELECT thread_id, checkpoint_id, metadata->>'step' AS step
FROM checkpoints
WHERE thread_id = 'chapter-1'
ORDER BY metadata->>'step' DESC
LIMIT 5;
  1. 跨进程验证:开两个终端,分别运行同一个脚本,使用同一个 thread_id。两个进程都能正确读取和写入同一组检查点。

完整代码

完整代码位于 demo-01/langgraph-03.py,与第二章相比只替换了检查点初始化部分。关键差异如下:

# ========== 第3章新增:PostgreSQL 持久化检查点 ==========

import psycopg
from urllib.parse import urlparse
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = os.environ.get("POSTGRES_URI")

# 自动创建数据库(如果不存在)
parsed = urlparse(DB_URI)
db_name = parsed.path.lstrip("/") if parsed.path else "langgraph_demo"
base_uri = DB_URI.rsplit("/", 1)[0] + "/postgres"

with psycopg.connect(base_uri, autocommit=True) as init_conn:
    try:
        init_conn.execute(f"CREATE DATABASE {db_name};")
    except psycopg.errors.DuplicateDatabase:
        pass  # 数据库已存在,忽略

# 替换 InMemorySaver 为 PostgresSaver
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # 自动创建 checkpoints/writes/blobs/migrations 表

app = workflow.compile(checkpointer=checkpointer)

# ========== 验证检查点 ==========

checkpoint_tuple = checkpointer.get_tuple(
    {
   "configurable": {
   "thread_id": "chapter-1"}}
)
if checkpoint_tuple:
    config, checkpoint, metadata = checkpoint_tuple[0], checkpoint_tuple[1], checkpoint_tuple[2]
    print(f"最新检查点 ID: {checkpoint['id']}")
    print(f"步骤数: {metadata.get('step')}, 来源: {metadata.get('source')}")
    msgs = checkpoint.get('channel_values', {
   }).get('messages', [])
    print(f"已存储消息数: {len(msgs)}")

其余代码(Statesearch 工具、call_modelshould_continue、图构建、三次调用实验)与第二章完全一致。

六、其他

为什么选择 PostgreSQL?

LangGraph 的检查点存储有多种实现:

实现 适用场景 特点
InMemorySaver 本地开发、快速原型 零配置,重启丢失
SqliteSaver 单机部署、轻量应用 文件级数据库,不需要独立服务
PostgresSaver 生产环境、多实例部署 网络数据库,支持并发读写
RedisSaver 高频写入、缓存优先 纯内存 + 持久化,速度极快

选择 PostgreSQL 的理由:

  • 成熟稳定: decades 的生产验证,运维生态丰富。
  • JSONB 原生支持:状态序列化/反序列化无需额外转换。
  • 并发安全:多进程、多实例同时读写同一个数据库,不会有数据竞争。
  • 可观测性强:用 SQL 就能直接查询、分析、导出会话记录。

生产注意事项

  1. 连接池:示例代码使用单个连接。生产环境应使用连接池(如 psycopg_pool),避免每次请求都新建连接。
  2. 定期清理checkpoints 表会持续增长,需要设计清理策略(如只保留最近 N 天的快照)。
  3. 备份:PostgreSQL 的 pg_dump 可以直接把检查点数据备份出来,灾难恢复非常方便。
  4. 索引优化setup() 已经创建了必要的索引,但如果你的 thread_id 前缀模式有规律,可能需要额外的复合索引。
目录
相关文章
|
24天前
|
JSON 前端开发 API
【LangGraph新手村系列】(1)LangGraph 入门:StateGraph 与带记忆的 ReAct 循环
介绍 LangGraph 核心思想:用 StateGraph 把单次 LLM 调用串成可循环的 ReAct 工作流。通过节点、边与公共状态黑板,实现模型思考、工具调用、条件跳转的闭环,并引入检查点让 Agent 拥有跨轮次记忆。
364 1
【LangGraph新手村系列】(1)LangGraph 入门:StateGraph 与带记忆的 ReAct 循环
|
24天前
|
IDE 数据可视化 安全
【LangGraph新手村系列】(2)自定义状态与归约器:让 LangGraph 记住更多东西
从 MessagesState 扩展到 TypedDict 自定义状态,用 Annotated 声明字段归约策略。messages 挂载 add_messages 实现追加合并,其他字段默认覆盖。节点函数可读取自定义字段,让 Agent 记住用户偏好与业务元数据。
170 1
|
16天前
|
SQL API 数据库
【LangGraph新手村系列】(4)人机协作中断:让 Agent 在关键节点停下来等你
解决"全自动Agent无法审查"的问题。在tools节点前插入interrupt_before中断点,改用stream流式运行实现暂停,通过get_state().next判断中断位置,审查工具调用后用Command(resume=True)恢复执行,实现人机协作的三重干预:审查放行、修改参数、拒绝执行。
132 0
|
1月前
|
安全 前端开发 Java
【SpringSecurity新手村系列】(1)初识安全框架
本文从零开始引入 Spring Security,演示默认登录页与接口保护效果,并解释认证、授权与过滤器链的基础机制,帮助你快速建立安全开发的整体认知。
159 1
|
1月前
|
人工智能 Java API
【SpringAIAlibaba新手村系列】(18)Agent 智能体与今日菜单应用
本章以 ReactAgent 为入口,将本地菜单工具与 MCP 外部工具合并注册,统一通过 /eatAgent 执行任务,展示 Agent 在多工具协同下的意图理解、工具调用与结果整合能力。
388 3
|
1月前
|
人工智能 JavaScript Java
【SpringAIAlibaba新手村系列】(10)Text to Voice 文本转语音技术
本文围绕 Spring AI Alibaba 1.1.2.2 的文本转语音实现展开,记录了基于 DashScopeAudioSpeechModel 与 stream() 的可运行方案。文章重点说明了模型、音色、输出格式与流式拼接音频文件的关键细节。
445 6
|
1月前
|
人工智能 JSON Java
【SpringAIAlibaba新手村系列】(7)结构化输出与对象映射
本文详解 Spring AI 结构化输出功能,通过 Java Record 与 .entity() 方法,实现 AI 的 JSON 响应自动映射为 Java 对象,解决纯文本难以集成的问题。文中还对比了 Lambda 写法并提供 Prompt 设计最佳实践。
390 4
|
1月前
|
人工智能 自然语言处理 前端开发
【SpringAIAlibaba新手村系列】(9)Text to Image 文本生成图像技术
本文介绍 Spring AI 中的文生图能力,围绕 ImageModel、ImagePrompt 与阿里云百炼图像模型展开,演示如何根据文字描述生成图片链接,并结合 Prompt 编写技巧与参数配置,帮助开发者提升生成效果与落地能力。
536 8
|
1月前
|
人工智能 Java 定位技术
【SpringAIAlibaba新手村系列】(16)调用百度 MCP 服务
本章展示如何在客户端接入第三方百度 MCP 服务。通过 spring-ai-starter-mcp-client、application.yml 与 mcp-server.json5 完成 stdio 方式连接,自动发现并注册远端工具到 ChatClient,实现天气、IP 归属地、路线规划等能力调用。
458 9
|
1月前
|
人工智能 Java API
【SpringAIAlibaba新手村系列】(5)Prompt 提示词基础与多种消息类型
本章详解Spring AI 1.1.2中Prompt核心机制:以System/User/Assistant/Tool四类消息构建结构化提示,强调“角色决定语义”;涵盖多模型配置、链式API与底层Message组装两种实践方式,并给出系统消息设计最佳实践。
637 7

热门文章

最新文章