描述
参考地址: https://docs.langchain.com/oss/python/langchain/human-in-the-loop
智能体执行过程中,对于某些危险操作(例如删除文件),可以先由人工确认,再继续执行。
官网给了非流式输出的实现;这里给出同步流式输出实现。
依赖
# 智能体依赖
pip install deepagents
# openai模式大模型调用依赖
pip install langchain-openai
此外,代码中还依赖了另外两个工具类,参考如下:
日志模块:https://developer.aliyun.com/article/1736993?spm=a2c6h.13148508.setting.16.38b24f0efmXtnP
配置模块:https://developer.aliyun.com/article/1737001?spm=a2c6h.13148508.setting.15.38b24f0e1TT1EY
代码:DeepAgentManager
整体设计
1、类 DeepAgentManager 中有两个核心方法:stream_chat 和 stream_resume
2、流式输出的数据对象为json
| 类型 | 数据 | 示例 |
| --- | --- | --- |
| summarization | `自动摘要中... \n` | |
| text | 聊天文本 | `{'type': 'text', 'content': ' for'}` |
| tool_call | `工具调用:*** \n` | `{'type': 'tool_call', 'content': '工具调用:delete_file \n'}` |
| tool_output | `工具响应: ** \n` | `{'type': 'tool_output', 'content': '工具响应: Deleted /temp.txt \n'}` |
| interrupt | 调用方法中断 | `{'type': 'interrupt', 'content': '{"action_requests": [{"name": "delete_file", "args": {"path": "/temp.txt"}, "description": "Tool execution requires approval\\n\\nTool: delete_file\\nArgs: {\'path\': \'/temp.txt\'}"}], "review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "edit", "reject", "respond"]}]}'}` |
3、注意
一旦触发自动摘要,会返回大量 summarization 消息,请谨慎使用。
所有 content 都是文本;例如 interrupt 的 content 是 JSON 字符串,使用时需要先转换为 JSON 对象。
4、两个核心方法
| 方法名称 | 方法描述 |
| --- | --- |
| stream_chat | 基本的聊天方法,流式返回消息 |
| stream_resume | 中断恢复方法:stream_chat 返回 interrupt 消息后,可以通过该方法继续执行后续操作 |
工具demo
from langchain_core.tools import tool # 1. 定义工具(和原文一致) def delete_file(path: str) -> str: """Delete a file from the filesystem.""" return f"Deleted {path}" def read_file(path: str) -> str: """Read a file from the filesystem.""" return f"Contents of {path}" def send_email(to: str, subject: str, body: str) -> str: """Send an email.""" return f"Sent email to {to}" tools=[delete_file, read_file, send_email] interrupt_on={ "delete_file": True, "read_file": False, "send_email": {"allowed_decisions": ["approve", "reject"]}, }
详细代码
import json
from typing import Any, Dict, Iterator, List

from deepagents import create_deep_agent
from deepagents.backends import StateBackend
from deepagents.middleware import create_summarization_tool_middleware
from langchain.chat_models import init_chat_model
from langchain_core.messages import (
    AIMessageChunk,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel

from manager.ConfigManager import config
from manager.LoggerManager import logger
from manager.ToolManager import tools, interrupt_on

# Private LLM endpoint settings, read from the project configuration.
base_url = config.get("LLM").get("BASE_URL")
api_key = config.get("LLM").get("APP_KEY")
model_name = config.get("LLM").get("MODEL")

# Model profile for the chat client. `max_input_tokens` must be set because it
# is what the auto-summarization decision is based on.
custom_profile = {
    "max_input_tokens": 20_000,
    "tool_calling": True,
    "structured_output": True,
}

openai_client = init_chat_model(
    model_name,
    model_provider="openai",
    base_url=base_url,
    api_key=api_key,
    profile=custom_profile,
    max_tokens=20_000,
)


class DeepAgentInput(BaseModel):
    """Standardized input passed to the deep agent."""

    # Messages to feed into the agent for this turn.
    messages: List[BaseMessage]


class DeepAgentManager:
    """Wrap a deep agent and expose its output as a synchronous event stream.

    Every event is a dict ``{"type": ..., "content": ...}`` whose ``content``
    is always text. ``type`` is one of ``summarization``, ``text``,
    ``tool_call``, ``tool_output`` or ``interrupt``; for ``interrupt`` the
    content is a JSON string that the caller has to parse.
    """

    # Text sent to the caller when streaming fails for any reason.
    _FALLBACK_TEXT = "对不起,系统感冒了..."

    def __init__(self, tenant_id, session_id, sys_prompt):
        """Create the agent for one tenant/session pair.

        Args:
            tenant_id: Tenant identifier; first half of the thread id.
            session_id: Session identifier; second half of the thread id.
            sys_prompt: System prompt handed to the agent.
        """
        self.tenant_id = tenant_id
        self.session_id = session_id
        self.deep_agent = create_deep_agent(
            openai_client,
            tools=tools,
            system_prompt=sys_prompt,
            checkpointer=InMemorySaver(),
            middleware=[
                create_summarization_tool_middleware(openai_client, StateBackend())
            ],
            interrupt_on=interrupt_on,
        )

    @staticmethod
    def _deal_messages_chunk(chunk) -> List[Dict[str, str]]:
        """Translate one ``messages``-mode chunk into output events.

        Declared as a static method: it takes no ``self``, yet is invoked as
        ``self._deal_messages_chunk(chunk)``; without the decorator that call
        raises ``TypeError``.

        Args:
            chunk: ``(mode, (token, metadata))`` as produced by the stream.

        Returns:
            A possibly empty list of event dicts.
        """
        _mode, (token, metadata) = chunk
        logger.debug(f"token : {token}, metadata : {metadata}")

        events: List[Dict[str, str]] = []
        if metadata.get("lc_source") == "summarization":
            logger.debug(f"summarization token : {token}, type : {type(token)}")
            events.append({"type": "summarization", "content": "自动摘要中... \n"})
            # NOTE(review): a summarization-tagged token still falls through to
            # the branches below, so its text is also emitted as a "text"
            # event. Confirm whether that is intended.

        if isinstance(token, AIMessageChunk):
            text_blocks = [b for b in token.content_blocks if b["type"] == "text"]
            if text_blocks:
                # Concatenate every text block so none is dropped when a chunk
                # carries more than one.
                events.append(
                    {
                        "type": "text",
                        "content": "".join(b["text"] for b in text_blocks),
                    }
                )
        elif isinstance(token, ToolMessage):
            events.append({"type": "tool_call", "content": f"工具调用:{token.name} \n"})
            events.append({"type": "tool_output", "content": f"工具响应: {token.content} \n"})
        else:
            logger.debug(f"token : {token}")
            logger.debug(f"metadata : {metadata}")
        return events

    @staticmethod
    def _deal_updates_chunk(chunk) -> List[Dict[str, str]]:
        """Translate one ``updates``-mode chunk into interrupt events.

        Args:
            chunk: ``(mode, update)`` as produced by the stream.

        Returns:
            One ``interrupt`` event per pending interrupt (its value serialized
            to a JSON string), or an empty list when there is none.
        """
        _mode, update = chunk
        pending = update.get("__interrupt__") if update else None

        events: List[Dict[str, str]] = []
        # Iterate rather than unpack a 1-tuple, so several simultaneous
        # interrupts do not raise ValueError.
        for interrupt_obj in pending or ():
            if interrupt_obj:
                events.append(
                    {"type": "interrupt", "content": json.dumps(interrupt_obj.value)}
                )
        return events

    def _stream_events(self, agent_input: Any) -> Iterator[Dict[str, str]]:
        """Run the agent on ``agent_input`` and yield the resulting events.

        Shared by ``stream_chat`` and ``stream_resume`` so that both handle
        ``messages`` and ``updates`` chunks (and therefore interrupts) the same
        way. Any exception is logged and replaced by a single fallback
        ``text`` event.
        """
        try:
            thread_id = f"{self.tenant_id}-{self.session_id}"
            runnable_config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
            for chunk in self.deep_agent.stream(
                agent_input,
                stream_mode=["messages", "updates"],
                config=runnable_config,
            ):
                # With several stream modes each chunk is (mode, payload);
                # dispatch on the mode itself instead of tuple membership.
                mode = chunk[0]
                if mode == "messages":
                    events = self._deal_messages_chunk(chunk)
                elif mode == "updates":
                    events = self._deal_updates_chunk(chunk)
                else:
                    continue
                yield from events
        except Exception as e:
            logger.warning(f"Error: {str(e)}")
            yield {"type": "text", "content": self._FALLBACK_TEXT}

    def stream_chat(self, messages: List[BaseMessage]) -> Iterator[Dict[str, str]]:
        """Send ``messages`` to the agent and stream back events.

        Args:
            messages: Messages to submit for this turn.

        Yields:
            Event dicts; an ``interrupt`` event means the run is paused and can
            be continued with ``stream_resume``.
        """
        yield from self._stream_events(DeepAgentInput(messages=messages))

    def stream_resume(self, decisions: list[dict]) -> Iterator[Dict[str, str]]:
        """Resume an interrupted run with the reviewer's decisions.

        Args:
            decisions: Decision dicts such as ``{"type": "approve"}``.

        Yields:
            Event dicts, including a further ``interrupt`` event if the resumed
            run pauses again.
        """
        yield from self._stream_events(Command(resume={"decisions": decisions}))
示例
if __name__ == "__main__":
    # Create a DeepAgentManager for a demo tenant/session.
    tid = "123"
    sid = "123"
    prompt = "You are a helpful assistant."
    deep_agent_manager = DeepAgentManager(tid, sid, prompt)
    print("DeepAgentManager initialized.")

    # The agent is built with a checkpointer keyed by the thread id and already
    # receives the system prompt, so each turn submits only the new user
    # message instead of re-sending the history (and instead of storing the
    # assistant reply as a SystemMessage).

    # Turn 1: plain chat.
    for response in deep_agent_manager.stream_chat(
        [HumanMessage(content="法国首都是哪儿")]
    ):
        print(response)
    print("------------------------------------------")

    # Turn 2: ask for a file deletion, which is expected to trigger an interrupt.
    interrupt_info = None
    for response in deep_agent_manager.stream_chat(
        [HumanMessage(content="Delete the file temp.txt")]
    ):
        if response.get("type") == "interrupt":
            interrupt_info = response.get("content", "")
        print(response)
    print("-----------------------------------------")

    # If an interrupt was raised, show it and approve every pending action.
    if interrupt_info:
        # The interrupt content is a JSON string; parse it before use.
        interrupt_json = json.loads(interrupt_info)
        print(f"action_requests : {interrupt_json.get('action_requests')}")
        print(f"review_configs : {interrupt_json.get('review_configs')}")

        # One decision per pending action request; fall back to a single
        # approval when the payload lists none.
        pending_actions = interrupt_json.get("action_requests") or []
        decisions_lst = [{"type": "approve"} for _ in pending_actions] or [
            {"type": "approve"}
        ]

        # Continue the interrupted conversation.
        for response in deep_agent_manager.stream_resume(decisions_lst):
            print(response)

    print("Chat completed.")