【AI Agent系列】【MetaGPT多智能体学习】7. 剖析BabyAGI:原生多智能体案例一探究竟(附简化版可运行代码)

简介: 【AI Agent系列】【MetaGPT多智能体学习】7. 剖析BabyAGI:原生多智能体案例一探究竟(附简化版可运行代码)

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第五篇笔记。今天我们拆解一个之前提到过的多智能体案例 - BabyAGI,梳理出其实现原理,多智能体间的交互过程(数据流)。这是最原生的多智能体案例,没有用类似AutoGPT或MetaGPT等任何多智能体框架。从这个案例中我们能更好地理解智能体的底层实现原理。

系列笔记

0. BabyAGI 简介

项目地址:https://github.com/yoheinakajima/babyagi/blob/main/README.md

该项目是一个 AI 支持的任务管理系统示例,它根据初始任务或目标,利用OpenAI创建任务列表,并对任务进行优先级排序和执行任务。其背后的主要思想是基于先前任务的结果和预定义的目标创建任务,然后使用 OpenAI 的能力根据目标创建新任务。这是原始的任务驱动的自驱Agent(2023 年 3 月 28 日)的简化版本。

0.1 运行流程

其运行流程如下:

(1)从任务列表中提取第一个任务

(2)将任务发送到执行代理(Execution Agent),该Agent使用LLM根据上下文完成任务。

(3)丰富结果并将其存储在向量数据库中

(4)创建新任务,并根据上一任务的目标和结果重新确定任务列表的优先级。

(5)重复以上步骤

其中涉及四个Agent,前三个Agent都利用了大模型的能力来进行任务规划和总结:

  • Execution Agent 接收目标和任务,调用大模型 LLM来生成任务结果。
  • Task Creation Agent 使用大模型LLM 根据目标和前一个任务的结果创建新任务。它的输入是:目标,前一个任务的结果,任务描述和当前任务列表。
  • Prioritization Agent 使用大模型LLM对任务列表进行重新排序。它接受一个参数:当前任务的 ID
  • Context Agent 使用向量存储和检索任务结果以获取上下文。

官方给出的数据流图如下:

1. BabyAGI 运行

要想更好地理解其原理和工作流程,首先需要将项目跑起来。

1.1 下载开源代码

git clone https://github.com/yoheinakajima/babyagi.git
pip install -r requirements.txt

1.2 填写配置文件

(1)复制一份 .env.example 文件,并重命名为 .env 文件

cp .env.example .env

(2)在 .env 文件中填入自己的 OpenAI key,OpenAI Base Url 等。我的运行不使用 weaviate 和 pinecone,因此不用填相关的config。

1.3 简化代码

因为我的运行不使用 weaviate 和 pinecone,也只是用 OpenAI 模型,所以将不用的代码删掉了,看起来清爽一点。然后,适配了一下 OpenAI API > 1.0 版本的接口。原代码使用的API < 1.0,太旧了。

#!/usr/bin/env python3
from dotenv import load_dotenv
# Load default environment variables (.env)
load_dotenv()
import os
import time
import logging
from collections import deque
from typing import Dict, List
import importlib
# import openai
import chromadb
import tiktoken as tiktoken
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from chromadb.api.types import Documents, EmbeddingFunction, Embeddings
import re
from openai import OpenAI
# default opt out of chromadb telemetry.
from chromadb.config import Settings
client = chromadb.Client(Settings(anonymized_telemetry=False))
# Engine configuration
# Model: GPT, LLAMA, HUMAN, etc.
LLM_MODEL = os.getenv("LLM_MODEL", os.getenv("OPENAI_API_MODEL", "gpt-3.5-turbo")).lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# API Keys
if not (LLM_MODEL.startswith("llama") or LLM_MODEL.startswith("human")):
    assert OPENAI_API_KEY, "\033[91m\033[1m" + "OPENAI_API_KEY environment variable is missing from .env" + "\033[0m\033[0m"
# Table config
RESULTS_STORE_NAME = os.getenv("RESULTS_STORE_NAME", os.getenv("TABLE_NAME", ""))
assert RESULTS_STORE_NAME, "\033[91m\033[1m" + "RESULTS_STORE_NAME environment variable is missing from .env" + "\033[0m\033[0m"
# Run configuration
INSTANCE_NAME = os.getenv("INSTANCE_NAME", os.getenv("BABY_NAME", "BabyAGI"))
COOPERATIVE_MODE = "none"
JOIN_EXISTING_OBJECTIVE = False
# Goal configuration
OBJECTIVE = os.getenv("OBJECTIVE", "")
INITIAL_TASK = os.getenv("INITIAL_TASK", os.getenv("FIRST_TASK", ""))
# Model configuration
OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
# Extensions support begin
def can_import(module_name):
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False
DOTENV_EXTENSIONS = os.getenv("DOTENV_EXTENSIONS", "").split(" ")
# Command line arguments extension
# Can override any of the above environment variables
ENABLE_COMMAND_LINE_ARGS = (
        os.getenv("ENABLE_COMMAND_LINE_ARGS", "false").lower() == "true"
)
if ENABLE_COMMAND_LINE_ARGS:
    if can_import("extensions.argparseext"):
        from extensions.argparseext import parse_arguments
        OBJECTIVE, INITIAL_TASK, LLM_MODEL, DOTENV_EXTENSIONS, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE = parse_arguments()
# Human mode extension
# Gives human input to babyagi
if LLM_MODEL.startswith("human"):
    if can_import("extensions.human_mode"):
        from extensions.human_mode import user_input_await
# Load additional environment variables for enabled extensions
# TODO: This might override the following command line arguments as well:
#    OBJECTIVE, INITIAL_TASK, LLM_MODEL, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE
if DOTENV_EXTENSIONS:
    if can_import("extensions.dotenvext"):
        from extensions.dotenvext import load_dotenv_extensions
        load_dotenv_extensions(DOTENV_EXTENSIONS)
# TODO: There's still work to be done here to enable people to get
# defaults from dotenv extensions, but also provide command line
# arguments to override them
# Extensions support end
print("\033[95m\033[1m" + "\n*****CONFIGURATION*****\n" + "\033[0m\033[0m")
print(f"Name  : {INSTANCE_NAME}")
print(f"Mode  : {'alone' if COOPERATIVE_MODE in ['n', 'none'] else 'local' if COOPERATIVE_MODE in ['l', 'local'] else 'distributed' if COOPERATIVE_MODE in ['d', 'distributed'] else 'undefined'}")
print(f"LLM   : {LLM_MODEL}")
# Check if we know what we are doing
assert OBJECTIVE, "\033[91m\033[1m" + "OBJECTIVE environment variable is missing from .env" + "\033[0m\033[0m"
assert INITIAL_TASK, "\033[91m\033[1m" + "INITIAL_TASK environment variable is missing from .env" + "\033[0m\033[0m"
print("\033[94m\033[1m" + "\n*****OBJECTIVE*****\n" + "\033[0m\033[0m")
print(f"{OBJECTIVE}")
if not JOIN_EXISTING_OBJECTIVE:
    print("\033[93m\033[1m" + "\nInitial task:" + "\033[0m\033[0m" + f" {INITIAL_TASK}")
else:
    print("\033[93m\033[1m" + f"\nJoining to help the objective" + "\033[0m\033[0m")
# Results storage using local ChromaDB
class DefaultResultsStorage:
    def __init__(self):
        logging.getLogger('chromadb').setLevel(logging.ERROR)
        # Create Chroma collection
        chroma_persist_dir = "chroma"
        chroma_client = chromadb.PersistentClient(
            settings=chromadb.config.Settings(
                persist_directory=chroma_persist_dir,
            )
        )
        metric = "cosine"
        embedding_function = OpenAIEmbeddingFunction(api_key=OPENAI_API_KEY)
        self.collection = chroma_client.get_or_create_collection(
            name=RESULTS_STORE_NAME,
            metadata={"hnsw:space": metric},
            embedding_function=embedding_function,
        )
    def add(self, task: Dict, result: str, result_id: str):
        # Break the function if LLM_MODEL starts with "human" (case-insensitive)
        if LLM_MODEL.startswith("human"):
            return
        # Continue with the rest of the function
        embeddings = llm_embed.embed(result) if LLM_MODEL.startswith("llama") else None
        if (
                len(self.collection.get(ids=[result_id], include=[])["ids"]) > 0
        ):  # Check if the result already exists
            self.collection.update(
                ids=result_id,
                embeddings=embeddings,
                documents=result,
                metadatas={"task": task["task_name"], "result": result},
            )
        else:
            self.collection.add(
                ids=result_id,
                embeddings=embeddings,
                documents=result,
                metadatas={"task": task["task_name"], "result": result},
            )
    def query(self, query: str, top_results_num: int) -> List[dict]:
        count: int = self.collection.count()
        if count == 0:
            return []
        results = self.collection.query(
            query_texts=query,
            n_results=min(top_results_num, count),
            include=["metadatas"]
        )
        return [item["task"] for item in results["metadatas"][0]]
# Initialize results storage
def use_chroma():
    print("\nUsing results storage: " + "\033[93m\033[1m" + "Chroma (Default)" + "\033[0m\033[0m")
    return DefaultResultsStorage()
results_storage = use_chroma()
# Task storage supporting only a single instance of BabyAGI
class SingleTaskListStorage:
    def __init__(self):
        self.tasks = deque([])
        self.task_id_counter = 0
    def append(self, task: Dict):
        self.tasks.append(task)
    def replace(self, tasks: List[Dict]):
        self.tasks = deque(tasks)
    def popleft(self):
        return self.tasks.popleft()
    def is_empty(self):
        return False if self.tasks else True
    def next_task_id(self):
        self.task_id_counter += 1
        return self.task_id_counter
    def get_task_names(self):
        return [t["task_name"] for t in self.tasks]
# Initialize tasks storage
tasks_storage = SingleTaskListStorage()
if COOPERATIVE_MODE in ['l', 'local']:
    if can_import("extensions.ray_tasks"):
        import sys
        from pathlib import Path
        sys.path.append(str(Path(__file__).resolve().parent))
        from extensions.ray_tasks import CooperativeTaskListStorage
        tasks_storage = CooperativeTaskListStorage(OBJECTIVE)
        print("\nReplacing tasks storage: " + "\033[93m\033[1m" + "Ray" + "\033[0m\033[0m")
elif COOPERATIVE_MODE in ['d', 'distributed']:
    pass
def limit_tokens_from_string(string: str, model: str, limit: int) -> str:
    """Limits the string to a number of tokens (estimated)."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except:
        encoding = tiktoken.encoding_for_model('gpt2')  # Fallback for others.
    encoded = encoding.encode(string)
    return encoding.decode(encoded[:limit])
client = OpenAI()
def openai_call(
    prompt: str,
    model: str = LLM_MODEL,
    temperature: float = OPENAI_TEMPERATURE,
    max_tokens: int = 100,
):
    # Use 4000 instead of the real limit (4097) to give a bit of wiggle room for the encoding of roles.
    trimmed_prompt = limit_tokens_from_string(prompt, model, 4000 - max_tokens)
    # Use chat completion API
    messages = [{"role": "system", "content": trimmed_prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        n=1,
        stop=None,
    )
    return response.choices[0].message.content.strip()
       
def task_creation_agent(
        objective: str, result: Dict, task_description: str, task_list: List[str]
):
    prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""
    if task_list:
        prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"
    prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "
    if task_list:
        prompt += "These new tasks must not overlap with incomplete tasks. "
    prompt += """
Return one task per line in your response. The result must be a numbered list in the format:
#. First task
#. Second task
The number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output. OUTPUT IN CHINESE"""
    print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')
    new_tasks = response.split('\n')
    new_tasks_list = []
    for task_string in new_tasks:
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
            task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
            if task_name.strip() and task_id.isnumeric():
                new_tasks_list.append(task_name)
            # print('New task created: ' + task_name)
    out = [{"task_name": task_name} for task_name in new_tasks_list]
    return out
def prioritization_agent():
    task_names = tasks_storage.get_task_names()
    bullet_string = '\n'
    prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:
#. First task
#. Second task
The entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output. OUTPUT IN CHINESE"""
    print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')
    if not response:
        print('Received empty response from priotritization agent. Keeping task list unchanged.')
        return
    new_tasks = response.split("\n") if "\n" in response else [response]
    new_tasks_list = []
    for task_string in new_tasks:
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
            task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
            if task_name.strip():
                new_tasks_list.append({"task_id": task_id, "task_name": task_name})
    return new_tasks_list
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:
    """
    Executes a task based on the given objective and previous context.
    Args:
        objective (str): The objective or goal for the AI to perform the task.
        task (str): The task to be executed by the AI.
    Returns:
        str: The response generated by the AI for the given task.
    """
    context = context_agent(query=objective, top_results_num=5)
    # print("\n****RELEVANT CONTEXT****\n")
    # print(context)
    # print('')
    prompt = f'OUTPUT IN CHINESE. Perform one task based on the following objective: {objective}.\n'
    if context:
        prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)
    prompt += f'\nYour task: {task}\nResponse:'
    return openai_call(prompt, max_tokens=2000)
# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):
    """
    Retrieves context for a given query from an index of tasks.
    Args:
        query (str): The query or objective for retrieving context.
        top_results_num (int): The number of top results to retrieve.
    Returns:
        list: A list of tasks as context for the given query, sorted by relevance.
    """
    results = results_storage.query(query=query, top_results_num=top_results_num)
    print("****RESULTS****")
    print(results)
    return results
# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:
    initial_task = {
        "task_id": tasks_storage.next_task_id(),
        "task_name": INITIAL_TASK
    }
    tasks_storage.append(initial_task)
def main():
    loop = True
    while loop:
        # As long as there are tasks in the storage...
        if not tasks_storage.is_empty():
            # Print the task list
            print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
            for t in tasks_storage.get_task_names():
                print(" • " + str(t))
            # Step 1: Pull the first incomplete task
            task = tasks_storage.popleft()
            print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
            print(str(task["task_name"]))
            # Send to execution function to complete the task based on the context
            result = execution_agent(OBJECTIVE, str(task["task_name"]))
            print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
            print(result)
            # Step 2: Enrich result and store in the results storage
            # This is where you should enrich the result if needed
            enriched_result = {
                "data": result
            }
            # extract the actual result from the dictionary
            # since we don't do enrichment currently
            # vector = enriched_result["data"]
            result_id = f"result_{task['task_id']}"
            results_storage.add(task, result, result_id)
            # Step 3: Create new tasks and re-prioritize task list
            # only the main instance in cooperative mode does that
            new_tasks = task_creation_agent(
                OBJECTIVE,
                enriched_result,
                task["task_name"],
                tasks_storage.get_task_names(),
            )
            print('Adding new tasks to task_storage')
            for new_task in new_tasks:
                new_task.update({"task_id": tasks_storage.next_task_id()})
                print(str(new_task))
                tasks_storage.append(new_task)
            if not JOIN_EXISTING_OBJECTIVE:
                prioritized_tasks = prioritization_agent()
                if prioritized_tasks:
                    tasks_storage.replace(prioritized_tasks)
            # Sleep a bit before checking the task list again
            time.sleep(5)
        else:
            print('Done.')
            loop = False
if __name__ == "__main__":
    main()

1.4 运行

这时候点运行,应该就能运行成功了。但特别注意,不建议直接运行。由于大模型规划任务的能力具有很大的不确定性,很可能导致你的运行产生大量的任务,甚至任务会越来越多,不会停止。这样你的Key,或者你的Token消耗,就很大很大,甚至被封号或者直接破产了 !!!。我是debug运行方式,在每个函数里面都打了断点,这样可以随时停止。

2. 运行过程及结果分析

  • 给定Objective:周杰伦的生日是星期几。
  • 初始Task为:Develop a task list

2.1 运行输出 - 详细解释

(1)根据给定的 Objective 和 初始Task。刚开始 Task List 中只有一个初始Task。所以执行时 Next Task 就是这个初始Task。execution_agent 执行这个任务,运行结果为根据Objective产出一系列实现它的步骤任务,这里产生了4个。说实话,有点多了。

(2)根据 execution_agent 产生的结果和 Objective最终目标,task_creation_agent 创建了新的 Task 列表,添加到任务队列中。

(3)根据 task_creation_agent 产生的任务列表,prioritization_agent 根据任务列表和最终目标进行优先级排序。这里的排序就有点不靠谱了,大模型的能力还是不稳定

(4)下一次循环开始,从任务列表中取出第一个未完成任务,execution_agent 执行任务。上面错误的任务排序,导致了这里的大模型给出幻觉的答案…

(5)又是根据 execution_agent 产生的结果和 Objective最终目标,task_creation_agent 创建了新的 Task 列表,添加到任务队列中。

(6)根据 task_creation_agent 新产生的任务列表,prioritization_agent 根据新的任务列表和最终目标再次进行优先级排序。

(7)又是一轮循环,从任务列表中取出第一个未完成任务,execution_agent 执行任务。

(8)… 一直循环,直到任务队列没有未完成的任务。

看出来没?以这个进度,任务队列什么时候才能空?太浪费 Token 了。

2.2 问题及思考

从上面的运行过程也看出来了,这个多智能体案例太依赖大模型的能力了,就像我之前写的AutoGPT(【AI大模型应用开发】【AutoGPT系列】2. 手撕AutoGPT - 手把手教你用LangChain从0开始写一个简易版AutoGPT(0))一样。

目前看到的问题及一些思考:

(1)第一步产生的子任务太多 - 初始任务可以多写点 Prompt,限制下任务数量或质量

(2)优先级排序,有点不稳定,强依赖大模型的能力。或许需要引入人为排序?

(3)即使答案能够回答用户设置的Objective问题,但是只要任务里还有未完成的任务,它就会继续运行。- 需要添加结束的限制条件或判断。

例如下面这个例子,我给的 Objective 是 “历史上的今天发生了什么?” 。它直接给排了一大堆没必要的任务。

然后其实下面执行第一个任务后就得到了我想要的结果。

但它还在继续创建新的任务,我也不知道它什么时候会停止。

(4)最严重的问题,前面提到的,没有循环次数的限制,代码这样写,基本快等于 while True了,无限循环了。太危险了。

(5)还有一个没理解的点,为啥在 execution_agent 中要获取前面的任务?这里应该换成前面任务的执行结果才更合适吧?

3. 总结

本文我们学习了一个原生的多智能体案例 - BabyAGI,从环境搭建到运行,对每一步的输出都做了详细的说明,最后对运行过程中发现的一些问题也有一些自己的优化思考。

BabyAGI 其实就是利用大模型进行任务规划,任务排序,任务执行。这三个过程不断循环,再加上一点上下文信息以得到更高质量的结果,直到满足最终的目标。

三个主要过程,全部依赖大模型的能力,有点不可控,目前来说,个人认为落地比较难。咱们主要从中学一下它的实现思想吧。


站内文章一览

相关文章
|
7月前
|
人工智能 测试技术 API
构建AI智能体:二、DeepSeek的Ollama部署FastAPI封装调用
本文介绍如何通过Ollama本地部署DeepSeek大模型,结合FastAPI实现API接口调用。涵盖Ollama安装、路径迁移、模型下载运行及REST API封装全过程,助力快速构建可扩展的AI应用服务。
2416 7
|
7月前
|
人工智能 运维 安全
加速智能体开发:从 Serverless 运行时到 Serverless AI 运行时
在云计算与人工智能深度融合的背景下,Serverless 技术作为云原生架构的集大成者,正加速向 AI 原生架构演进。阿里云函数计算(FC)率先提出并实践“Serverless AI 运行时”概念,通过技术创新与生态联动,为智能体(Agent)开发提供高效、安全、低成本的基础设施支持。本文从技术演进路径、核心能力及未来展望三方面解析 Serverless AI 的突破性价值。
|
7月前
|
人工智能 API 开发工具
构建AI智能体:一、初识AI大模型与API调用
本文介绍大模型基础知识及API调用方法,涵盖阿里云百炼平台密钥申请、DashScope SDK使用、Python调用示例(如文本情感分析、图像文字识别),助力开发者快速上手大模型应用开发。
2751 18
构建AI智能体:一、初识AI大模型与API调用
|
7月前
|
人工智能 缓存 运维
【智造】AI应用实战:6个agent搞定复杂指令和工具膨胀
本文介绍联调造数场景下的AI应用演进:从单Agent模式到多Agent协同的架构升级。针对复杂指令执行不准、响应慢等问题,通过意图识别、工具引擎、推理执行等多Agent分工协作,结合工程化手段提升准确性与效率,并分享了关键设计思路与实践心得。
1168 20
【智造】AI应用实战:6个agent搞定复杂指令和工具膨胀
|
7月前
|
存储 机器学习/深度学习 人工智能
构建AI智能体:三、Prompt提示词工程:几句话让AI秒懂你心
本文深入浅出地讲解Prompt原理及其与大模型的关系,系统介绍Prompt的核心要素、编写原则与应用场景,帮助用户通过精准指令提升AI交互效率,释放大模型潜能。
1327 6
|
7月前
|
机器学习/深度学习 人工智能 JSON
PHP从0到1实现 AI 智能体系统并且训练知识库资料
本文详解如何用PHP从0到1构建AI智能体,涵盖提示词设计、记忆管理、知识库集成与反馈优化四大核心训练维度,结合实战案例与系统架构,助你打造懂业务、会进化的专属AI助手。
1126 6
|
7月前
|
消息中间件 人工智能 安全
云原生进化论:加速构建 AI 应用
本文将和大家分享过去一年在支持企业构建 AI 应用过程的一些实践和思考。
1925 72
|
8月前
|
人工智能 安全 中间件
阿里云 AI 中间件重磅发布,打通 AI 应用落地“最后一公里”
9 月 26 日,2025 云栖大会 AI 中间件:AI 时代的中间件技术演进与创新实践论坛上,阿里云智能集团资深技术专家林清山发表主题演讲《未来已来:下一代 AI 中间件重磅发布,解锁 AI 应用架构新范式》,重磅发布阿里云 AI 中间件,提供面向分布式多 Agent 架构的基座,包括:AgentScope-Java(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。
1550 88
|
7月前
|
人工智能 运维 Kubernetes
Serverless 应用引擎 SAE:为传统应用托底,为 AI 创新加速
在容器技术持续演进与 AI 全面爆发的当下,企业既要稳健托管传统业务,又要高效落地 AI 创新,如何在复杂的基础设施与频繁的版本变化中保持敏捷、稳定与低成本,成了所有技术团队的共同挑战。阿里云 Serverless 应用引擎(SAE)正是为应对这一时代挑战而生的破局者,SAE 以“免运维、强稳定、极致降本”为核心,通过一站式的应用级托管能力,同时支撑传统应用与 AI 应用,让企业把更多精力投入到业务创新。
769 30

热门文章

最新文章