【AI Agent系列】【MetaGPT多智能体学习】7. 剖析BabyAGI:原生多智能体案例一探究竟(附简化版可运行代码)

简介: 【AI Agent系列】【MetaGPT多智能体学习】7. 剖析BabyAGI:原生多智能体案例一探究竟(附简化版可运行代码)

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第五篇笔记。今天我们拆解一个之前提到过的多智能体案例 - BabyAGI,梳理出其实现原理,多智能体间的交互过程(数据流)。这是最原生的多智能体案例,没有用类似AutoGPT或MetaGPT等任何多智能体框架。从这个案例中我们能更好地理解智能体的底层实现原理。

系列笔记

0. BabyAGI 简介

项目地址:https://github.com/yoheinakajima/babyagi/blob/main/README.md

该项目是一个 AI 支持的任务管理系统示例,它根据初始任务或目标,利用OpenAI创建任务列表,并对任务进行优先级排序和执行任务。其背后的主要思想是基于先前任务的结果和预定义的目标创建任务,然后使用 OpenAI 的能力根据目标创建新任务。这是原始的任务驱动的自驱Agent(2023 年 3 月 28 日)的简化版本。

0.1 运行流程

其运行流程如下:

(1)从任务列表中提取第一个任务

(2)将任务发送到执行代理(Execution Agent),该Agent使用LLM根据上下文完成任务。

(3)丰富结果并将其存储在向量数据库中

(4)创建新任务,并根据上一任务的目标和结果重新确定任务列表的优先级。

(5)重复以上步骤

其中涉及四个Agent,前三个Agent都利用了大模型的能力来进行任务规划和总结:

  • Execution Agent 接收目标和任务,调用大模型 LLM来生成任务结果。
  • Task Creation Agent 使用大模型LLM 根据目标和前一个任务的结果创建新任务。它的输入是:目标,前一个任务的结果,任务描述和当前任务列表。
  • Prioritization Agent 使用大模型LLM对任务列表进行重新排序。它接受一个参数:当前任务的 ID
  • Context Agent 使用向量存储和检索任务结果以获取上下文。

官方给出的数据流图如下:

1. BabyAGI 运行

要想更好地理解其原理和工作流程,首先需要将项目跑起来。

1.1 下载开源代码

git clone https://github.com/yoheinakajima/babyagi.git
pip install -r requirements.txt

1.2 填写配置文件

(1)复制一份 .env.example 文件,并重命名为 .env 文件

cp .env.example .env

(2)在 .env 文件中填入自己的 OpenAI key,OpenAI Base Url 等。我的运行不使用 weaviate 和 pinecone,因此不用填相关的config。

1.3 简化代码

因为我的运行不使用 weaviate 和 pinecone,也只是用 OpenAI 模型,所以将不用的代码删掉了,看起来清爽一点。然后,适配了一下 OpenAI API > 1.0 版本的接口。原代码使用的API < 1.0,太旧了。

#!/usr/bin/env python3
from dotenv import load_dotenv
# Load default environment variables (.env)
load_dotenv()
import os
import time
import logging
from collections import deque
from typing import Dict, List
import importlib
# import openai
import chromadb
import tiktoken as tiktoken
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from chromadb.api.types import Documents, EmbeddingFunction, Embeddings
import re
from openai import OpenAI
# default opt out of chromadb telemetry.
from chromadb.config import Settings
client = chromadb.Client(Settings(anonymized_telemetry=False))
# Engine configuration
# Model: GPT, LLAMA, HUMAN, etc.
LLM_MODEL = os.getenv("LLM_MODEL", os.getenv("OPENAI_API_MODEL", "gpt-3.5-turbo")).lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# API Keys
if not (LLM_MODEL.startswith("llama") or LLM_MODEL.startswith("human")):
    assert OPENAI_API_KEY, "\033[91m\033[1m" + "OPENAI_API_KEY environment variable is missing from .env" + "\033[0m\033[0m"
# Table config
RESULTS_STORE_NAME = os.getenv("RESULTS_STORE_NAME", os.getenv("TABLE_NAME", ""))
assert RESULTS_STORE_NAME, "\033[91m\033[1m" + "RESULTS_STORE_NAME environment variable is missing from .env" + "\033[0m\033[0m"
# Run configuration
INSTANCE_NAME = os.getenv("INSTANCE_NAME", os.getenv("BABY_NAME", "BabyAGI"))
COOPERATIVE_MODE = "none"
JOIN_EXISTING_OBJECTIVE = False
# Goal configuration
OBJECTIVE = os.getenv("OBJECTIVE", "")
INITIAL_TASK = os.getenv("INITIAL_TASK", os.getenv("FIRST_TASK", ""))
# Model configuration
OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
# Extensions support begin
def can_import(module_name):
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False
DOTENV_EXTENSIONS = os.getenv("DOTENV_EXTENSIONS", "").split(" ")
# Command line arguments extension
# Can override any of the above environment variables
ENABLE_COMMAND_LINE_ARGS = (
        os.getenv("ENABLE_COMMAND_LINE_ARGS", "false").lower() == "true"
)
if ENABLE_COMMAND_LINE_ARGS:
    if can_import("extensions.argparseext"):
        from extensions.argparseext import parse_arguments
        OBJECTIVE, INITIAL_TASK, LLM_MODEL, DOTENV_EXTENSIONS, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE = parse_arguments()
# Human mode extension
# Gives human input to babyagi
if LLM_MODEL.startswith("human"):
    if can_import("extensions.human_mode"):
        from extensions.human_mode import user_input_await
# Load additional environment variables for enabled extensions
# TODO: This might override the following command line arguments as well:
#    OBJECTIVE, INITIAL_TASK, LLM_MODEL, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE
if DOTENV_EXTENSIONS:
    if can_import("extensions.dotenvext"):
        from extensions.dotenvext import load_dotenv_extensions
        load_dotenv_extensions(DOTENV_EXTENSIONS)
# TODO: There's still work to be done here to enable people to get
# defaults from dotenv extensions, but also provide command line
# arguments to override them
# Extensions support end
print("\033[95m\033[1m" + "\n*****CONFIGURATION*****\n" + "\033[0m\033[0m")
print(f"Name  : {INSTANCE_NAME}")
print(f"Mode  : {'alone' if COOPERATIVE_MODE in ['n', 'none'] else 'local' if COOPERATIVE_MODE in ['l', 'local'] else 'distributed' if COOPERATIVE_MODE in ['d', 'distributed'] else 'undefined'}")
print(f"LLM   : {LLM_MODEL}")
# Check if we know what we are doing
assert OBJECTIVE, "\033[91m\033[1m" + "OBJECTIVE environment variable is missing from .env" + "\033[0m\033[0m"
assert INITIAL_TASK, "\033[91m\033[1m" + "INITIAL_TASK environment variable is missing from .env" + "\033[0m\033[0m"
print("\033[94m\033[1m" + "\n*****OBJECTIVE*****\n" + "\033[0m\033[0m")
print(f"{OBJECTIVE}")
if not JOIN_EXISTING_OBJECTIVE:
    print("\033[93m\033[1m" + "\nInitial task:" + "\033[0m\033[0m" + f" {INITIAL_TASK}")
else:
    print("\033[93m\033[1m" + f"\nJoining to help the objective" + "\033[0m\033[0m")
# Results storage using local ChromaDB
class DefaultResultsStorage:
    def __init__(self):
        logging.getLogger('chromadb').setLevel(logging.ERROR)
        # Create Chroma collection
        chroma_persist_dir = "chroma"
        chroma_client = chromadb.PersistentClient(
            settings=chromadb.config.Settings(
                persist_directory=chroma_persist_dir,
            )
        )
        metric = "cosine"
        embedding_function = OpenAIEmbeddingFunction(api_key=OPENAI_API_KEY)
        self.collection = chroma_client.get_or_create_collection(
            name=RESULTS_STORE_NAME,
            metadata={"hnsw:space": metric},
            embedding_function=embedding_function,
        )
    def add(self, task: Dict, result: str, result_id: str):
        # Break the function if LLM_MODEL starts with "human" (case-insensitive)
        if LLM_MODEL.startswith("human"):
            return
        # Continue with the rest of the function
        embeddings = llm_embed.embed(result) if LLM_MODEL.startswith("llama") else None
        if (
                len(self.collection.get(ids=[result_id], include=[])["ids"]) > 0
        ):  # Check if the result already exists
            self.collection.update(
                ids=result_id,
                embeddings=embeddings,
                documents=result,
                metadatas={"task": task["task_name"], "result": result},
            )
        else:
            self.collection.add(
                ids=result_id,
                embeddings=embeddings,
                documents=result,
                metadatas={"task": task["task_name"], "result": result},
            )
    def query(self, query: str, top_results_num: int) -> List[dict]:
        count: int = self.collection.count()
        if count == 0:
            return []
        results = self.collection.query(
            query_texts=query,
            n_results=min(top_results_num, count),
            include=["metadatas"]
        )
        return [item["task"] for item in results["metadatas"][0]]
# Initialize results storage
def use_chroma():
    print("\nUsing results storage: " + "\033[93m\033[1m" + "Chroma (Default)" + "\033[0m\033[0m")
    return DefaultResultsStorage()
results_storage = use_chroma()
# Task storage supporting only a single instance of BabyAGI
class SingleTaskListStorage:
    def __init__(self):
        self.tasks = deque([])
        self.task_id_counter = 0
    def append(self, task: Dict):
        self.tasks.append(task)
    def replace(self, tasks: List[Dict]):
        self.tasks = deque(tasks)
    def popleft(self):
        return self.tasks.popleft()
    def is_empty(self):
        return False if self.tasks else True
    def next_task_id(self):
        self.task_id_counter += 1
        return self.task_id_counter
    def get_task_names(self):
        return [t["task_name"] for t in self.tasks]
# Initialize tasks storage
tasks_storage = SingleTaskListStorage()
if COOPERATIVE_MODE in ['l', 'local']:
    if can_import("extensions.ray_tasks"):
        import sys
        from pathlib import Path
        sys.path.append(str(Path(__file__).resolve().parent))
        from extensions.ray_tasks import CooperativeTaskListStorage
        tasks_storage = CooperativeTaskListStorage(OBJECTIVE)
        print("\nReplacing tasks storage: " + "\033[93m\033[1m" + "Ray" + "\033[0m\033[0m")
elif COOPERATIVE_MODE in ['d', 'distributed']:
    pass
def limit_tokens_from_string(string: str, model: str, limit: int) -> str:
    """Limits the string to a number of tokens (estimated)."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except:
        encoding = tiktoken.encoding_for_model('gpt2')  # Fallback for others.
    encoded = encoding.encode(string)
    return encoding.decode(encoded[:limit])
client = OpenAI()
def openai_call(
    prompt: str,
    model: str = LLM_MODEL,
    temperature: float = OPENAI_TEMPERATURE,
    max_tokens: int = 100,
):
    # Use 4000 instead of the real limit (4097) to give a bit of wiggle room for the encoding of roles.
    trimmed_prompt = limit_tokens_from_string(prompt, model, 4000 - max_tokens)
    # Use chat completion API
    messages = [{"role": "system", "content": trimmed_prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        n=1,
        stop=None,
    )
    return response.choices[0].message.content.strip()
       
def task_creation_agent(
        objective: str, result: Dict, task_description: str, task_list: List[str]
):
    prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""
    if task_list:
        prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"
    prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "
    if task_list:
        prompt += "These new tasks must not overlap with incomplete tasks. "
    prompt += """
Return one task per line in your response. The result must be a numbered list in the format:
#. First task
#. Second task
The number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output. OUTPUT IN CHINESE"""
    print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')
    new_tasks = response.split('\n')
    new_tasks_list = []
    for task_string in new_tasks:
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
            task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
            if task_name.strip() and task_id.isnumeric():
                new_tasks_list.append(task_name)
            # print('New task created: ' + task_name)
    out = [{"task_name": task_name} for task_name in new_tasks_list]
    return out
def prioritization_agent():
    task_names = tasks_storage.get_task_names()
    bullet_string = '\n'
    prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:
#. First task
#. Second task
The entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output. OUTPUT IN CHINESE"""
    print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')
    if not response:
        print('Received empty response from priotritization agent. Keeping task list unchanged.')
        return
    new_tasks = response.split("\n") if "\n" in response else [response]
    new_tasks_list = []
    for task_string in new_tasks:
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = ''.join(s for s in task_parts[0] if s.isnumeric())
            task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()
            if task_name.strip():
                new_tasks_list.append({"task_id": task_id, "task_name": task_name})
    return new_tasks_list
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:
    """
    Executes a task based on the given objective and previous context.
    Args:
        objective (str): The objective or goal for the AI to perform the task.
        task (str): The task to be executed by the AI.
    Returns:
        str: The response generated by the AI for the given task.
    """
    context = context_agent(query=objective, top_results_num=5)
    # print("\n****RELEVANT CONTEXT****\n")
    # print(context)
    # print('')
    prompt = f'OUTPUT IN CHINESE. Perform one task based on the following objective: {objective}.\n'
    if context:
        prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)
    prompt += f'\nYour task: {task}\nResponse:'
    return openai_call(prompt, max_tokens=2000)
# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):
    """
    Retrieves context for a given query from an index of tasks.
    Args:
        query (str): The query or objective for retrieving context.
        top_results_num (int): The number of top results to retrieve.
    Returns:
        list: A list of tasks as context for the given query, sorted by relevance.
    """
    results = results_storage.query(query=query, top_results_num=top_results_num)
    print("****RESULTS****")
    print(results)
    return results
# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:
    initial_task = {
        "task_id": tasks_storage.next_task_id(),
        "task_name": INITIAL_TASK
    }
    tasks_storage.append(initial_task)
def main():
    loop = True
    while loop:
        # As long as there are tasks in the storage...
        if not tasks_storage.is_empty():
            # Print the task list
            print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
            for t in tasks_storage.get_task_names():
                print(" • " + str(t))
            # Step 1: Pull the first incomplete task
            task = tasks_storage.popleft()
            print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
            print(str(task["task_name"]))
            # Send to execution function to complete the task based on the context
            result = execution_agent(OBJECTIVE, str(task["task_name"]))
            print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
            print(result)
            # Step 2: Enrich result and store in the results storage
            # This is where you should enrich the result if needed
            enriched_result = {
                "data": result
            }
            # extract the actual result from the dictionary
            # since we don't do enrichment currently
            # vector = enriched_result["data"]
            result_id = f"result_{task['task_id']}"
            results_storage.add(task, result, result_id)
            # Step 3: Create new tasks and re-prioritize task list
            # only the main instance in cooperative mode does that
            new_tasks = task_creation_agent(
                OBJECTIVE,
                enriched_result,
                task["task_name"],
                tasks_storage.get_task_names(),
            )
            print('Adding new tasks to task_storage')
            for new_task in new_tasks:
                new_task.update({"task_id": tasks_storage.next_task_id()})
                print(str(new_task))
                tasks_storage.append(new_task)
            if not JOIN_EXISTING_OBJECTIVE:
                prioritized_tasks = prioritization_agent()
                if prioritized_tasks:
                    tasks_storage.replace(prioritized_tasks)
            # Sleep a bit before checking the task list again
            time.sleep(5)
        else:
            print('Done.')
            loop = False
if __name__ == "__main__":
    main()

1.4 运行

这时候点运行,应该就能运行成功了。但特别注意,不建议直接运行。由于大模型规划任务的能力具有很大的不确定性,很可能导致你的运行产生大量的任务,甚至任务会越来越多,不会停止。这样你的Key,或者你的Token消耗,就很大很大,甚至被封号或者直接破产了 !!!。我是debug运行方式,在每个函数里面都打了断点,这样可以随时停止。

2. 运行过程及结果分析

  • 给定Objective:周杰伦的生日是星期几。
  • 初始Task为:Develop a task list

2.1 运行输出 - 详细解释

(1)根据给定的 Objective 和 初始Task。刚开始 Task List 中只有一个初始Task。所以执行时 Next Task 就是这个初始Task。execution_agent 执行这个任务,运行结果为根据Objective产出一系列实现它的步骤任务,这里产生了4个。说实话,有点多了。

(2)根据 execution_agent 产生的结果和 Objective最终目标,task_creation_agent 创建了新的 Task 列表,添加到任务队列中。

(3)根据 task_creation_agent 产生的任务列表,prioritization_agent 根据任务列表和最终目标进行优先级排序。这里的排序就有点不靠谱了,大模型的能力还是不稳定

(4)下一次循环开始,从任务列表中取出第一个未完成任务,execution_agent 执行任务。上面错误的任务排序,导致了这里的大模型给出幻觉的答案…

(5)又是根据 execution_agent 产生的结果和 Objective最终目标,task_creation_agent 创建了新的 Task 列表,添加到任务队列中。

(6)根据 task_creation_agent 新产生的任务列表,prioritization_agent 根据新的任务列表和最终目标再次进行优先级排序。

(7)又是一轮循环,从任务列表中取出第一个未完成任务,execution_agent 执行任务。

(8)… 一直循环,直到任务队列没有未完成的任务。

看出来没?以这个进度,任务队列什么时候才能空?太浪费 Token 了。

2.2 问题及思考

从上面的运行过程也看出来了,这个多智能体案例太依赖大模型的能力了,就像我之前写的AutoGPT(【AI大模型应用开发】【AutoGPT系列】2. 手撕AutoGPT - 手把手教你用LangChain从0开始写一个简易版AutoGPT(0))一样。

目前看到的问题及一些思考:

(1)第一步产生的子任务太多 - 初始任务可以多写点 Prompt,限制下任务数量或质量

(2)优先级排序,有点不稳定,强依赖大模型的能力。或许需要引入人为排序?

(3)即使答案能够回答用户设置的Objective问题,但是只要任务里还有未完成的任务,它就会继续运行。- 需要添加结束的限制条件或判断。

例如下面这个例子,我给的 Objective 是 “历史上的今天发生了什么?” 。它直接给排了一大堆没必要的任务。

然后其实下面执行第一个任务后就得到了我想要的结果。

但它还在继续创建新的任务,我也不知道它什么时候会停止。

(4)最严重的问题,前面提到的,没有循环次数的限制,代码这样写,基本快等于 while True了,无限循环了。太危险了。

(5)还有一个没理解的点,为啥在 execution_agent 中要获取前面的任务?这里应该换成前面任务的执行结果才更合适吧?

3. 总结

本文我们学习了一个原生的多智能体案例 - BabyAGI,从环境搭建到运行,对每一步的输出都做了详细的说明,最后对运行过程中发现的一些问题也有一些自己的优化思考。

BabyAGI 其实就是利用大模型进行任务规划,任务排序,任务执行。这三个过程不断循环,再加上一点上下文信息以得到更高质量的结果,直到满足最终的目标。

三个主要过程,全部依赖大模型的能力,有点不可控,目前来说,个人认为落地比较难。咱们主要从中学一下它的实现思想吧。


站内文章一览

相关文章
|
1天前
|
人工智能 自然语言处理 API
深入浅出 LangChain 与智能 Agent:构建下一代 AI 助手
深入浅出 LangChain 与智能 Agent:构建下一代 AI 助手
|
1天前
|
决策智能 开发者
Multi-Agent实践第8期:轻松拖拽搭建多智能体应用
AgentScope Workstation令多智能体应用的搭建变得轻而易举,只需简单拖拽操作,每位用户都能快速打造出丰富多彩的应用。
|
9天前
|
机器学习/深度学习 人工智能 自然语言处理
智能化未来:Agent AI智能体的崛起与全球挑战
智能化未来:Agent AI智能体的崛起与全球挑战
38 1
|
10天前
|
人工智能 监控 前端开发
基于ReAct机制的AI Agent
当前,在各个大厂纷纷卷LLM的情况下,各自都借助自己的LLM推出了自己的AI Agent,比如字节的Coze,百度的千帆等,还有开源的Dify。你是否想知道其中的原理?是否想过自己如何实现一套AI Agent?当然,借助LangChain就可以。
|
15天前
|
物联网 PyTorch 测试技术
LLM 大模型学习必知必会系列(十):基于AgentFabric实现交互式智能体应用,Agent实战
LLM 大模型学习必知必会系列(十):基于AgentFabric实现交互式智能体应用,Agent实战
LLM 大模型学习必知必会系列(十):基于AgentFabric实现交互式智能体应用,Agent实战
|
30天前
|
监控 Unix Windows
Zabbix【部署 04】 Windows系统安装配置agent及agent2
Zabbix【部署 04】 Windows系统安装配置agent及agent2
248 0
|
30天前
|
缓存 监控 安全
zabbix服务器监控之了解agent的启动过程
zabbix服务器监控之了解agent的启动过程
16 0
|
30天前
|
监控 网络协议 Unix
centos7 zabbix安装客户端agent -配置监控远程主机 在需要监控的电脑上安装
centos7 zabbix安装客户端agent -配置监控远程主机 在需要监控的电脑上安装
42 0
|
10月前
|
存储 数据采集 缓存
【运维知识进阶篇】Zabbix5.0稳定版详解9(Zabbix优化:高并发对MySQL进行拆分、Zabbix-agent主动上报模式、使用proxy代理模式、系统自带监控项优化、进程优化、缓存优化)
【运维知识进阶篇】Zabbix5.0稳定版详解9(Zabbix优化:高并发对MySQL进行拆分、Zabbix-agent主动上报模式、使用proxy代理模式、系统自带监控项优化、进程优化、缓存优化)
274 0
|
11月前
|
运维 监控
【运维】在zabbix-server的web界面配置agent主机监控
【运维】在zabbix-server的web界面配置agent主机监控