Python之DeepAgents:基于MongoDB的检查点

简介: 本文介绍了基于MongoDB实现LangGraph检查点的完整方案:提供同步/异步的CheckpointSaver类,支持线程隔离、索引优化与序列化;包含连接配置、实例化及DeepAgent集成示例,并附带按thread_id查询历史对话消息的实用工具函数。

MongoDB检查点

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, AsyncIterator, Iterable, Iterator, Sequence

from bson import Binary
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    WRITES_IDX_MAP,
    BaseCheckpointSaver,
    ChannelVersions,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
)
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient, UpdateOne


class MongoDBCheckpointSaver(BaseCheckpointSaver):
    """LangGraph checkpoint saver that persists checkpoints in MongoDB.

    Synchronous methods use PyMongo and asynchronous methods use Motor; both
    point at the same database. Storage layout:

    * ``collection_name`` -- one document per checkpoint, unique on
      ``(thread_id, checkpoint_id)`` and tagged with ``checkpoint_ns`` and
      ``parent_checkpoint_id``.
    * ``f"{collection_name}_writes"`` -- pending writes recorded by
      ``put_writes``, unique on
      ``(thread_id, checkpoint_ns, checkpoint_id, task_id, idx)``.

    Checkpoint, metadata and write payloads are serialized with the inherited
    ``self.serde`` and stored as BSON binary. Checkpoint documents that have no
    ``checkpoint_ns`` field (written by earlier versions of this class) are
    treated as belonging to the root namespace ``""``.
    """

    # Deterministic ordering for the pending writes of one checkpoint.
    _WRITES_SORT = [("task_id", 1), ("idx", 1)]

    def __init__(
        self,
        uri: str = "mongodb://localhost:27017/",
        db_name: str = "langgraph",
        collection_name: str = "checkpoints"
    ):
        super().__init__()
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        self.writes_collection = self.db[f"{collection_name}_writes"]

        self.async_client = AsyncIOMotorClient(uri)
        self.async_db = self.async_client[db_name]
        self.async_collection = self.async_db[collection_name]
        self.async_writes_collection = self.async_db[f"{collection_name}_writes"]

        self.collection.create_index("thread_id")
        self.collection.create_index([("thread_id", 1), ("checkpoint_id", 1)], unique=True)
        # Serves "latest checkpoint of a thread within a namespace" lookups.
        self.collection.create_index(
            [("thread_id", 1), ("checkpoint_ns", 1), ("checkpoint_id", -1)]
        )
        self.writes_collection.create_index(
            [
                ("thread_id", 1),
                ("checkpoint_ns", 1),
                ("checkpoint_id", 1),
                ("task_id", 1),
                ("idx", 1),
            ],
            unique=True,
        )

    # ----------------------------- serialization -----------------------------
    def _serialize(self, obj: Any) -> dict:
        """Serialize ``obj`` into a ``{"type", "data"}`` sub-document."""
        # dumps_typed returns (type: str, data: bytes)
        typ, data = self.serde.dumps_typed(obj)
        return {"type": typ, "data": Binary(data)}

    def _deserialize(self, data: dict) -> Any:
        """Inverse of ``_serialize``."""
        return self.serde.loads_typed((data["type"], data["data"]))

    # ----------------------------- query helpers -----------------------------
    @staticmethod
    def _ns_condition(checkpoint_ns: str) -> Any:
        """Return a MongoDB condition that matches ``checkpoint_ns``.

        For the root namespace ``""`` the condition also matches documents
        with no ``checkpoint_ns`` field (a ``None`` condition matches missing
        fields), so checkpoints written before the field existed stay visible.
        """
        return {"$in": ["", None]} if checkpoint_ns == "" else checkpoint_ns

    def _tuple_query(self, config: RunnableConfig) -> dict:
        """Query for ``get_tuple``: the configured checkpoint if ``checkpoint_id``
        is set, otherwise any checkpoint of the thread/namespace."""
        configurable = config["configurable"]
        query: dict[str, Any] = {
            "thread_id": configurable["thread_id"],
            "checkpoint_ns": self._ns_condition(configurable.get("checkpoint_ns") or ""),
        }
        checkpoint_id = configurable.get("checkpoint_id")
        if checkpoint_id:
            query["checkpoint_id"] = checkpoint_id
        return query

    def _list_query(
        self, config: RunnableConfig | None, before: RunnableConfig | None
    ) -> dict:
        """Query for ``list``: optional thread / namespace / checkpoint id
        filters from ``config`` plus an optional ``checkpoint_id < before`` bound."""
        query: dict[str, Any] = {}
        id_condition: dict[str, Any] = {}
        if config and "configurable" in config:
            configurable = config["configurable"]
            thread_id = configurable.get("thread_id")
            if thread_id:
                query["thread_id"] = thread_id
            checkpoint_ns = configurable.get("checkpoint_ns")
            if checkpoint_ns is not None:
                query["checkpoint_ns"] = self._ns_condition(checkpoint_ns)
            if configurable.get("checkpoint_id"):
                id_condition["$eq"] = configurable["checkpoint_id"]
        before_id = (before or {}).get("configurable", {}).get("checkpoint_id")
        if before_id:
            id_condition["$lt"] = before_id
        if id_condition:
            query["checkpoint_id"] = id_condition
        return query

    @staticmethod
    def _writes_query(doc: dict) -> dict:
        """Query selecting the pending writes attached to checkpoint ``doc``."""
        return {
            "thread_id": doc["thread_id"],
            "checkpoint_ns": doc.get("checkpoint_ns") or "",
            "checkpoint_id": doc["checkpoint_id"],
        }

    @staticmethod
    def _metadata_matches(
        metadata: CheckpointMetadata, criteria: dict[str, Any] | None
    ) -> bool:
        """True if every ``criteria`` key equals the same key in ``metadata``."""
        return not criteria or all(metadata.get(k) == v for k, v in criteria.items())

    def _to_tuple(
        self, doc: dict, metadata: CheckpointMetadata, writes: Iterable[dict]
    ) -> CheckpointTuple:
        """Build a ``CheckpointTuple`` from a checkpoint document and its writes."""
        thread_id = doc["thread_id"]
        checkpoint_ns = doc.get("checkpoint_ns") or ""
        parent_id = doc.get("parent_checkpoint_id")
        return CheckpointTuple(
            config={
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": doc["checkpoint_id"],
                }
            },
            checkpoint=self._deserialize(doc["checkpoint"]),
            metadata=metadata,
            parent_config=(
                {
                    "configurable": {
                        "thread_id": thread_id,
                        "checkpoint_ns": checkpoint_ns,
                        "checkpoint_id": parent_id,
                    }
                }
                if parent_id
                else None
            ),
            pending_writes=[
                (w["task_id"], w["channel"], self._deserialize(w["value"]))
                for w in writes
            ],
        )

    def _checkpoint_upsert(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
    ) -> tuple[dict, dict, RunnableConfig]:
        """Return ``(filter, update, next_config)`` for storing ``checkpoint``."""
        configurable = config["configurable"]
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns") or ""
        checkpoint_id = checkpoint["id"]
        key = {"thread_id": thread_id, "checkpoint_id": checkpoint_id}
        update = {
            "$set": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                # The incoming config addresses the checkpoint this one follows.
                "parent_checkpoint_id": configurable.get("checkpoint_id"),
                "checkpoint": self._serialize(checkpoint),
                "metadata": self._serialize(metadata),
                "updated_at": datetime.now(timezone.utc),
            }
        }
        next_config: RunnableConfig = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }
        return key, update, next_config

    def _write_ops(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str,
    ) -> list[UpdateOne]:
        """Build one upsert per ``(channel, value)`` write of ``task_id``."""
        configurable = config["configurable"]
        key_base = {
            "thread_id": configurable["thread_id"],
            "checkpoint_ns": configurable.get("checkpoint_ns") or "",
            "checkpoint_id": configurable["checkpoint_id"],
            "task_id": task_id,
        }
        ops = []
        for idx, (channel, value) in enumerate(writes):
            write_idx = WRITES_IDX_MAP.get(channel, idx)
            fields = {
                "channel": channel,
                "value": self._serialize(value),
                "task_path": task_path,
            }
            # Special channels from WRITES_IDX_MAP get negative indices and are
            # overwritten; regular writes keep the first value recorded so a
            # retried task does not clobber earlier results.
            update = {"$set": fields} if write_idx < 0 else {"$setOnInsert": fields}
            ops.append(UpdateOne({**key_base, "idx": write_idx}, update, upsert=True))
        return ops

    # ------------------------------ sync methods ------------------------------
    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        """Fetch the checkpoint addressed by ``config``.

        Returns the checkpoint named by ``configurable.checkpoint_id`` if set,
        otherwise the newest checkpoint of the thread in ``checkpoint_ns``
        (default ``""``), together with its pending writes; ``None`` if absent.
        """
        doc = self.collection.find_one(
            self._tuple_query(config), sort=[("checkpoint_id", -1)]
        )
        if not doc:
            return None
        writes = self.writes_collection.find(
            self._writes_query(doc), sort=self._WRITES_SORT
        )
        return self._to_tuple(doc, self._deserialize(doc["metadata"]), writes)

    def list(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
        **kwargs: Any,
    ) -> Iterator[CheckpointTuple]:
        """Yield matching checkpoints, newest first.

        Args:
            config: Optional thread / namespace / checkpoint id to restrict to.
            filter: Metadata key/value pairs that must all match.
            before: Only checkpoints with an id lower than this config's id.
            limit: Maximum number of checkpoints to yield.
        """
        remaining = limit
        with self.collection.find(
            self._list_query(config, before), sort=[("checkpoint_id", -1)]
        ) as cursor:
            for doc in cursor:
                if remaining is not None and remaining <= 0:
                    break
                metadata = self._deserialize(doc["metadata"])
                # Metadata is stored serialized, so it is filtered client-side.
                if not self._metadata_matches(metadata, filter):
                    continue
                writes = self.writes_collection.find(
                    self._writes_query(doc), sort=self._WRITES_SORT
                )
                yield self._to_tuple(doc, metadata, writes)
                if remaining is not None:
                    remaining -= 1

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Upsert ``checkpoint`` and return a config addressing it.

        The returned config carries ``thread_id``, ``checkpoint_ns`` and the new
        ``checkpoint_id``; passing it back on the next ``put`` is what records
        ``parent_checkpoint_id``.
        """
        key, update, next_config = self._checkpoint_upsert(config, checkpoint, metadata)
        self.collection.update_one(key, update, upsert=True)
        return next_config

    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        """Store the pending ``writes`` of ``task_id`` for the checkpoint in ``config``."""
        ops = self._write_ops(config, writes, task_id, task_path)
        if ops:  # bulk_write rejects an empty request list
            self.writes_collection.bulk_write(ops, ordered=False)

    def delete_thread(self, thread_id: str) -> None:
        """Delete every checkpoint and pending write stored for ``thread_id``."""
        self.collection.delete_many({"thread_id": thread_id})
        self.writes_collection.delete_many({"thread_id": thread_id})

    # ------------------------------ async methods ------------------------------
    async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        """Async version of ``get_tuple``."""
        doc = await self.async_collection.find_one(
            self._tuple_query(config), sort=[("checkpoint_id", -1)]
        )
        if not doc:
            return None
        writes = [
            w
            async for w in self.async_writes_collection.find(
                self._writes_query(doc)
            ).sort(self._WRITES_SORT)
        ]
        return self._to_tuple(doc, self._deserialize(doc["metadata"]), writes)

    async def alist(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[CheckpointTuple]:
        """Async version of ``list``."""
        remaining = limit
        cursor = self.async_collection.find(self._list_query(config, before)).sort(
            "checkpoint_id", -1
        )
        async for doc in cursor:
            if remaining is not None and remaining <= 0:
                break
            metadata = self._deserialize(doc["metadata"])
            if not self._metadata_matches(metadata, filter):
                continue
            writes = [
                w
                async for w in self.async_writes_collection.find(
                    self._writes_query(doc)
                ).sort(self._WRITES_SORT)
            ]
            yield self._to_tuple(doc, metadata, writes)
            if remaining is not None:
                remaining -= 1

    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Async version of ``put``."""
        key, update, next_config = self._checkpoint_upsert(config, checkpoint, metadata)
        await self.async_collection.update_one(key, update, upsert=True)
        return next_config

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        """Async version of ``put_writes``."""
        ops = self._write_ops(config, writes, task_id, task_path)
        if ops:
            await self.async_writes_collection.bulk_write(ops, ordered=False)

    async def adelete_thread(self, thread_id: str) -> None:
        """Async version of ``delete_thread``."""
        await self.async_collection.delete_many({"thread_id": thread_id})
        await self.async_writes_collection.delete_many({"thread_id": thread_id})

MongoDB连接配置

import urllib.parse

from util.ConfigManager import config


# MongoDB connection settings, read from the "MONGODB" section of the app config.
mongo_ip, mongo_port, mongo_user, mongo_pwd, mongo_db, mongo_collection = (
    config.get("MONGODB").get(setting)
    for setting in ("IP", "PORT", "USER", "PWD", "DB", "COLLECTION")
)


def build_connection_string() -> str:
    """Build a MongoDB connection URI from the module-level MONGODB settings.

    Credentials are embedded only when both a user and a password are
    configured. They are escaped with ``urllib.parse.quote_plus`` so reserved
    characters (``@``, ``:``, ``/`` ...) do not break the URI.

    Returns:
        ``mongodb://<user>:<pwd>@<ip>:<port>`` or ``mongodb://<ip>:<port>``.
    """
    if mongo_user and mongo_pwd:
        # str(): config loaders may yield non-str values (e.g. a numeric
        # password), which quote_plus would reject with TypeError.
        username = urllib.parse.quote_plus(str(mongo_user))
        password = urllib.parse.quote_plus(str(mongo_pwd))
        # Never print or log this URI: it contains the plaintext password.
        return f"mongodb://{username}:{password}@{mongo_ip}:{mongo_port}"
    return f"mongodb://{mongo_ip}:{mongo_port}"

检查点实例化

# Module-wide checkpoint saver instance, shared by every agent that imports it.
mongo_db_checkpoint_saver = MongoDBCheckpointSaver(
    db_name=mongo_db,
    collection_name=mongo_collection,
    uri=build_connection_string(),
)

创建DeepAgents

# Excerpt from the body of a (class)method: the enclosing `def` is not shown,
# nor where tenant_id, session_id, system_prompt, custom_tools,
# open_ai_client, log_tool_calls and current_data_quality come from.
#
# Reuse the agent cached for this tenant/session if one already exists.
# NOTE(review): f"{tenant_id}_{session_id}" is ambiguous when either id may
# contain "_" (tenant "a_b" + session "c" and tenant "a" + session "b_c" share
# one key), which would hand one tenant another tenant's agent. Confirm the ids
# cannot contain "_", or key the cache by the tuple (tenant_id, session_id).
# NOTE(review): this excerpt never stores `agent` in cls._instances;
# presumably that happens after the part shown here. Confirm.
session_key = f"{tenant_id}_{session_id}"

if session_key in cls._instances:
    return cls._instances[session_key]

# Permissions: allow read/write only below this tenant/session's workspace.
# NOTE(review): tenant_id/session_id are interpolated into paths here and
# below; if they can come from end users, validate them (reject "/" and "..").
permissions = [
    FilesystemPermission(
        operations=["read", "write"],
        paths=[f"/workspace/{tenant_id}/{session_id}/**"],
        mode="allow"
    )
]
# Storage backend isolated per tenant and session: the two route prefixes
# below map to per-tenant/session directories on disk; other paths go to
# the default StateBackend.
backend = CompositeBackend(
    default=StateBackend(),
    routes={
        f"/memories/{tenant_id}/{session_id}/": FilesystemBackend(
            root_dir=f"./memories/{tenant_id}/{session_id}",
            virtual_mode=True
        ),
        f"/workspace/{tenant_id}/{session_id}/": FilesystemBackend(
            root_dir=f"./workspace/{tenant_id}/{session_id}",
            virtual_mode=True
        )
    }
)

# Use the caller's system prompt, falling back to a default when it is empty.
sys_prompt = system_prompt or "You are a research assistant."

# Use the caller-supplied tools if any; otherwise fall back to
# [current_data_quality]. The two are NOT merged.
tools = custom_tools if custom_tools else [current_data_quality]

# Create the DeepAgent; its graph state is checkpointed through the shared
# MongoDB checkpoint saver.
agent = create_deep_agent(
    model=open_ai_client,
    system_prompt=sys_prompt,
    middleware=[
        log_tool_calls
    ],
    tools=tools,
    permissions=permissions,
    backend=backend,
    checkpointer=mongo_db_checkpoint_saver
)

根据会话ID(thread_id)检索对话列表

from bson import Binary
from pymongo import MongoClient
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

from util.MongoPool import build_connection_string, mongo_db

# Shared serializer; it must match the one that wrote the documents
# (MongoDBCheckpointSaver serializes with its inherited self.serde).
_SERDE = JsonPlusSerializer()


def decode_typed_item(item: dict) -> dict:
    """Turn a stored ``{"type": ..., "data": ...}`` sub-document back into an object."""
    return _SERDE.loads_typed((item["type"], item["data"]))


def query_chat_messages(
    thread_id: str,
    mongo_uri: str = "mongodb://localhost:27017/",
    db_name: str = "langgraph",
    collection_name: str = "checkpoints",
) -> list[dict]:
    """Return the chat history held in a thread's latest root-graph checkpoint.

    Args:
        thread_id: The LangGraph ``thread_id`` whose history is wanted.
        mongo_uri: MongoDB connection URI.
        db_name: Database that holds the checkpoint collection.
        collection_name: Collection written by ``MongoDBCheckpointSaver``.

    Returns:
        ``[{"role": ..., "content": ...}, ...]`` in the order stored in the
        checkpoint's ``messages`` channel, where ``role`` is the message's
        ``type`` field (default ``"ai"``). ``[]`` if the thread has no
        checkpoint.
    """
    # The context manager closes the client and its connection pool.
    with MongoClient(mongo_uri) as client:
        coll = client[db_name][collection_name]
        # Restrict to the root namespace: subgraph checkpoints (non-empty
        # checkpoint_ns) may share the thread_id. None also matches documents
        # that have no checkpoint_ns field at all.
        doc = coll.find_one(
            {"thread_id": thread_id, "checkpoint_ns": {"$in": ["", None]}},
            sort=[("checkpoint_id", -1)],
        )
    if not doc:
        return []

    checkpoint = decode_typed_item(doc["checkpoint"])
    messages = (checkpoint.get("channel_values") or {}).get("messages") or []

    # Convert message objects (or already-plain dicts) into role/content dicts.
    result = []
    for msg in messages:
        if isinstance(msg, dict):
            msg_dict = msg
        elif hasattr(msg, "model_dump"):
            msg_dict = msg.model_dump()
        else:
            continue  # unrecognized message shape: skip it
        result.append(
            {"role": msg_dict.get("type", "ai"), "content": msg_dict.get("content", "")}
        )
    return result


# ============ Direct usage ============
if __name__ == "__main__":
    # Replace with the thread_id you want to inspect.
    target_thread = "sdrowero234249jdoisjfoiet93xxxxx"
    history = query_chat_messages(
        thread_id=target_thread,
        mongo_uri=build_connection_string(),
        db_name=mongo_db,
    )
    for entry in history:
        print(f"【{entry['role']}】\n{entry['content']}\n")
相关文章
|
2天前
|
人工智能 API 开发工具
Claude Code国内安装:2026最新保姆教程(附cc-switch配置)
Claude Code是我目前最推荐的AI编程工具,没有之一。 它可能不是最简单的,但绝对是上限最高的。一旦跑通安装、接上模型、定好规范,你会发现很多原本需要几小时的工作,现在几分钟就能搞定。 这套方案的核心优势就三个字:可控性。你不用依赖任何不稳定服务,所有组件都在自己手里。模型效果不好?换一个。框架更新了?自己决定升不升。 这才是AI时代开发者该有的姿势——不是被动等喂饭,而是主动搭建自己的生产力基础设施。 希望这篇保姆教程,能帮你顺利上车。做出你自己的作品。
Claude Code国内安装:2026最新保姆教程(附cc-switch配置)
|
9天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
3865 21
|
5天前
|
人工智能 JSON BI
DeepSeek V4 来了!超越 Claude Sonnet 4.5,赶紧对接 Claude Code 体验一把
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro 的真实体验与避坑记录 本文记录我将 Claude Code 对接 DeepSeek 最新模型(V4Pro)后的真实体验,测试了 Skills 自动化查询和积木报表 AI 建表两个场景——有惊喜,也踩
2463 8
|
4天前
|
人工智能 缓存 BI
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro,跑完 Skills —— OA 审批、大屏、报表、部署 5 大实战场景后的真实体验 ![](https://oscimg.oschina.net/oscnet/up608d34aeb6bafc47f
2083 4
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
|
21天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
18991 60
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
2天前
|
SQL 人工智能 弹性计算
阿里云发布 Agentic NDR,威胁检测与响应进入智能体时代
欢迎前往阿里云云防火墙控制台体验!
1170 2

热门文章

最新文章