本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。
本文为该课程的第四章(多智能体开发)的第五篇笔记。今天我们拆解一个之前提到过的多智能体案例 - BabyAGI,梳理出其实现原理,多智能体间的交互过程(数据流)。这是最原生的多智能体案例,没有用类似AutoGPT或MetaGPT等任何多智能体框架。从这个案例中我们能更好地理解智能体的底层实现原理。
系列笔记
- 【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑
- 【AI Agent系列】【MetaGPT多智能体学习】1. 再理解 AI Agent - 经典案例和热门框架综述
- 【AI Agent系列】【MetaGPT多智能体学习】2. 重温单智能体开发 - 深入源码,理解单智能体运行框架
- 【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制
- 【AI Agent系列】【MetaGPT多智能体学习】4. 基于MetaGPT的Team组件开发你的第一个智能体团队
- 【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)
- 【AI Agent系列】【MetaGPT多智能体学习】6. 多智能体实战 - 基于MetaGPT实现游戏【你说我猜】(附完整代码)
0. BabyAGI 简介
项目地址:https://github.com/yoheinakajima/babyagi/blob/main/README.md
该项目是一个 AI 支持的任务管理系统示例,它根据初始任务或目标,利用OpenAI创建任务列表,并对任务进行优先级排序和执行任务。其背后的主要思想是基于先前任务的结果和预定义的目标创建任务,然后使用 OpenAI 的能力根据目标创建新任务。这是原始的任务驱动的自驱Agent(2023 年 3 月 28 日)的简化版本。
0.1 运行流程
其运行流程如下:
(1)从任务列表中提取第一个任务
(2)将任务发送到执行代理(Execution Agent),该Agent使用LLM根据上下文完成任务。
(3)丰富结果并将其存储在向量数据库中
(4)创建新任务,并根据上一任务的目标和结果重新确定任务列表的优先级。
(5)重复以上步骤
其中涉及四个Agent,前三个Agent都利用了大模型的能力来进行任务规划和总结:
- Execution Agent 接收目标和任务,调用大模型 LLM来生成任务结果。
- Task Creation Agent 使用大模型LLM 根据目标和前一个任务的结果创建新任务。它的输入是:目标,前一个任务的结果,任务描述和当前任务列表。
- Prioritization Agent 使用大模型LLM对任务列表进行重新排序。它接受一个参数:当前任务的 ID
- Context Agent 使用向量存储和检索任务结果以获取上下文。
官方给出的数据流图如下:
1. BabyAGI 运行
要想更好地理解其原理和工作流程,首先需要将项目跑起来。
1.1 下载开源代码
git clone https://github.com/yoheinakajima/babyagi.git pip install -r requirements.txt
1.2 填写配置文件
(1)复制一份 .env.example 文件,并重命名为 .env 文件
cp .env.example .env
(2)在 .env 文件中填入自己的 OpenAI key,OpenAI Base Url 等。我的运行不使用 weaviate 和 pinecone,因此不用填相关的config。
1.3 简化代码
因为我的运行不使用 weaviate 和 pinecone,也只是用 OpenAI 模型,所以将不用的代码删掉了,看起来清爽一点。然后,适配了一下 OpenAI API > 1.0 版本的接口。原代码使用的API < 1.0,太旧了。
#!/usr/bin/env python3 from dotenv import load_dotenv # Load default environment variables (.env) load_dotenv() import os import time import logging from collections import deque from typing import Dict, List import importlib # import openai import chromadb import tiktoken as tiktoken from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction from chromadb.api.types import Documents, EmbeddingFunction, Embeddings import re from openai import OpenAI # default opt out of chromadb telemetry. from chromadb.config import Settings client = chromadb.Client(Settings(anonymized_telemetry=False)) # Engine configuration # Model: GPT, LLAMA, HUMAN, etc. LLM_MODEL = os.getenv("LLM_MODEL", os.getenv("OPENAI_API_MODEL", "gpt-3.5-turbo")).lower() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") # API Keys if not (LLM_MODEL.startswith("llama") or LLM_MODEL.startswith("human")): assert OPENAI_API_KEY, "\033[91m\033[1m" + "OPENAI_API_KEY environment variable is missing from .env" + "\033[0m\033[0m" # Table config RESULTS_STORE_NAME = os.getenv("RESULTS_STORE_NAME", os.getenv("TABLE_NAME", "")) assert RESULTS_STORE_NAME, "\033[91m\033[1m" + "RESULTS_STORE_NAME environment variable is missing from .env" + "\033[0m\033[0m" # Run configuration INSTANCE_NAME = os.getenv("INSTANCE_NAME", os.getenv("BABY_NAME", "BabyAGI")) COOPERATIVE_MODE = "none" JOIN_EXISTING_OBJECTIVE = False # Goal configuration OBJECTIVE = os.getenv("OBJECTIVE", "") INITIAL_TASK = os.getenv("INITIAL_TASK", os.getenv("FIRST_TASK", "")) # Model configuration OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0)) # Extensions support begin def can_import(module_name): try: importlib.import_module(module_name) return True except ImportError: return False DOTENV_EXTENSIONS = os.getenv("DOTENV_EXTENSIONS", "").split(" ") # Command line arguments extension # Can override any of the above environment variables ENABLE_COMMAND_LINE_ARGS = ( os.getenv("ENABLE_COMMAND_LINE_ARGS", "false").lower() == "true" ) if ENABLE_COMMAND_LINE_ARGS: if can_import("extensions.argparseext"): from extensions.argparseext import parse_arguments OBJECTIVE, INITIAL_TASK, LLM_MODEL, DOTENV_EXTENSIONS, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE = parse_arguments() # Human mode extension # Gives human input to babyagi if LLM_MODEL.startswith("human"): if can_import("extensions.human_mode"): from extensions.human_mode import user_input_await # Load additional environment variables for enabled extensions # TODO: This might override the following command line arguments as well: # OBJECTIVE, INITIAL_TASK, LLM_MODEL, INSTANCE_NAME, COOPERATIVE_MODE, JOIN_EXISTING_OBJECTIVE if DOTENV_EXTENSIONS: if can_import("extensions.dotenvext"): from extensions.dotenvext import load_dotenv_extensions load_dotenv_extensions(DOTENV_EXTENSIONS) # TODO: There's still work to be done here to enable people to get # defaults from dotenv extensions, but also provide command line # arguments to override them # Extensions support end print("\033[95m\033[1m" + "\n*****CONFIGURATION*****\n" + "\033[0m\033[0m") print(f"Name : {INSTANCE_NAME}") print(f"Mode : {'alone' if COOPERATIVE_MODE in ['n', 'none'] else 'local' if COOPERATIVE_MODE in ['l', 'local'] else 'distributed' if COOPERATIVE_MODE in ['d', 'distributed'] else 'undefined'}") print(f"LLM : {LLM_MODEL}") # Check if we know what we are doing assert OBJECTIVE, "\033[91m\033[1m" + "OBJECTIVE environment variable is missing from .env" + "\033[0m\033[0m" assert INITIAL_TASK, "\033[91m\033[1m" + "INITIAL_TASK environment variable is missing from .env" + "\033[0m\033[0m" print("\033[94m\033[1m" + "\n*****OBJECTIVE*****\n" + "\033[0m\033[0m") print(f"{OBJECTIVE}") if not JOIN_EXISTING_OBJECTIVE: print("\033[93m\033[1m" + "\nInitial task:" + "\033[0m\033[0m" + f" {INITIAL_TASK}") else: print("\033[93m\033[1m" + f"\nJoining to help the objective" + "\033[0m\033[0m") # Results storage using local ChromaDB class DefaultResultsStorage: def __init__(self): logging.getLogger('chromadb').setLevel(logging.ERROR) # Create Chroma collection chroma_persist_dir = "chroma" chroma_client = chromadb.PersistentClient( settings=chromadb.config.Settings( persist_directory=chroma_persist_dir, ) ) metric = "cosine" embedding_function = OpenAIEmbeddingFunction(api_key=OPENAI_API_KEY) self.collection = chroma_client.get_or_create_collection( name=RESULTS_STORE_NAME, metadata={"hnsw:space": metric}, embedding_function=embedding_function, ) def add(self, task: Dict, result: str, result_id: str): # Break the function if LLM_MODEL starts with "human" (case-insensitive) if LLM_MODEL.startswith("human"): return # Continue with the rest of the function embeddings = llm_embed.embed(result) if LLM_MODEL.startswith("llama") else None if ( len(self.collection.get(ids=[result_id], include=[])["ids"]) > 0 ): # Check if the result already exists self.collection.update( ids=result_id, embeddings=embeddings, documents=result, metadatas={"task": task["task_name"], "result": result}, ) else: self.collection.add( ids=result_id, embeddings=embeddings, documents=result, metadatas={"task": task["task_name"], "result": result}, ) def query(self, query: str, top_results_num: int) -> List[dict]: count: int = self.collection.count() if count == 0: return [] results = self.collection.query( query_texts=query, n_results=min(top_results_num, count), include=["metadatas"] ) return [item["task"] for item in results["metadatas"][0]] # Initialize results storage def use_chroma(): print("\nUsing results storage: " + "\033[93m\033[1m" + "Chroma (Default)" + "\033[0m\033[0m") return DefaultResultsStorage() results_storage = use_chroma() # Task storage supporting only a single instance of BabyAGI class SingleTaskListStorage: def __init__(self): self.tasks = deque([]) self.task_id_counter = 0 def append(self, task: Dict): self.tasks.append(task) def replace(self, tasks: List[Dict]): self.tasks = deque(tasks) def popleft(self): return self.tasks.popleft() def is_empty(self): return False if self.tasks else True def next_task_id(self): self.task_id_counter += 1 return self.task_id_counter def get_task_names(self): return [t["task_name"] for t in self.tasks] # Initialize tasks storage tasks_storage = SingleTaskListStorage() if COOPERATIVE_MODE in ['l', 'local']: if can_import("extensions.ray_tasks"): import sys from pathlib import Path sys.path.append(str(Path(__file__).resolve().parent)) from extensions.ray_tasks import CooperativeTaskListStorage tasks_storage = CooperativeTaskListStorage(OBJECTIVE) print("\nReplacing tasks storage: " + "\033[93m\033[1m" + "Ray" + "\033[0m\033[0m") elif COOPERATIVE_MODE in ['d', 'distributed']: pass def limit_tokens_from_string(string: str, model: str, limit: int) -> str: """Limits the string to a number of tokens (estimated).""" try: encoding = tiktoken.encoding_for_model(model) except: encoding = tiktoken.encoding_for_model('gpt2') # Fallback for others. encoded = encoding.encode(string) return encoding.decode(encoded[:limit]) client = OpenAI() def openai_call( prompt: str, model: str = LLM_MODEL, temperature: float = OPENAI_TEMPERATURE, max_tokens: int = 100, ): # Use 4000 instead of the real limit (4097) to give a bit of wiggle room for the encoding of roles. trimmed_prompt = limit_tokens_from_string(prompt, model, 4000 - max_tokens) # Use chat completion API messages = [{"role": "system", "content": trimmed_prompt}] response = client.chat.completions.create( model=model, messages=messages, temperature=temperature, max_tokens=max_tokens, n=1, stop=None, ) return response.choices[0].message.content.strip() def task_creation_agent( objective: str, result: Dict, task_description: str, task_list: List[str] ): prompt = f""" You are to use the result from an execution agent to create new tasks with the following objective: {objective}. The last completed task has the result: \n{result["data"]} This result was based on this task description: {task_description}.\n""" if task_list: prompt += f"These are incomplete tasks: {', '.join(task_list)}\n" prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. " if task_list: prompt += "These new tasks must not overlap with incomplete tasks. " prompt += """ Return one task per line in your response. The result must be a numbered list in the format: #. First task #. Second task The number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time." Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output. OUTPUT IN CHINESE""" print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n') response = openai_call(prompt, max_tokens=2000) print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n') new_tasks = response.split('\n') new_tasks_list = [] for task_string in new_tasks: task_parts = task_string.strip().split(".", 1) if len(task_parts) == 2: task_id = ''.join(s for s in task_parts[0] if s.isnumeric()) task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip() if task_name.strip() and task_id.isnumeric(): new_tasks_list.append(task_name) # print('New task created: ' + task_name) out = [{"task_name": task_name} for task_name in new_tasks_list] return out def prioritization_agent(): task_names = tasks_storage.get_task_names() bullet_string = '\n' prompt = f""" You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)} Consider the ultimate objective of your team: {OBJECTIVE}. Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective. Do not remove any tasks. Return the ranked tasks as a numbered list in the format: #. First task #. Second task The entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period. Do not include any headers before your ranked list or follow your list with any other output. OUTPUT IN CHINESE""" print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n') response = openai_call(prompt, max_tokens=2000) print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n') if not response: print('Received empty response from priotritization agent. Keeping task list unchanged.') return new_tasks = response.split("\n") if "\n" in response else [response] new_tasks_list = [] for task_string in new_tasks: task_parts = task_string.strip().split(".", 1) if len(task_parts) == 2: task_id = ''.join(s for s in task_parts[0] if s.isnumeric()) task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip() if task_name.strip(): new_tasks_list.append({"task_id": task_id, "task_name": task_name}) return new_tasks_list # Execute a task based on the objective and five previous tasks def execution_agent(objective: str, task: str) -> str: """ Executes a task based on the given objective and previous context. Args: objective (str): The objective or goal for the AI to perform the task. task (str): The task to be executed by the AI. Returns: str: The response generated by the AI for the given task. """ context = context_agent(query=objective, top_results_num=5) # print("\n****RELEVANT CONTEXT****\n") # print(context) # print('') prompt = f'OUTPUT IN CHINESE. Perform one task based on the following objective: {objective}.\n' if context: prompt += 'Take into account these previously completed tasks:' + '\n'.join(context) prompt += f'\nYour task: {task}\nResponse:' return openai_call(prompt, max_tokens=2000) # Get the top n completed tasks for the objective def context_agent(query: str, top_results_num: int): """ Retrieves context for a given query from an index of tasks. Args: query (str): The query or objective for retrieving context. top_results_num (int): The number of top results to retrieve. Returns: list: A list of tasks as context for the given query, sorted by relevance. """ results = results_storage.query(query=query, top_results_num=top_results_num) print("****RESULTS****") print(results) return results # Add the initial task if starting new objective if not JOIN_EXISTING_OBJECTIVE: initial_task = { "task_id": tasks_storage.next_task_id(), "task_name": INITIAL_TASK } tasks_storage.append(initial_task) def main(): loop = True while loop: # As long as there are tasks in the storage... if not tasks_storage.is_empty(): # Print the task list print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m") for t in tasks_storage.get_task_names(): print(" • " + str(t)) # Step 1: Pull the first incomplete task task = tasks_storage.popleft() print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m") print(str(task["task_name"])) # Send to execution function to complete the task based on the context result = execution_agent(OBJECTIVE, str(task["task_name"])) print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m") print(result) # Step 2: Enrich result and store in the results storage # This is where you should enrich the result if needed enriched_result = { "data": result } # extract the actual result from the dictionary # since we don't do enrichment currently # vector = enriched_result["data"] result_id = f"result_{task['task_id']}" results_storage.add(task, result, result_id) # Step 3: Create new tasks and re-prioritize task list # only the main instance in cooperative mode does that new_tasks = task_creation_agent( OBJECTIVE, enriched_result, task["task_name"], tasks_storage.get_task_names(), ) print('Adding new tasks to task_storage') for new_task in new_tasks: new_task.update({"task_id": tasks_storage.next_task_id()}) print(str(new_task)) tasks_storage.append(new_task) if not JOIN_EXISTING_OBJECTIVE: prioritized_tasks = prioritization_agent() if prioritized_tasks: tasks_storage.replace(prioritized_tasks) # Sleep a bit before checking the task list again time.sleep(5) else: print('Done.') loop = False if __name__ == "__main__": main()
1.4 运行
这时候点运行,应该就能运行成功了。但特别注意,不建议直接运行。由于大模型规划任务的能力具有很大的不确定性,很可能导致你的运行产生大量的任务,甚至任务会越来越多,不会停止。这样你的Key,或者你的Token消耗,就很大很大,甚至被封号或者直接破产了 !!!。我是debug运行方式,在每个函数里面都打了断点,这样可以随时停止。
2. 运行过程及结果分析
- 给定Objective:周杰伦的生日是星期几。
- 初始Task为:Develop a task list
2.1 运行输出 - 详细解释
(1)根据给定的 Objective 和 初始Task。刚开始 Task List 中只有一个初始Task。所以执行时 Next Task 就是这个初始Task。execution_agent
执行这个任务,运行结果为根据Objective产出一系列实现它的步骤任务,这里产生了4个。说实话,有点多了。
(2)根据 execution_agent
产生的结果和 Objective最终目标,task_creation_agent
创建了新的 Task 列表,添加到任务队列中。
(3)根据 task_creation_agent
产生的任务列表,prioritization_agent
根据任务列表和最终目标进行优先级排序。这里的排序就有点不靠谱了,大模型的能力还是不稳定
(4)下一次循环开始,从任务列表中取出第一个未完成任务,execution_agent
执行任务。上面错误的任务排序,导致了这里的大模型给出幻觉的答案…
(5)又是根据 execution_agent
产生的结果和 Objective最终目标,task_creation_agent
创建了新的 Task 列表,添加到任务队列中。
(6)根据 task_creation_agent
新产生的任务列表,prioritization_agent
根据新的任务列表和最终目标再次进行优先级排序。
(7)又是一轮循环,从任务列表中取出第一个未完成任务,execution_agent
执行任务。
(8)… 一直循环,直到任务队列没有未完成的任务。
看出来没?以这个进度,任务队列什么时候才能空?太浪费 Token 了。
2.2 问题及思考
从上面的运行过程也看出来了,这个多智能体案例太依赖大模型的能力了,就像我之前写的AutoGPT(【AI大模型应用开发】【AutoGPT系列】2. 手撕AutoGPT - 手把手教你用LangChain从0开始写一个简易版AutoGPT(0))一样。
目前看到的问题及一些思考:
(1)第一步产生的子任务太多 - 初始任务可以多写点 Prompt,限制下任务数量或质量
(2)优先级排序,有点不稳定,强依赖大模型的能力。或许需要引入人为排序?
(3)即使答案能够回答用户设置的Objective问题,但是只要任务里还有未完成的任务,它就会继续运行。- 需要添加结束的限制条件或判断。
例如下面这个例子,我给的 Objective 是 “历史上的今天发生了什么?” 。它直接给排了一大堆没必要的任务。
然后其实下面执行第一个任务后就得到了我想要的结果。
但它还在继续创建新的任务,我也不知道它什么时候会停止。
(4)最严重的问题,前面提到的,没有循环次数的限制,代码这样写,基本快等于 while True了,无限循环了。太危险了。
(5)还有一个没理解的点,为啥在 execution_agent
中要获取前面的任务?这里应该换成前面任务的执行结果才更合适吧?
3. 总结
本文我们学习了一个原生的多智能体案例 - BabyAGI,从环境搭建到运行,对每一步的输出都做了详细的说明,最后对运行过程中发现的一些问题也有一些自己的优化思考。
BabyAGI 其实就是利用大模型进行任务规划,任务排序,任务执行。这三个过程不断循环,再加上一点上下文信息以得到更高质量的结果,直到满足最终的目标。
三个主要过程,全部依赖大模型的能力,有点不可控,目前来说,个人认为落地比较难。咱们主要从中学一下它的实现思想吧。
站内文章一览