【AI Agent系列】【MetaGPT多智能体学习】8. MetaGPT多智能体进阶练习 - 使用MetaGPT重构BabyAGI

简介: 【AI Agent系列】【MetaGPT多智能体学习】8. MetaGPT多智能体进阶练习 - 使用MetaGPT重构BabyAGI

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第六篇笔记。

前文(【AI Agent系列】【MetaGPT多智能体学习】7. 剖析BabyAGI:原生多智能体案例一探究竟(附简化版可运行代码))我们已经详细拆机了多智能体案例 - BabyAGI的运行流程和原理。

本文我们来使用 MetaGPT 实现一遍BabyAGI,巩固《MetaGPT多智能体课程》的学习效果。

系列笔记

0. 实现思路分析

BabyAGI 可以简化为三个 Agent,分别为:

  • Execution Agent 接收目标和任务,调用大模型 LLM来生成任务结果。
  • Task Creation Agent 使用大模型LLM 根据目标和前一个任务的结果创建新任务。它的输入是:目标,前一个任务的结果,任务描述和当前任务列表。
  • Prioritization Agent 使用大模型LLM对任务列表进行重新排序。它接受一个参数:当前任务的 ID

Agent的创建与Action的定义我们已经练习过很多次了,应该比较熟练了。个人认为用MetaGPT重构BabyAGI的主要思考点在于:

(1)怎么组织数据流,Agent需要的信息是什么

(2)怎么打通数据流,Agent间的消息如何传递

(3)一些公共的数据怎么存储,例如 objective目标、task列表

上面这三个思考点实现思路都很简单,但想调出通用性强且效果好的最终应用,比较难。下面的思考仅为各位读者提供思路,具体效果的优化需要慢慢调。

1. 编写代码

1.1 Task Creation Agent

1.1.1 Action定义

原 BabyAGI 是给定一个初始任务,先执行任务,然后再执行 TaskCreationAgent。但其实从效果上来看,先执行的初始任务,INITIAL_TASK=Develop a task list,其实就是一种TaskCreationAgent。因此可以不用指定初始任务,直接根据目标先调用 TaskCreationAgent

因此,我在该Action定义的Prompt中,区分了一下是首次调用该Action还是有了执行结果之后的调用:if len(result) > 0:。当第一次调用时,直接根据目标Objective生成一个任务列表。

class TaskCreation(Action):
    name: str = "TaskCreation"
    objective: str = ""
    async def run(self, result: str, task_description: str, task_list: str):
        if len(result) > 0:
            prompt = f"""
            You are to use the result from an execution agent to create new tasks with the following objective: {self.objective}.
            The last completed task has the result: \n{result}
            This result was based on this task description: {task_description}.\n
            """
            if task_list:
                prompt += f"These are incomplete tasks: {task_list}\n"
            prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "
            if task_list:
                prompt += "These new tasks must not overlap with incomplete tasks. "
        else:
            prompt = f"""
            你需要根据给定的任务思考出一系列Tasks,以此来保证能够一步一步地实现该任务的目标。
            任务为: {self.objective}.
            生成的Tasks只要能实现目标就足够了,不要有其它额外的Tasks产生。
            生成的Tasks确保能用文字写出来
            """
        prompt += """
        Return one task per line in your response. The result must be a numbered list in the format:
        #. First task
        #. Second task
        The number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
        Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output. 
        OUTPUT IN CHINESE
        """
        prompt = prompt.replace('  ', '')
        print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')    
        rsp = await self._aask(prompt)
        return rsp

1.1.2 Role定义

因为我将它放在第一个执行的位置,因此它观察环境中的用户信息self._watch([UserRequirement])

class TaskCreationAgent(Role):
    name: str = "TaskCreationAgent"
    profile: str = "TaskCreationAgent"
    objective: str = ""
    task_list: str = ""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([TaskCreation(objective = self.objective)])
        self._watch([UserRequirement])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories(k=1)  # 获取最近的一条记忆
        
        result = ""
        task_description = ""
        
        try:
            result = msg[0].content.split("|")[0]
            task_description = msg[0].content.split("|")[1]
        except:
            result = ""
            task_description = ""
        task_creation_result = await todo.run(result = result, task_description = task_description, task_list = self.task_list)
        logger.info(f'TaskCreationAgent : {task_creation_result}')
        msg = Message(content=task_creation_result, role=self.profile,
                      cause_by=type(todo), send_to='TaskPrioritizationAgent')
        self.task_list += '\n' + task_creation_result ## 需去重
        return msg

重点在消息提取的过程(_act函数的实现)。根据 TaskCreation Action的需要:

  • result: 上次ExecuteAgent的执行结果
  • task_description: 上次 ExecuteAgent的执行任务名
  • task_list:这里应该是当前的所有未执行的任务列表

resulttask_description 都是来自 ExecuteAgent,所以让 ExecuteAgent 执行完后将消息发送过来就好。然后通过 msg = self.get_memories(k=1) 获取最近的一条消息,就是 ExecuteAgent 发过来的消息。从里面就可以提取出结果和任务名。

task_list 这里我直接将新产生的任务往里加,其实是不对的,但我懒得写了,大家自己实现吧。这里应该是当前所有的任务列表,应该每次都去一下重复的任务,同时从里面去掉 ExecuteAgent 发送过来的执行完的任务。

执行完后消息的去向:send_to='TaskPrioritizationAgent'

1.2 Prioritization Agent

1.2.1 Action定义

这个Action没啥特别注意的,直接将原 Prompt 搬过来用了。

class TaskPrioritization(Action):
    name: str = "TaskPrioritization"
    objective: str = ""
    PROMPT_TEMPLATE: str = """
    You are tasked with prioritizing the following tasks: {tasks}
    Consider the ultimate objective of your team: {objective}.
    Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
    Do not remove any tasks. Return the ranked tasks as a numbered list in the format:
    #. First task
    #. Second task
    The entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
    Do not include any headers before your ranked list or follow your list with any other output.
    OUTPUT IN CHINESE
    """
    async def run(self, tasks: str):
        prompt = self.PROMPT_TEMPLATE.format(objective = self.objective, tasks = tasks)
        prompt = prompt.replace('  ', '')
        rsp = await self._aask(prompt)
        return rsp

1.2.2 Role定义

该 Role的定义也比较好理解,就是观察是否有新的任务产生:self._watch([TaskCreation])

由于其观察的是 TaskCreation,而 TaskCreationAgent 又将消息直接发送了过来,因此 _act 函数中的 msg = self.get_memories(k=1) 其实就是获取的 TaskCreationAgent 本次的执行结果。

最后执行完,消息的去向:send_to='ExecutionAgent'

class TaskPrioritizationAgent(Role):
    name: str = "TaskPrioritizationAgent"
    profile: str = "TaskPrioritizationAgent"
    objective: str = ""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([TaskPrioritization(objective = self.objective)])
        self._watch([TaskCreation])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories(k=1)  # 获取最新的一条记忆,TaskCreation刚刚生成的任务列表
        task_prioritization_result = await todo.run(tasks = msg[0].content)
        logger.info(f'TaskPrioritizationAgent : {task_prioritization_result}')
        msg = Message(content=task_prioritization_result, role=self.profile,
                      cause_by=type(todo), send_to='ExecutionAgent')
        return msg

1.3 Execution Agent

1.3.1 Action定义

该Action的定义也没什么特别的,将原 Prompt 直接搬过来就可以用。

class ExecuteAction(Action):
    name: str = "ExecuteAction"
    objective: str = ""
    async def run(self, context: str, task: str):
        prompt = f'OUTPUT IN CHINESE. Perform one task based on the following objective: {self.objective}.\n'
        if context:
            prompt += f'Take into account these previously completed tasks:\n{context}'
        prompt += f'\nYour task: {task}\nResponse:'
        prompt = prompt.replace('  ', '')
        print("ExecuteAction: ", prompt)
        rsp = await self._aask(prompt)
        return rsp

1.3.2 Role定义

这里 ExecutionAgent 的执行是由 TaskPrioritizationAgent 来触发的。TaskPrioritizationAgent 的消息指定发送给 ExecutionAgent,因此 ExecutionAgent 即使不 _watch,也会收到消息并执行。

同样地,msg = self.get_memories(k=1) 获取最新的一条消息即为 TaskPrioritizationAgent 排序好的任务列表。然后处理一下取出里面的第一个任务执行。

除了未完成的第一个任务名称外,其Action中还需要一个已经执行完的任务名称列表context,我这里直接用 self.complete_task 存储了。这个已完成的任务名称的存储,在原 BabyAGI 中是使用的向量数据库存储,并且单独使用了一个 context_agent 来查询。本文将其简化了,直接内存中存储一下使用。

执行完成后消息的去向 send_to = "TaskCreationAgent"。然后 TaskCreationAgent 就又可以创建新任务了。

class ExecutionAgent(Role):
    name: str = "ExecutionAgent"
    profile: str = "ExecutionAgent"
    objective: str = ""
    complete_task: str = ""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([ExecuteAction(objective = self.objective)])
        # self._watch([UserRequirement])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories(k=1)  # 获取最新的一条记忆
        # logger.info(msg)
        
        if len(msg) <= 0:
            print('Received empty response from priotritization agent. ')
            return ""
        new_tasks = msg[0].content.split("\n") if "\n" in msg[0].content else [msg[0].content]
        task_execute = ""
        for task_string in new_tasks:
            task_parts = task_string.strip().split(".", 1)
            if len(task_parts) == 2:
                task_execute = task_parts[1].strip()
                break # 找到第一个任务,退出循环
                
        execute_result = await todo.run(context = self.complete_task, task = task_execute)
        logger.info(f'ExecutionAgent : {execute_result}')
        msg = Message(content=execute_result + "|" + task_execute, role=self.profile,
                      cause_by=type(todo), send_to = "TaskCreationAgent")
        self.complete_task += '\n' + task_execute
        return msg

1.4 Environment创建

创建多智能体消息交互的环境。然后利用 publish_message 函数将第一条消息发送给 TaskCreationAgent,让 TaskCreationAgent 开始运行。

babyAGIEnv = Environment()
async def main(objective: str, n_round=10):
    babyAGIEnv.add_roles([
            ExecutionAgent(objective = objective), 
            TaskCreationAgent(objective = objective),
            TaskPrioritizationAgent(objective = objective)
        ])
    babyAGIEnv.publish_message(
        Message(role="Human", content=objective, cause_by=UserRequirement,
                send_to='TaskCreationAgent')
    )
    while n_round > 0:
        # self._save()
        n_round -= 1
        logger.debug(f"max {n_round=} left.")
        run_result = await babyAGIEnv.run()
        # print("run_result: ", babyAGIEnv.history)
    return babyAGIEnv.history
asyncio.run(main(objective='Plan for Solve world hunger'))

1.5 运行结果展示

(1)创建Task列表

(2)排序

(3)执行第一个任务

(4)根据第一个任务的执行结果,再创建新的任务列表

(5)循环…

画了一个简单的流程:

2. 总结

利用 MetaGPT 实现BabyAGI的任务,基本流程算是通了,还有太多需要改进的地方,大家参考下思路就好。欢迎交流。

提供几个改进点和思考:

  • 本文程序没有停止程序的逻辑,需要加入。停止的逻辑可以参考原实现中的 任务列表为空,也可以是别的逻辑,例如:
  • 引入人工,让人工确认是否可以停止了
  • 引入另一个Agent,这个Agent接收 ExecuteAgent 的结果,用来判定是否执行结果已经足够回答刚开始的Objective问题了,如果已经足够了,则不再执行 TaskCreationAgent,直接停止程序运行
  • 排序过程这里依赖大模型也会有很大的不确定性,也可以考虑引入人工,人工排序 + 删掉不必要的Task

最后,还是那个感受,打通流程很容易,但想真正落地成好用的产品,还太远太远。


  • 大家好,我是同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

站内文章一览

相关文章
|
1月前
|
人工智能 自然语言处理 安全
用AI重构人机关系,OPPO智慧服务带来了更“懂你”的体验
OPPO在2025开发者大会上展现智慧服务新范式:通过大模型与意图识别技术,构建全场景入口矩阵,实现“服务找人”。打通负一屏、小布助手等系统级入口,让服务主动触达用户;为开发者提供统一意图标准、一站式平台与安全准则,降低适配成本,共建开放生态。
219 31
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
304 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
28天前
|
人工智能 测试技术 API
构建AI智能体:二、DeepSeek的Ollama部署FastAPI封装调用
本文介绍如何通过Ollama本地部署DeepSeek大模型,结合FastAPI实现API接口调用。涵盖Ollama安装、路径迁移、模型下载运行及REST API封装全过程,助力快速构建可扩展的AI应用服务。
494 6
|
30天前
|
人工智能 运维 安全
加速智能体开发:从 Serverless 运行时到 Serverless AI 运行时
在云计算与人工智能深度融合的背景下,Serverless 技术作为云原生架构的集大成者,正加速向 AI 原生架构演进。阿里云函数计算(FC)率先提出并实践“Serverless AI 运行时”概念,通过技术创新与生态联动,为智能体(Agent)开发提供高效、安全、低成本的基础设施支持。本文从技术演进路径、核心能力及未来展望三方面解析 Serverless AI 的突破性价值。
|
1月前
|
人工智能 缓存 并行计算
用数学重构 AI的设想:流形注意力 + 自然梯度优化的最小可行落地
本文提出两个数学驱动的AI模块:流形感知注意力(D-Attention)与自然梯度优化器(NGD-Opt)。前者基于热核偏置,在局部邻域引入流形结构,降低计算开销;后者在黎曼流形上进行二阶优化,仅对线性层低频更新前置条件。二者均提供可复现代码与验证路径,兼顾性能与工程可行性,助力几何感知的模型设计与训练。
219 1
|
1月前
|
人工智能 搜索推荐 数据可视化
当AI学会“使用工具”:智能体(Agent)如何重塑人机交互
当AI学会“使用工具”:智能体(Agent)如何重塑人机交互
297 115
|
28天前
|
人工智能 API 开发工具
构建AI智能体:一、初识AI大模型与API调用
本文介绍大模型基础知识及API调用方法,涵盖阿里云百炼平台密钥申请、DashScope SDK使用、Python调用示例(如文本情感分析、图像文字识别),助力开发者快速上手大模型应用开发。
892 16
构建AI智能体:一、初识AI大模型与API调用
|
27天前
|
存储 机器学习/深度学习 人工智能
构建AI智能体:三、Prompt提示词工程:几句话让AI秒懂你心
本文深入浅出地讲解Prompt原理及其与大模型的关系,系统介绍Prompt的核心要素、编写原则与应用场景,帮助用户通过精准指令提升AI交互效率,释放大模型潜能。
351 5
|
29天前
|
机器学习/深度学习 人工智能 JSON
PHP从0到1实现 AI 智能体系统并且训练知识库资料
本文详解如何用PHP从0到1构建AI智能体,涵盖提示词设计、记忆管理、知识库集成与反馈优化四大核心训练维度,结合实战案例与系统架构,助你打造懂业务、会进化的专属AI助手。
184 6

热门文章

最新文章