Python之代码片段-DeepAgents实现HITL

简介: 本项目实现LangChain中“人在环路”(Human-in-the-Loop)机制的同步流式输出,支持危险工具调用前人工确认(如delete_file),并按type(text/tool_call/tool_output/interrupt/summarization)结构化返回JSON流。依赖deepagents、langchain-openai等库,含stream_chat与stream_resume双核心方法。

描述

参考地址: https://docs.langchain.com/oss/python/langchain/human-in-the-loop

智能体执行过程中,有些危险方法,可以由人工确认之后,再执行。

官网给了非流式输出的实现;这里给出同步流式输出实现。

依赖

# 智能体依赖
pip install deepagents
# openai模式大模型调用依赖
pip install langchain-openai

另外代码中依赖了两外两个工具类,参考如下:

日志模块:https://developer.aliyun.com/article/1736993?spm=a2c6h.13148508.setting.16.38b24f0efmXtnP

配置模块:https://developer.aliyun.com/article/1737001?spm=a2c6h.13148508.setting.15.38b24f0e1TT1EY

代码:DeepAgentManager

整体设计

1、类 DeepAgentManager 中有两个核心方法:stream_chat 和 stream_resume

2、流式输出的数据对象为json

类型

数据

示例

summarization

自动摘要中... \n


text

聊天文本

{'type': 'text', 'content': ' for'}

tool_call

工具调用:*** \n

{'type': 'tool_call', 'content': '工具调用:delete_file \n'}

tool_output

工具响应: ** \n

{'type': 'tool_output', 'content': '工具响应: Deleted /temp.txt \n'}

interrupt

调用方法中断

{'type': 'interrupt', 'content': '{"action_requests": [{"name": "delete_file", "args": {"path": "/temp.txt"}, "description": "Tool execution requires approval\\n\\nTool: delete_file\\nArgs: {\'path\': \'/temp.txt\'}"}], "review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "edit", "reject", "respond"]}]}'}

3、注意

一旦触发自动摘要,会有很多 summarization 消息返回,谨慎使用

所有content都是文本,如 interrupt 的content,使用的时候,需要转换为json对象

4、两个核心方法

方法名称

方法描述

stream_chat

基本的聊天方法,流式返回消息

stream_resume

中断重启方法,stream_chat中返回了interrupt消息,可以由该方法继续执行后续操作

工具demo

from langchain_core.tools import tool

# 1. 定义工具(和原文一致)
@tool
def delete_file(path: str) -> str:
    """Delete a file from the filesystem."""
    return f"Deleted {path}"

@tool
def read_file(path: str) -> str:
    """Read a file from the filesystem."""
    return f"Contents of {path}"

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email."""
    return f"Sent email to {to}"

tools=[delete_file, read_file, send_email]

interrupt_on={
    "delete_file": True,
    "read_file": False,
    "send_email": {"allowed_decisions": ["approve", "reject"]},
}

详细代码

import json
from typing import List
from deepagents import create_deep_agent
from deepagents.backends import StateBackend
from deepagents.middleware import create_summarization_tool_middleware
from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessageChunk, ToolMessage, BaseMessage, SystemMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel

from manager.ConfigManager import config
from manager.LoggerManager import logger
from manager.ToolManager import tools, interrupt_on


# 私域大语言模型配置
base_url = config.get("LLM").get("BASE_URL")
api_key = config.get("LLM").get("APP_KEY")
model_name = config.get("LLM").get("MODEL")


# 配置大语言模型客户端,max_input_tokens 必须用以自摘要判定
custom_profile = {
    "max_input_tokens": 20_000,
    "tool_calling": True,
    "structured_output": True,
}
openai_client = init_chat_model(
    model_name,
    model_provider="openai",
    base_url=base_url,
    api_key=api_key,
    profile=custom_profile,
    max_tokens=20_000,
)


# 定义DeepAgents的标准化输入
class DeepAgentInput(BaseModel):
    messages: List[BaseMessage]


class DeepAgentManager:
    # 初始化方法
    def __init__(self, tenant_id, session_id, sys_prompt):
        self.tenant_id = tenant_id
        self.session_id = session_id
        self.deep_agent = create_deep_agent(
            openai_client,
            tools=tools,
            system_prompt=sys_prompt,
            checkpointer=InMemorySaver(),
            middleware=[
                create_summarization_tool_middleware(openai_client, StateBackend())
            ],
            interrupt_on=interrupt_on,
        )

    # 处理 messages 的chunk
    @staticmethod
    def _deal_messages_chunk(chunk):
        dtype, (token, metadata) = chunk
        logger.debug(f"token : {token}, metadata : {metadata}")
        # 结果数据
        data_lst = []
        if metadata.get("lc_source") == "summarization":
            logger.debug(f"summarization token : {token}, type : {type(token)}")
            data_lst.append({"type": "summarization", "content": "自动摘要中... \n"})
        if isinstance(token, AIMessageChunk):
            text = [b for b in token.content_blocks if b["type"] == "text"]
            if text:
                data_lst.append({"type": "text", "content": text[0]["text"]})
        elif isinstance(token, ToolMessage):
            data_lst.append({"type": "tool_call", "content": f"工具调用:{token.name} \n"})
            data_lst.append({"type": "tool_output", "content": f"工具响应: {token.content} \n"})
        else:
            logger.debug(f"token : {token}")
            logger.debug(f"metadata : {metadata}")
        return data_lst

    # 处理 updates 的 chunk
    @staticmethod
    def _deal_updates_chunk(chunk):
        dtype, token = chunk
        data_lst = []
        interrupt_tuple = token.get("__interrupt__") if token else None
        if interrupt_tuple:
            interrupt_obj, = interrupt_tuple
            if interrupt_obj:
                # json 对象转换为json字符串
                data_lst.append({"type": "interrupt", "content": json.dumps(interrupt_obj.value)})
        return data_lst

    # 流式输出
    def stream_chat(self, messages: List[BaseMessage]):
        try:
            thread_id = f"{self.tenant_id}-{self.session_id}"
            runnable_config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
            # input_data: Dict[str, list] = {"messages": messages}
            for chunk in self.deep_agent.stream(
                DeepAgentInput(messages=messages),
                stream_mode=["messages", "updates"],
                config=runnable_config,
            ):
                data_lst = None
                if "messages" in chunk:
                    data_lst = self._deal_messages_chunk(chunk)
                elif "updates" in chunk:
                    data_lst = self._deal_updates_chunk(chunk)
                if data_lst:
                    for data in data_lst:
                        yield data
        except Exception as e:
            logger.warning(f"Error: {str(e)}")
            # yield f'对不起,系统感冒了...'
            yield {"type": "text", "content": "对不起,系统感冒了..."}

    def stream_resume(self, decisions: list[dict]):
        try:
            thread_id = f"{self.tenant_id}-{self.session_id}"
            runnable_config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
            for chunk in self.deep_agent.stream(
                Command(resume={"decisions": decisions}),
                stream_mode=["messages", "updates"],
                config=runnable_config,
            ):
                data_lst = None
                if "messages" in chunk:
                    data_lst = self._deal_messages_chunk(chunk)
                if data_lst:
                    for data in data_lst:
                        yield data
        except Exception as e:
            logger.warning(f"Error: {str(e)}")
            # yield f'对不起,系统感冒了...'
            yield {"type": "text", "content": "对不起,系统感冒了..."}

示例

if __name__ == "__main__":
    # 创建DeepAgentManager实例
    tid = "123"
    sid = "123"
    prompt = "You are a helpful assistant."
    deep_agent_manager = DeepAgentManager(tid, sid, prompt)

    print("DeepAgentManager initialized.")

    # 普通聊天
    msgs = [
        SystemMessage(content=prompt),
        HumanMessage(content="法国首都是哪儿")
    ]
    content1 = ""
    for response in deep_agent_manager.stream_chat(msgs):
        content1 += response.get("content", "")
        print(response)
    msgs.append(SystemMessage(content=content1))

    print("------------------------------------------")

    interrupt_info = None

    # Delete the file temp.txt 删除文件操作
    msgs.append(HumanMessage(content="Delete the file temp.txt"))
    content2 = ""
    for response in deep_agent_manager.stream_chat(msgs):
        ctype = response.get("type")
        if ctype == "text":
            content2 += response.get("content", "")
        elif ctype == "tool_call":
            content2 += response.get("content", "")
        elif ctype == "tool_output":
            content2 += response.get("content", "")
        elif ctype == "interrupt":
            interrupt_info = response.get("content", "")
        print(response)
    msgs.append(SystemMessage(content=content2))

    print("-----------------------------------------")

    # 是否触发中断
    if interrupt_info:
        interrupt_json = json.loads(interrupt_info)
        print(f"action_requests : {interrupt_json.get('action_requests')}")
        print(f"review_configs : {interrupt_json.get('review_configs')}")
        decisions_lst = [{"type": "approve"}]

        # 继续中断的聊天
        for response in deep_agent_manager.stream_resume(decisions_lst):
            print(response)

    print("Chat completed.")


相关文章
|
Arthas 监控 Java
Jvm性能调优+监控工具Arthas【阿里开源】
Jvm性能调优+监控工具Arthas【阿里开源】
1495 0
|
20天前
|
运维 Java 应用服务中间件
linux安装 apache-tomcat-7.0.42.tar.gz 详细步骤(解压、配置、启动)
本指南详解CentOS 7下部署Tomcat 7.0.42:含JDK 7安装配置、Tomcat解压、启动/关闭、WAR包部署及8080端口访问测试;附JAVA_HOME设置、防火墙放行、启动加速等常见问题解决方案,专为老项目稳定运维设计。(239字)
|
4月前
|
Java 调度 开发者
Java AQS:JUC 并发体系的底层同步框架基石
AQS(AbstractQueuedSynchronizer)是Java并发包(JUC)的底层核心,以volatile state + CLH双向队列统一实现同步控制。支持独占(如ReentrantLock)与共享(如Semaphore、CountDownLatch)两种模式,通过模板方法封装排队、阻塞/唤醒等通用逻辑,是理解与定制高性能同步组件的关键基石。(239字)
581 7
|
20天前
|
SQL 安全 数据库
WordPress批量替换文章数据库内容教程
本文介绍如何安全批量替换WordPress数据库内容(如域名、文字),含三步操作:1. 登录phpMyAdmin选择数据库;2. 在SQL窗口执行UPDATE语句;3. 执行并验证结果。**务必先备份数据库!** 注意核对表名(如wp_posts)和字段名(如post_content)是否匹配。
150 3
|
20天前
|
人工智能 供应链 Cloud Native
2026中国B2B企业服务业GEO白皮书:从产业洞察到优化实践
78%的企业HR、法务、采购已通过AI筛选服务商。本白皮书覆盖法律服务、管理咨询、金融科技、人力资源、IT服务等10大核心赛道,通过DSS原则(语义深度、数据支持、权威来源),将过会率、交付周期、客户评价等转化为可验证的AI信任资产,助力专业服务机构实现AI获客。
471 2
|
20天前
|
人工智能 测试技术 调度
《通用主控Skill开发指南:从意图分解到容错执行》
本文针对当前AI技能生态普遍存在的孤岛化痛点,系统阐述了主控Skill的完整开发与落地实践。文章指出主控Skill并非简单的静态脚本串联,而是基于意图驱动的动态编排架构,详细拆解了意图分解、标准化技能元数据体系、动态编排引擎、统一上下文管理、跨协议调用网关及容错机制等核心模块的设计思路。通过旅行规划等典型场景验证,该架构可实现复杂任务的端到端自动化,大幅降低AI使用门槛。
141 2
|
20天前
|
数据采集 存储 安全
跨境系统安全加固:接口防刷、数据加密、订单风控全方案
本文针对跨境反向海淘系统高危公开接口多、敏感数据多、安全防护薄弱等问题,基于Laravel实战,提出轻量化、可落地的四维安全加固方案:IP+接口级限流防刷、敏感字段对称加密存储、多维度智能交易风控、全链路操作日志审计,兼顾合规性与业务稳定性。
154 1
|
20天前
|
人工智能 运维 监控
Kali365 钓鱼即服务平台攻击机理与 Microsoft 365 OAuth 令牌劫持防御体系研究
FBI预警新型PhaaS平台Kali365:利用微软OAuth设备码流程绕过MFA,通过Telegram分发,诱导用户在官方页面输入验证码窃取长期有效令牌。本文剖析其攻击链、协议滥用机理,提出含策略阻断、令牌管控、行为检测的闭环防御方案,并提供可落地代码与配置。(239字)
138 0
|
20天前
|
人工智能 前端开发 API
JBoltAI v4.4:ReAct推理链从黑盒走向全透明
JBoltAI v4.4聚焦ReAct Agent企业级落地,通过抽象推理基座(AbstractReActChain)、深度整合向量空间、解耦工具调用,实现Thought/Action/Observation全链路结构化、可追溯、可审计,让黑盒推理走向透明可控。(239字)