AgentRun 通过集成表格存储(Tablestore),为智能体提供三种持久化记忆能力:在同一对话中维持上下文的会话历史、跨会话保留用户偏好等结构化 信息的长期记忆,以及可直接读写的会话状态。本文介绍如何创建并配置记忆存储,并通过可运行的代码示例演示三种记忆类型的使用方式。
记忆类型
类型 |
作用 |
触发方式 |
跨会话 |
Tablestore 表 |
会话历史 |
记录同一 session 内的完整对话轮次,维持多轮对话上下文 |
每次请求自动写入,传入相同 |
否 |
|
长期记忆 |
从对话中提炼用户偏好等结构化信息,以向量形式持久化,让 Agent 随使用逐渐了解用户 |
Agent 调用 MCP 工具自动提取并存储(需开启 MCP) |
是 |
|
会话状态 |
存储每次对话的元数据,如任务进度、用户设置等 |
仅支持通过 SDK 显式读写,AgentRun控制台快速创建的Agent不支持会话状态 |
否 |
|
快速使用
步骤一:创建记忆存储
- 登录函数计算控制台,在左侧菜单栏单击**函数智能** > 智能体 AgentRun,进入 AgentRun 控制台。
重要:如果进入 AgentRun 控制台后,右上角有“体验新版”字样,单击切换到新版控制台。 - 在左侧菜单栏单击记忆,然后单击添加记忆存储,填写相关配置项:
配置项 |
说明 |
名称 |
必填,系统自动生成(如 |
向量数据库 |
默认自动配置(推荐),自动创建并关联 Tablestore 向量数据库 |
长期记忆 |
默认开启,需选择大语言模型服务、向量模型服务和执行角色 |
会话历史 |
默认开启 |
会话状态 |
默认开启 |
网络配置 |
默认允许公网访问;也可配置 VPC 访问;至少选择一种 |
- 单击开始部署,等待记忆存储部署完成。
步骤二:创建 Agent 并体验记忆功能
- 在左侧菜单栏单击 Agent 运行时,然后单击创建 Agent,选择快速创建,填写相关配置项:
配置项 |
说明 |
Agent 名称 |
必填,系统自动生成(如 |
模型服务 |
选择已配置的模型,如 |
系统提示词 |
可选择模板,如“智能问答助手” |
工具与上下文 > 记忆 |
添加步骤一创建的记忆存储 说明 使用长期记忆功能时,需按页面提示开启 MCP,Agent 才能自动提取和读取长期记忆。 |
访问凭证 |
默认匿名访问,可选择已有凭证或创建新凭证 |
- 单击创建 Agent,等待 Agent 启动完成。
- 在模型对比测试窗口中:
- 输入“我叫小明,最近在学习向量数据库”,观察 Agent 回复
说明:函数计算遵循Serverless(无服务器)架构,只有在请求到达时才创建实例,并能及时释放实例帮助用户节省成本。因此,首次提问时,会有较长的等待时间。
- 在同一对话中继续提问“我叫什么?”,验证会话历史(Agent 能记住本轮对话内容)
- 新建会话,提问“我叫什么?”,验证长期记忆(Agent 能跨会话记住用户信息)
代码集成
环境准备
pip3 install alibabacloud_agentrun20250910 openai tablestore agentrun-sdk agentrun-mem0ai google-adk litellm langchain-openai
配置环境变量(请勿将密钥硬编码在代码中):
# 阿里云账号凭据 export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id" export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret" # Agent调用配置(在控制台Agent运行时 > 详情 > 概览 > 访问信息中获取) export AGENT_ENDPOINT="https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations" # Agent调用凭证(在控制台凭证管理中获取) export AGENT_API_KEY="your-agent-api-key" # 记忆存储名称(步骤一创建后获取) export MEMORY_COLLECTION_NAME="mem-xxxx" # DashScope API Key(LangChain 和 Google ADK 示例使用) export DASHSCOPE_API_KEY="your-dashscope-api-key" # Tablestore 配置(会话状态示例使用) export TABLESTORE_ENDPOINT="https://{INSTANCE}.{REGION}.ots.aliyuncs.com" export TABLESTORE_INSTANCE="your-instance-name"
会话历史
三轮对话测试:第 1 轮告知姓名和职业,第 2 轮直接问"我是谁",验证 Agent 是否记住了上文;第 3 轮继续追问,验证完整上下文是否贯穿始终。每次运行生成新的 session_id,互不干扰。
import os import re import threading import time import uuid from openai import OpenAI # ── 配置 ────────────────────────────────────────────────────────────────────── AGENT_ENDPOINT = os.environ["AGENT_ENDPOINT"] AGENT_API_KEY = os.environ["AGENT_API_KEY"] MODEL = os.getenv("MODEL", "qwen3.5-plus") # 从 Endpoint URL 解析账号 ID 和 Agent 名称。 # Endpoint 格式:https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations # 注意:这里使用账号 ID 仅适合单用户演示场景。在多用户应用中, # user_id 应代表终端用户(如应用内的用户 ID),而非开发者账号。 _account_match = re.match(r"https://(\d+)\.agentrun-data\.", AGENT_ENDPOINT) USER_ID = _account_match.group(1) if _account_match else "default_user" _agent_match = re.search(r"/agent-runtimes/([^/]+)/endpoints/", AGENT_ENDPOINT) AGENT_ID = _agent_match.group(1) if _agent_match else "default_agent" client = OpenAI( base_url=f"{AGENT_ENDPOINT}/openai/v1", api_key=AGENT_API_KEY, ) # ── 对话函数 ────────────────────────────────────────────────────────────────── def _spinner(stop_event: threading.Event): """在等待首个 Token 期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('\r' + ' ' * 35 + '\r', end='', flush=True) def chat(session_id: str, history: list, message: str) -> str: """向 Agent 发送消息,通过 session_id 和消息历史维持会话上下文。 history 为本轮之前的对话历史,格式为 [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] 只保留 user/assistant 的文本内容(不含 tool_calls),历史消息与本轮消息 一起发送,Agent 可从上下文中直接读取之前的对话内容。 session_id 通过请求头 X-AgentRun-Conversation-ID 传递,AgentRun 用其 关联 Tablestore 中的会话记录(用 extra_body 传递无效,服务端不读请求体)。 """ print(f"用户: {message}") # 启动转圈动画(覆盖冷启动和 MCP 工具调用的等待时间) stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() stream = client.chat.completions.create( model=MODEL, messages=history + [{"role": "user", "content": message}], stream=True, extra_headers={ "X-AgentRun-Conversation-ID": session_id, "X-AgentRun-User-ID": USER_ID, "X-AgentRun-Agent-ID": AGENT_ID, }, ) full_reply = [ ] first_token = True for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: token = chunk.choices[0].delta.content if first_token: stop_event.set() spinner.join() print("助手: ", end="", flush=True) first_token = False print(token, end="", flush=True) full_reply.append(token) stop_event.set() spinner.join() print("\n") return "".join(full_reply) # ── 主程序 ──────────────────────────────────────────────────────────────────── def main(): # 每次运行生成新的 session_id,避免与上次运行的历史混淆。 # 在实际应用中,session_id 通常由应用层管理(如绑定用户的对话 ID)。 session_id = str(uuid.uuid4()) history = [ ] # 维护对话历史(只含 user/assistant 文本消息) print("═" * 50) print("示例:多轮对话(会话历史)") print("═" * 50) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") # 第一轮:自我介绍 print("\n[第 1 轮]") q1 = "我叫小明,我是一名数据工程师,最近在研究 Tablestore" r1 = chat(session_id, history, q1) history.extend([ {"role": "user", "content": q1}, {"role": "assistant", "content": r1}, ]) # 第二轮:测试 Agent 是否记住了第一轮的内容 print("[第 2 轮]") q2 = "我叫什么名字?我从事什么工作?我最近在研究什么?" r2 = chat(session_id, history, q2) history.extend([ {"role": "user", "content": q2}, {"role": "assistant", "content": r2}, ]) # 第三轮:继续深入话题 print("[第 3 轮]") q3 = "针对我的工作,你有什么 Tablestore 使用建议?" chat(session_id, history, q3) if __name__ == "__main__": main()
说明:推荐使用流式模式(stream=True):Agent 在处理消息时会调用 MCP 记忆工具(读写 Tablestore),流式模式可避免客户端因等待 MCP 工具调用而超时。
将上述代码保存为 conversation_history.py 并运行:
python3 conversation_history.py
长期记忆
方式一:通过 MCP 集成
在记忆详情的长期记忆 > MCP 集成中启动服务配置,启动后获取 MCP 服务 URL(SSE),配置到支持 MCP 工具调用的 Agent 中,即可自动提取、更新、删除长期记忆。
说明:通过控制台使用Agent运行时,可在创建Agent配置记忆时开启MCP(未提示表示已开启),Agent会在每轮对话中自动调用 MCP 工具检索和更新长期记忆。对话中出现可提取的用户信息时(如:“我喜欢喝咖啡”),Agent 会自动存入向量库供后续跨 session 使用。
方式二:直接操作 mem0 向量存储
与方式一不同,方式二需要由代码显式调用 memory.add() 写入记忆—— mem0 不会自动监听对话,也不会自动判断“哪些内容值得记录”。这种方式适合在自定义 Agent 框架或数据管道中,由业务逻辑精确控制记忆的写入时机和内容。
本示例写入一段用户描述,mem0 内部调用 LLM 将其拆解为多条结构化记忆(如“名叫 Alice”、“喜欢喝咖啡”),然后用三个不同角度的语义查询验证检索效果。
import os import time from agentrun.memory_collection import MemoryCollection # ── 配置 ────────────────────────────────────────────────────────────────────── MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"] # agentrun-sdk 自动从以下环境变量读取凭证(无需在代码中显式传入): # ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET # AGENTRUN_REGION(默认 cn-hangzhou) # ── 初始化 mem0 客户端 ──────────────────────────────────────────────────────── # to_mem0_memory() 自动完成两件事: # 1. 从 AgentRun 控制平面读取记忆存储配置(Tablestore 实例、集合、向量维度等) # 2. 使用读取到的配置初始化 agentrun-mem0ai Memory 客户端 memory = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME) # ── 核心操作 ────────────────────────────────────────────────────────────────── def add_memories(user_id: str): """向记忆存储中添加用户信息。 memory.add() 支持两种输入: - str:直接写入一段描述文本,mem0 内部会自动提炼为结构化记忆 - list[dict]:完整的对话消息列表(role/content),mem0 从中提取用户偏好 本示例使用文本输入演示基本用法。 """ print(f"\n[添加记忆] user_id={user_id}") result = memory.add( "我叫 Alice,是一名 Python 工程师,平时喜欢喝咖啡,最近在研究向量数据库。", user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"}, ) results = result.get("results", [ ]) print(f" 写入 {len(results)} 条记忆:") for i, res in enumerate(results, 1): print(f" {i}. [{res.get('event')}] {res.get('memory', '')}") def search_memories(user_id: str, query: str): """从记忆存储中检索与查询语义最相关的记忆。""" print(f"\n[检索记忆] 查询: {query!r}") results = memory.search(query, user_id=user_id) hits = results.get("results", [ ]) if not hits: print(" (未找到相关记忆)") return for i, hit in enumerate(hits, 1): score = hit.get("score", 0) content = hit.get("memory", "") print(f" {i}. [{score:.4f}] {content}") # ── 主程序 ──────────────────────────────────────────────────────────────────── def main(): print("═" * 55) print("长期记忆示例 —— 直接操作 mem0 向量存储") print("═" * 55) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") user_id = "alice-demo" # 写入记忆 add_memories(user_id) # 等待 Tablestore 向量索引刷新(首次写入后搜索需要短暂延迟) print("\n等待向量索引刷新...") time.sleep(3) # 语义检索:不同的查询角度 print() print("──" * 27) search_memories(user_id, "她喜欢什么饮料?") search_memories(user_id, "她是做什么工作的?") search_memories(user_id, "她最近在研究什么技术?") if __name__ == "__main__": main()
将上述代码保存为 long_term_memory.py 并运行:
python3 long_term_memory.py
方式三:LangChain 生态集成
以旅行规划助手为例,演示在 LangChain 对话循环中集成 mem0 长期记忆的完整模式:每轮对话开始时先检索与用户问题相关的历史记忆并注入 prompt,生成回复后再将本轮对话内容写回 mem0,使记忆在对话过程中持续积累。多轮后可观察到 Agent 能主动引用早先提到的旅行偏好。
import os import threading import time from typing import List from langchain_core.messages import SystemMessage, HumanMessage, BaseMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai import ChatOpenAI from agentrun.memory_collection import MemoryCollection # ── 配置 ────────────────────────────────────────────────────────────────────── MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"] DASHSCOPE_API_KEY = os.environ["DASHSCOPE_API_KEY"] # ── 初始化 LLM 和 mem0 客户端 ───────────────────────────────────────────────── # 通过 DashScope OpenAI 兼容接口调用大模型 llm = ChatOpenAI( base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key=DASHSCOPE_API_KEY, model="qwen-max", ) # 初始化 mem0 客户端(自动从 AgentRun 控制平面读取 Tablestore 配置) mem0 = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME) # ── Prompt 模板 ─────────────────────────────────────────────────────────────── prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""你是一位专业的旅行规划助手,能够根据用户的偏好和历史对话提供个性化的旅行建议。 如果有相关记忆上下文,请在回答中加以参考,让建议更符合用户需求。"""), MessagesPlaceholder(variable_name="context"), ("human", "{input}"), ]) # ── 核心函数 ────────────────────────────────────────────────────────────────── def _spinner(stop_event: threading.Event): """在等待 LLM 推理期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('\r' + ' ' * 35 + '\r', end='', flush=True) def retrieve_context(query: str, user_id: str) -> List[BaseMessage]: """从 mem0 检索与用户问题相关的记忆,作为 LangChain 消息列表注入 prompt。""" try: memories = mem0.search(query, user_id=user_id) memory_list = memories.get("results", [ ]) if not memory_list: return [ ] serialized = " ".join(m["memory"] for m in memory_list) print(f" [mem0 检索到 {len(memory_list)} 条相关记忆]") return [SystemMessage(content=f"用户相关信息:{serialized}")] except Exception as e: print(f" [mem0 检索失败: {e}]") return [ ] def generate_response(user_input: str, context: List[BaseMessage]) -> str: """携带记忆上下文调用 LLM 生成回复。""" chain = prompt | llm response = chain.invoke({"context": context, "input": user_input}) return response.content def save_interaction(user_id: str, user_input: str, assistant_response: str) -> int: """将本轮对话写回 mem0,供后续对话检索使用。返回新增记忆条数。""" interaction = [ {"role": "user", "content": user_input}, {"role": "assistant", "content": assistant_response}, ] try: result = mem0.add(interaction, user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"}) return len(result.get("results", [ ])) except Exception as e: print(f" [mem0 保存失败: {e}]") return 0 def chat_turn(user_input: str, user_id: str) -> str: """单轮对话:检索记忆 → 生成回复 → 保存记忆。""" context = retrieve_context(user_input, user_id) # 用一个 spinner 同时覆盖 LLM 推理和 mem0 保存,全部完成后再打印回复 stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() response = generate_response(user_input, context) count = save_interaction(user_id, user_input, response) stop_event.set() spinner.join() print(f"助手: {response}\n") if count: print(f" [mem0 新增 {count} 条记忆]\n") return response # ── 主程序 ──────────────────────────────────────────────────────────────────── def main(): print("═" * 60) print("旅行规划助手(LangChain + mem0 长期记忆)") print("═" * 60) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") print("输入 /quit 退出\n") user_id = "travel-user-demo" while True: try: user_input = input("你: ").strip() except (EOFError, KeyboardInterrupt): break if not user_input or user_input == "/quit": break response = chat_turn(user_input, user_id) if __name__ == "__main__": main()
将上述代码保存为 langchain_memory.py 并运行:
python3 langchain_memory.py
会话状态
方式一:通过 Tablestore SDK 查询会话数据
AgentRun 将每次对话的会话元数据写入 session 表,消息内容写入 message 表。本示例直接扫描这两张表,列出最近的会话记录,并读取其中一个会话的完整对话内容。适用于数据分析、会话审计等只读场景。
import json import os from datetime import datetime import tablestore as ots # ── 配置 ────────────────────────────────────────────────────────────────────── ENDPOINT = os.environ["TABLESTORE_ENDPOINT"] INSTANCE = os.environ["TABLESTORE_INSTANCE"] ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"] ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"] client = ots.OTSClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE) # ── 查询函数 ────────────────────────────────────────────────────────────────── def list_sessions(user_id: str = None, limit: int = 10) -> list[dict]: """查询 session 表中的会话元数据列表。""" if user_id: min_pk = [("user_id", user_id), ("session_id", ots.INF_MIN)] max_pk = [("user_id", user_id), ("session_id", ots.INF_MAX)] else: min_pk = [("user_id", ots.INF_MIN), ("session_id", ots.INF_MIN)] max_pk = [("user_id", ots.INF_MAX), ("session_id", ots.INF_MAX)] _, _, rows, _ = client.get_range( "session", ots.Direction.FORWARD, min_pk, max_pk, columns_to_get=[ ], max_version=1, limit=limit, ) sessions = [ ] for row in rows: pks = dict(row.primary_key) attrs = {c[0]: c[1] for c in row.attribute_columns} # update_time 单位为微秒 update_time_us = int(attrs.get("update_time", 0)) sessions.append({ "user_id": pks["user_id"], "session_id": pks["session_id"], "agent_id": attrs.get("agent_id", ""), "update_time": datetime.fromtimestamp(update_time_us / 1e6).strftime("%Y-%m-%d %H:%M:%S"), }) return sessions def get_session_messages(session_id: str) -> list[dict]: """查询 message 表中指定会话的所有对话消息。""" min_pk = [("session_id", session_id), ("create_time", ots.INF_MIN), ("message_id", ots.INF_MIN)] max_pk = [("session_id", session_id), ("create_time", ots.INF_MAX), ("message_id", ots.INF_MAX)] _, _, rows, _ = client.get_range( "message", ots.Direction.FORWARD, min_pk, max_pk, columns_to_get=[ ], max_version=1, ) messages = [ ] for row in rows: attrs = {c[0]: c[1] for c in row.attribute_columns} if "content" in attrs: try: messages.extend(json.loads(attrs["content"])) except json.JSONDecodeError: pass return messages # ── 主程序 ──────────────────────────────────────────────────────────────────── def main(): print("═" * 55) print("AgentRun 会话状态 —— Tablestore 数据查询示例") print("═" * 55) # ── 1. 列出所有会话 ─────────────────────────────────────────────────────── print("\n[会话列表]") sessions = list_sessions(limit=5) if not sessions: print(" (暂无会话记录)") return for s in sessions: print(f" session_id : {s['session_id']}") print(f" agent_id : {s['agent_id']}") print(f" 更新时间 : {s['update_time']}") print() # ── 2. 读取最近一个会话的消息 ───────────────────────────────────────────── latest = sessions[-1] sid = latest["session_id"] print(f"\n[会话详情] session_id: {sid}") messages = get_session_messages(sid) if not messages: print(" (该会话暂无消息)") else: for msg in messages: role = msg.get("role", "?") content = msg.get("content", "")[:100] ellipsis = "..." if len(msg.get("content", "")) > 100 else "" print(f" [{role}] {content}{ellipsis}") if __name__ == "__main__": main()
将上述代码保存为 session_state.py 并运行:
python3 session_state.py
方式二:Google ADK 集成(OTSSessionService)
演示如何将 Tablestore 作为 Google ADK Agent 的会话状态后端。通过 OTSSessionService,ADK Agent 的每次对话都会自动持久化到 Tablestore,进程重启后可继续之前的会话,无需重新建立上下文。示例包含一个能查询天气的工具,用于验证 Agent 的工具调用和会话记录是否正常持久化。
from __future__ import annotations import asyncio import os import sys import threading import time from typing import Any from google.adk.agents import Agent from google.adk.models.lite_llm import LiteLlm from google.adk.runners import Runner from google.genai import types from agentrun.conversation_service import SessionStore from agentrun.conversation_service.adapters import OTSSessionService # ── 配置 ────────────────────────────────────────────────────────────────────── MEMORY_COLLECTION_NAME = os.environ.get("MEMORY_COLLECTION_NAME", "") DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "") if not MEMORY_COLLECTION_NAME: print("ERROR: 请设置环境变量 MEMORY_COLLECTION_NAME") sys.exit(1) APP_NAME = "adk-chat-demo" USER_ID = "demo_user" SESSION_FILE = ".adk_session_id" # 本地持久化 session ID,删除此文件可开始新会话 # ── 工具定义 ────────────────────────────────────────────────────────────────── def _spinner(stop_event: threading.Event): """在等待 Agent 推理期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'\rAgent: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('\r' + ' ' * 35 + '\r', end='', flush=True) def get_weather(city: str) -> dict[str, Any]: """查询指定城市的天气信息。""" data = { "北京": {"weather": "晴", "temperature": "5~15°C"}, "上海": {"weather": "多云", "temperature": "12~20°C"}, "杭州": {"weather": "阴", "temperature": "10~18°C"}, } return data.get(city, {"error": "暂无该城市数据"}) # ── Step 1: 初始化 SessionStore ─────────────────────────────────────────────── # SessionStore.from_memory_collection() 从 AgentRun 控制平面读取记忆存储配置, # 自动获取 Tablestore 实例和 endpoint,无需手动配置 OTS 连接参数。 store = SessionStore.from_memory_collection(MEMORY_COLLECTION_NAME) store.init_tables() store.init_search_index() # ── Step 2: 创建 OTSSessionService ─────────────────────────────────────────── session_service = OTSSessionService(session_store=store) # ── Step 3: 创建 Agent + Runner ─────────────────────────────────────────────── # 通过 LiteLLM 调用 DashScope OpenAI 兼容接口 custom_model = LiteLlm( model="openai/qwen-max", api_key=DASHSCOPE_API_KEY, api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", ) agent = Agent( name="smart_assistant", model=custom_model, instruction="你是一个友好的中文智能助手,用户问天气时调用 get_weather 工具。", tools=[get_weather], ) runner = Runner( agent=agent, app_name=APP_NAME, session_service=session_service, ) # ── Step 4: 对话(自动持久化到 Tablestore) ─────────────────────────────────── async def chat(session_id: str, text: str) -> str: """发送消息并返回 Agent 回复。""" content = types.Content( role="user", parts=[types.Part(text=text)], ) reply_parts: list[str] = [ ] async for event in runner.run_async( user_id=USER_ID, session_id=session_id, new_message=content, ): if event.is_final_response() and event.content and event.content.parts: for part in event.content.parts: if part.text: reply_parts.append(part.text) return "\n".join(reply_parts) # ── 主程序 ──────────────────────────────────────────────────────────────────── async def main() -> None: # 从本地文件读取上次的 session ID,尝试恢复会话;找不到则创建新会话 session = None if os.path.exists(SESSION_FILE): saved_id = open(SESSION_FILE).read().strip() session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=saved_id ) if session: print(f"继续之前的会话: {session.id}") if session is None: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, state={"user:language": "zh-CN"}, ) open(SESSION_FILE, "w").write(session.id) print(f"新会话已创建: {session.id}") print("输入 /quit 退出,/new 开始新会话\n") while True: try: user_input = input("你: ").strip() except (EOFError, KeyboardInterrupt): break if not user_input: continue if user_input == "/quit": break if user_input == "/new": session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, state={"user:language": "zh-CN"}, ) open(SESSION_FILE, "w").write(session.id) print(f"新会话已创建: {session.id}\n") continue stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() reply = await chat(session.id, user_input) stop_event.set() spinner.join() print(f"Agent: {reply}\n") if __name__ == "__main__": asyncio.run(main())
将上述代码保存为 adk_session.py 并运行:
python3 adk_session.py
管理记忆存储
查询记忆存储的配置详情,包括关联的 Tablestore 实例名称、向量集合、向量维度、嵌入模型和长期记忆提取模型,以及当前账号下的所有记忆存储列表。适用于排查配置问题或了解记忆存储的底层资源信息。
import os from alibabacloud_agentrun20250910.client import Client from alibabacloud_tea_openapi import models as open_api_models # ── 配置 ────────────────────────────────────────────────────────────────────── ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"] ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"] MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"] REGION = os.getenv("AGENTRUN_REGION", "cn-hangzhou") config = open_api_models.Config( access_key_id=ACCESS_KEY_ID, access_key_secret=ACCESS_KEY_SECRET, endpoint=f"agentrun.{REGION}.aliyuncs.com", ) client = Client(config) # ── 工具函数 ────────────────────────────────────────────────────────────────── def get_memory_collection(name: str) -> dict: """查询记忆存储的详细配置。""" resp = client.get_memory_collection(name) return resp.body.data.to_map() def list_memory_collections() -> list: """列出当前账号下的所有记忆存储。""" from alibabacloud_agentrun20250910 import models as ar_models req = ar_models.ListMemoryCollectionsRequest() resp = client.list_memory_collections(req) return [item.to_map() for item in (resp.body.data.items or [ ])] # ── 主程序 ──────────────────────────────────────────────────────────────────── def main(): # ── 查询单个记忆存储 ────────────────────────────────────────────────────── print("═" * 55) print(f"查询记忆存储:{MEMORY_COLLECTION_NAME}") print("═" * 55) info = get_memory_collection(MEMORY_COLLECTION_NAME) print(f"名称:{info.get('memoryCollectionName')}") print(f"ID:{info.get('memoryCollectionId')}") print(f"会话历史:{'已启用' if info.get('enableConversationHistory') else '未启用'}") print(f"会话状态:{'已启用' if info.get('enableConversationState') else '未启用'}") vector_cfg = info.get("vectorStoreConfig", {}) print(f"\n向量数据库:{vector_cfg.get('provider')}") vector_detail = vector_cfg.get("config", {}) print(f" 实例:{vector_detail.get('instanceName')}") print(f" 集合:{vector_detail.get('collectionName')}") print(f" 向量维度:{vector_detail.get('vectorDimension')}") embedder_cfg = info.get("embedderConfig", {}) embedder_detail = embedder_cfg.get("config", {}) print(f"\n嵌入模型:{embedder_detail.get('model')}") print(f"模型服务:{embedder_cfg.get('modelServiceName')}") llm_cfg = info.get("llmConfig", {}) llm_detail = llm_cfg.get("config", {}) print(f"\n长期记忆提取模型:{llm_detail.get('model')}") print(f"模型服务:{llm_cfg.get('modelServiceName')}") # ── 列出所有记忆存储 ────────────────────────────────────────────────────── print("\n" + "═" * 55) print("当前账号下的所有记忆存储") print("═" * 55) collections = list_memory_collections() if not collections: print("(无)") for c in collections: status = c.get("status", "") print(f" - {c.get('memoryCollectionName')} [{status}]") if __name__ == "__main__": main()
将上述代码保存为 manage_memory.py 并运行:
python3 manage_memory.py