Python之代码片段-DeepAgents实现HITL

简介: 本项目实现LangChain中“人在环路”(Human-in-the-Loop)机制的同步流式输出,支持危险工具调用前人工确认(如delete_file),并按type(text/tool_call/tool_output/interrupt/summarization)结构化返回JSON流。依赖deepagents、langchain-openai等库,含stream_chat与stream_resume双核心方法。

描述

参考地址: https://docs.langchain.com/oss/python/langchain/human-in-the-loop

智能体执行过程中,有些危险方法,可以由人工确认之后,再执行。

官网给了非流式输出的实现;这里给出同步流式输出实现。

依赖

# 智能体依赖
pip install deepagents
# openai模式大模型调用依赖
pip install langchain-openai

另外代码中依赖了两外两个工具类,参考如下:

日志模块:https://developer.aliyun.com/article/1736993?spm=a2c6h.13148508.setting.16.38b24f0efmXtnP

配置模块:https://developer.aliyun.com/article/1737001?spm=a2c6h.13148508.setting.15.38b24f0e1TT1EY

代码:DeepAgentManager

整体设计

1、类 DeepAgentManager 中有两个核心方法:stream_chat 和 stream_resume

2、流式输出的数据对象为json

类型

数据

示例

summarization

自动摘要中... \n


text

聊天文本

{'type': 'text', 'content': ' for'}

tool_call

工具调用:*** \n

{'type': 'tool_call', 'content': '工具调用:delete_file \n'}

tool_output

工具响应: ** \n

{'type': 'tool_output', 'content': '工具响应: Deleted /temp.txt \n'}

interrupt

调用方法中断

{'type': 'interrupt', 'content': '{"action_requests": [{"name": "delete_file", "args": {"path": "/temp.txt"}, "description": "Tool execution requires approval\\n\\nTool: delete_file\\nArgs: {\'path\': \'/temp.txt\'}"}], "review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "edit", "reject", "respond"]}]}'}

3、注意

一旦触发自动摘要,会有很多 summarization 消息返回,谨慎使用

所有content都是文本,如 interrupt 的content,使用的时候,需要转换为json对象

4、两个核心方法

方法名称

方法描述

stream_chat

基本的聊天方法,流式返回消息

stream_resume

中断重启方法,stream_chat中返回了interrupt消息,可以由该方法继续执行后续操作

工具demo

from langchain_core.tools import tool

# 1. 定义工具(和原文一致)
@tool
def delete_file(path: str) -> str:
    """Delete a file from the filesystem."""
    return f"Deleted {path}"

@tool
def read_file(path: str) -> str:
    """Read a file from the filesystem."""
    return f"Contents of {path}"

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email."""
    return f"Sent email to {to}"

tools=[delete_file, read_file, send_email]

interrupt_on={
    "delete_file": True,
    "read_file": False,
    "send_email": {"allowed_decisions": ["approve", "reject"]},
}

详细代码

import json
from typing import List
from deepagents import create_deep_agent
from deepagents.backends import StateBackend
from deepagents.middleware import create_summarization_tool_middleware
from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessageChunk, ToolMessage, BaseMessage, SystemMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel

from manager.ConfigManager import config
from manager.LoggerManager import logger
from manager.ToolManager import tools, interrupt_on


# 私域大语言模型配置
base_url = config.get("LLM").get("BASE_URL")
api_key = config.get("LLM").get("APP_KEY")
model_name = config.get("LLM").get("MODEL")


# 配置大语言模型客户端,max_input_tokens 必须用以自摘要判定
custom_profile = {
    "max_input_tokens": 20_000,
    "tool_calling": True,
    "structured_output": True,
}
openai_client = init_chat_model(
    model_name,
    model_provider="openai",
    base_url=base_url,
    api_key=api_key,
    profile=custom_profile,
    max_tokens=20_000,
)


# 定义DeepAgents的标准化输入
class DeepAgentInput(BaseModel):
    messages: List[BaseMessage]


class DeepAgentManager:
    # 初始化方法
    def __init__(self, tenant_id, session_id, sys_prompt):
        self.tenant_id = tenant_id
        self.session_id = session_id
        self.deep_agent = create_deep_agent(
            openai_client,
            tools=tools,
            system_prompt=sys_prompt,
            checkpointer=InMemorySaver(),
            middleware=[
                create_summarization_tool_middleware(openai_client, StateBackend())
            ],
            interrupt_on=interrupt_on,
        )

    # 处理 messages 的chunk
    @staticmethod
    def _deal_messages_chunk(chunk):
        dtype, (token, metadata) = chunk
        logger.debug(f"token : {token}, metadata : {metadata}")
        # 结果数据
        data_lst = []
        if metadata.get("lc_source") == "summarization":
            logger.debug(f"summarization token : {token}, type : {type(token)}")
            data_lst.append({"type": "summarization", "content": "自动摘要中... \n"})
        if isinstance(token, AIMessageChunk):
            text = [b for b in token.content_blocks if b["type"] == "text"]
            if text:
                data_lst.append({"type": "text", "content": text[0]["text"]})
        elif isinstance(token, ToolMessage):
            data_lst.append({"type": "tool_call", "content": f"工具调用:{token.name} \n"})
            data_lst.append({"type": "tool_output", "content": f"工具响应: {token.content} \n"})
        else:
            logger.debug(f"token : {token}")
            logger.debug(f"metadata : {metadata}")
        return data_lst

    # 处理 updates 的 chunk
    @staticmethod
    def _deal_updates_chunk(chunk):
        dtype, token = chunk
        data_lst = []
        interrupt_tuple = token.get("__interrupt__") if token else None
        if interrupt_tuple:
            interrupt_obj, = interrupt_tuple
            if interrupt_obj:
                # json 对象转换为json字符串
                data_lst.append({"type": "interrupt", "content": json.dumps(interrupt_obj.value)})
        return data_lst

    # 流式输出
    def stream_chat(self, messages: List[BaseMessage]):
        try:
            thread_id = f"{self.tenant_id}-{self.session_id}"
            runnable_config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
            # input_data: Dict[str, list] = {"messages": messages}
            for chunk in self.deep_agent.stream(
                DeepAgentInput(messages=messages),
                stream_mode=["messages", "updates"],
                config=runnable_config,
            ):
                data_lst = None
                if "messages" in chunk:
                    data_lst = self._deal_messages_chunk(chunk)
                elif "updates" in chunk:
                    data_lst = self._deal_updates_chunk(chunk)
                if data_lst:
                    for data in data_lst:
                        yield data
        except Exception as e:
            logger.warning(f"Error: {str(e)}")
            # yield f'对不起,系统感冒了...'
            yield {"type": "text", "content": "对不起,系统感冒了..."}

    def stream_resume(self, decisions: list[dict]):
        try:
            thread_id = f"{self.tenant_id}-{self.session_id}"
            runnable_config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
            for chunk in self.deep_agent.stream(
                Command(resume={"decisions": decisions}),
                stream_mode=["messages", "updates"],
                config=runnable_config,
            ):
                data_lst = None
                if "messages" in chunk:
                    data_lst = self._deal_messages_chunk(chunk)
                if data_lst:
                    for data in data_lst:
                        yield data
        except Exception as e:
            logger.warning(f"Error: {str(e)}")
            # yield f'对不起,系统感冒了...'
            yield {"type": "text", "content": "对不起,系统感冒了..."}

示例

if __name__ == "__main__":
    # 创建DeepAgentManager实例
    tid = "123"
    sid = "123"
    prompt = "You are a helpful assistant."
    deep_agent_manager = DeepAgentManager(tid, sid, prompt)

    print("DeepAgentManager initialized.")

    # 普通聊天
    msgs = [
        SystemMessage(content=prompt),
        HumanMessage(content="法国首都是哪儿")
    ]
    content1 = ""
    for response in deep_agent_manager.stream_chat(msgs):
        content1 += response.get("content", "")
        print(response)
    msgs.append(SystemMessage(content=content1))

    print("------------------------------------------")

    interrupt_info = None

    # Delete the file temp.txt 删除文件操作
    msgs.append(HumanMessage(content="Delete the file temp.txt"))
    content2 = ""
    for response in deep_agent_manager.stream_chat(msgs):
        ctype = response.get("type")
        if ctype == "text":
            content2 += response.get("content", "")
        elif ctype == "tool_call":
            content2 += response.get("content", "")
        elif ctype == "tool_output":
            content2 += response.get("content", "")
        elif ctype == "interrupt":
            interrupt_info = response.get("content", "")
        print(response)
    msgs.append(SystemMessage(content=content2))

    print("-----------------------------------------")

    # 是否触发中断
    if interrupt_info:
        interrupt_json = json.loads(interrupt_info)
        print(f"action_requests : {interrupt_json.get('action_requests')}")
        print(f"review_configs : {interrupt_json.get('review_configs')}")
        decisions_lst = [{"type": "approve"}]

        # 继续中断的聊天
        for response in deep_agent_manager.stream_resume(decisions_lst):
            print(response)

    print("Chat completed.")


相关文章
|
5天前
|
人工智能 自然语言处理 文字识别
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
Qwen3.7-Max是阿里云百炼面向智能体时代推出的新一代旗舰模型,对标GPT-5.5、Claude Opus 4.7等闭源旗舰。该模型支持百万级token上下文窗口,具备顶级推理能力、多模态搜索与视觉理解增强、流式输出低延迟响应等核心优势,覆盖编程、办公、长周期自主执行等复杂场景。同时支持OpenAI接口兼容,便于系统快速迁移。用户可通过Token Plan团队或节省计划等订阅方式灵活调用,适合企业级高要求场景使用。
2712 9
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
|
13天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
3455 12
|
16天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
3532 25
|
9天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
2667 6
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
7天前
|
人工智能 自然语言处理 供应链
|
7天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全+三种模式+记忆体系+实战工作流完整手册
Claude Code 是当前最流行的终端级 AI 编程助手,能够直接在命令行中完成代码生成、项目理解、文件修改、命令执行、错误修复等全流程开发工作。它不依赖图形界面、不占用额外资源,却能深度理解项目结构,自动生成规范代码,大幅提升研发效率。
1233 3
|
28天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23612 15
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」