【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制

简介: 【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第一篇笔记。主要记录下多智能体的运行机制及跟着教程,实现一个简单的多智能体系统。

0. 多智能体间互通的方式 - Enviroment组件

0.1 多智能体间协作方式简介

在上次课中,我也曾写过 多智能体运行机制 的笔记(这篇文章:【AI Agent系列】【MetaGPT】【深入源码】智能体的运行周期以及多智能体间如何协作)。其中我自己通过看源码,总结出了MetaGPT多智能体间协作的方式:

(1)每一个Role都在不断观察环境中的信息(_observe函数)

(2)当观察到自己想要的信息后,就会触发后续相应的动作

(3)如果没有观察到想要的信息,则会一直循环观察

(4)执行完动作后,会将产生的msg放到环境中(publish_message),供其它Role智能体来使用。

现在再结合《MetaGPT多智能体》课程的教案,发现还是理解地有些浅了,我只关注了智能体在关注自己想要的信息,观察到了之后就开始行动,而忽略了其背后的基础组件 - Enviroment。

0.2 Environment组件 - 深入源码

MetaGPT中对Environment的定义:环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到。短短一句话,总结三个功能:

  • 承载角色
  • 接收角色发布的消息
  • 环境中的消息被角色得到

下面我们分开来细说。

0.2.1 参数介绍

首先来看下 Environment 的参数:

class Environment(ExtEnv):
    """环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到
    Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles
    """
    model_config = ConfigDict(arbitrary_types_allowed=True)
    desc: str = Field(default="")  # 环境描述
    roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)
    member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)
    history: str = ""  # For debug
    context: Context = Field(default_factory=Context, exclude=True)
  • desc:环境的描述
  • roles:字典类型,指定当前环境中的角色
  • member_addrs:字典类型,表示当前环境中的角色以及他们对应的状态
  • history:记录环境中发生的消息记录
  • context:当前环境的一些上下文信息

0.2.2 add_roles函数 - 承载角色

def add_roles(self, roles: Iterable["Role"]):
    """增加一批在当前环境的角色
    Add a batch of characters in the current environment
    """
    for role in roles:
        self.roles[role.profile] = role
    for role in roles:  # setup system message with roles
        role.set_env(self)
        role.context = self.context

从源码中看,这个函数的功能有两个:

(1)给 Environment 的 self.roles 参数赋值,上面我们已经知道它是一个字典类型,现在看来,它的 key 为 role 的 profile,值为 role 本身。

疑问: role 的 profile 默认是空(profile: str = ""),可以没有值,那如果使用者懒得写profile,这里会不会最终只有一个 Role,导致无法实现多智能体?欢迎讨论交流。

(2)给添加进来的 role 设置环境信息

Role的 set_env 函数源码如下,它又给env设置了member_addrs。有点绕?

def set_env(self, env: "Environment"):
    """Set the environment in which the role works. The role can talk to the environment and can also receive
    messages by observing."""
    self.rc.env = env
    if env:
        env.set_addresses(self, self.addresses)
        self.llm.system_prompt = self._get_prefix()
        self.set_actions(self.actions)  # reset actions to update llm and prefix

通过 add_roles 函数,将 role 和 Environment 关联起来。

0.2.3 publish_message函数 - 接收角色发布的消息 / 环境中的消息被角色得到

Environment 中的 publish_message 函数是 Role 在执行完动作之后,将自身产生的消息发布到环境中调用的接口。Role的调用方式如下:

def publish_message(self, msg):
    """If the role belongs to env, then the role's messages will be broadcast to env"""
    if not msg:
        return
    if not self.rc.env:
        # If env does not exist, do not publish the message
        return
    self.rc.env.publish_message(msg)

通过上面的代码来看,一个Role只能存在于一个环境中?

然后看下 Environment 中源码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
    """
    Distribute the message to the recipients.
    In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
    in RFC 113 for the entire system, the routing information in the Message is only responsible for
    specifying the message recipient, without concern for where the message recipient is located. How to
    route the message to the message recipient is a problem addressed by the transport framework designed
    in RFC 113.
    """
    logger.debug(f"publish_message: {message.dump()}")
    found = False
    # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
    for role, addrs in self.member_addrs.items():
        if is_send_to(message, addrs):
            role.put_message(message)
            found = True
    if not found:
        logger.warning(f"Message no recipients: {message.dump()}")
    self.history += f"\n{message}"  # For debug
    return True

其中重点是这三行代码,检查环境中的所有Role是否订阅了该消息(或该消息是否应该发送给环境中的某个Role),如果订阅了,则调用Role的put_message函数,Role的 put_message函数的作用是将message放到本身的msg_buffer中:

for role, addrs in self.member_addrs.items():
    if is_send_to(message, addrs):
        role.put_message(message)

这样就实现了将环境中的某个Role的Message放到环境中,并通知给环境中其它的Role的机制。

0.2.4 run函数 - 运行入口

run函数的源码如下:

async def run(self, k=1):
    """处理一次所有信息的运行
    Process all Role runs at once
    """
    for _ in range(k):
        futures = []
        for role in self.roles.values():
            future = role.run()
            futures.append(future)
        await asyncio.gather(*futures)
        logger.debug(f"is idle: {self.is_idle}")

k = 1, 表示处理一次消息?为什么要有这个值,难道还能处理多次?意义是什么?

该函数其实就是遍历一遍当前环境中的所有Role,然后运行Role的run函数(运行单智能体)。

Role的run里面,就是用该环境观察信息,行动,发布信息到环境。这样多个Role的运行就通过 Environment 串起来了,这些之前已经写过了,不再赘述,见:【AI Agent系列】【MetaGPT】【深入源码】智能体的运行周期以及多智能体间如何协作

1. 开发一个简单的多智能体系统

复现教程中的demo。

1.1 多智能体需求描述

  • 两个智能体:学生 和 老师
  • 任务及预期流程:学生写诗 —> 老师给改进意见 —> 学生根据意见改进 —> 老师给改进意见 —> … n轮 … —> 结束

1.2 代码实现

1.2.1 学生智能体

学生智能体的主要内容就是根据指定的内容写诗。

因此,首先定义一个写诗的Action:WritePoem

然后,定义学生智能体,指定它的动作就是写诗(self.set_actions([WritePoem])

那么它什么时候开始动作呢?通过 self._watch([UserRequirement, ReviewPoem]) 设置其观察的信息。当它观察到环境中有了 UserRequirement 或者 ReviewPoem 产生的信息之后,开始动作。UserRequirement为用户的输入信息类型。

class WritePoem(Action):
    name: str = "WritePoem"
    PROMPT_TEMPLATE: str = """
    Here is the historical conversation record : {msg} .
    Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.
    If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.
    your poem:
    """
    async def run(self, msg: str):
        prompt = self.PROMPT_TEMPLATE.format(msg = msg)
        rsp = await self._aask(prompt)
        return rsp
class Student(Role):
    name: str = "xiaoming"
    profile: str = "Student"
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([WritePoem])
        self._watch([UserRequirement, ReviewPoem])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories()  # 获取所有记忆
        # logger.info(msg)
        poem_text = await WritePoem().run(msg)
        logger.info(f'student : {poem_text}')
        msg = Message(content=poem_text, role=self.profile,
                      cause_by=type(todo))
        return msg

1.2.2 老师智能体

老师的智能体的任务是根据学生写的诗,给出修改意见。

因此创建一个Action为ReviewPoem

然后,创建老师的智能体,指定它的动作为Review(self.set_actions([ReviewPoem])

它的触发实际应该是学生写完诗以后,因此,加入self._watch([WritePoem])

class ReviewPoem(Action):
    name: str = "ReviewPoem"
    PROMPT_TEMPLATE: str = """
    Here is the historical conversation record : {msg} .
    Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.
    Return only your comments with NO other texts.
    your comments:
    """
    async def run(self, msg: str):
        prompt = self.PROMPT_TEMPLATE.format(msg = msg)
        rsp = await self._aask(prompt)
        return rsp
class Teacher(Role):
    name: str = "laowang"
    profile: str = "Teacher"
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([ReviewPoem])
        self._watch([WritePoem])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories()  # 获取所有记忆
        poem_text = await ReviewPoem().run(msg)
        logger.info(f'teacher : {poem_text}')
        msg = Message(content=poem_text, role=self.profile,
                      cause_by=type(todo))
        return msg

1.2.3 创建多智能体交流的环境

前面我们已经知道了多智能体之间的交互需要一个非常重要的组件 - Environment。

因此,创建一个供多智能体交流的环境,通过 add_roles 加入智能体。

然后,当用户输入内容时,将该内容添加到环境中publish_message,从而触发相应智能体开始运行。这里cause_by=UserRequirement代表消息是由用户产生的,学生智能体关心这类消息,观察到之后就开始行动了。

classroom = Environment()
classroom.add_roles([Student(), Teacher()])
classroom.publish_message(
    Message(role="Human", content=topic, cause_by=UserRequirement,
            send_to='' or MESSAGE_ROUTE_TO_ALL),
    peekable=False,
)

1.2.4 运行

加入相应头文件,指定交互次数,就可以开始跑了。注意交互次数,直接决定了你的程序的效果和你的钱包!

import asyncio
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environment
from metagpt.const import MESSAGE_ROUTE_TO_ALL
async def main(topic: str, n_round=3):
    while n_round > 0:
        # self._save()
        n_round -= 1
        logger.debug(f"max {n_round=} left.")
        await classroom.run()
    return classroom.history
asyncio.run(main(topic='wirte a poem about moon'))

1.2.5 完整代码

import asyncio
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environment
from metagpt.const import MESSAGE_ROUTE_TO_ALL
classroom = Environment()
class WritePoem(Action):
    name: str = "WritePoem"
    PROMPT_TEMPLATE: str = """
    Here is the historical conversation record : {msg} .
    Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.
    If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.
    your poem:
    """
    async def run(self, msg: str):
        prompt = self.PROMPT_TEMPLATE.format(msg = msg)
        rsp = await self._aask(prompt)
        return rsp
class ReviewPoem(Action):
    name: str = "ReviewPoem"
    PROMPT_TEMPLATE: str = """
    Here is the historical conversation record : {msg} .
    Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.
    Return only your comments with NO other texts.
    your comments:
    """
    async def run(self, msg: str):
        prompt = self.PROMPT_TEMPLATE.format(msg = msg)
        rsp = await self._aask(prompt)
        return rsp
    
class Student(Role):
    name: str = "xiaoming"
    profile: str = "Student"
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([WritePoem])
        self._watch([UserRequirement, ReviewPoem])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories()  # 获取所有记忆
        # logger.info(msg)
        poem_text = await WritePoem().run(msg)
        logger.info(f'student : {poem_text}')
        msg = Message(content=poem_text, role=self.profile,
                      cause_by=type(todo))
        return msg
class Teacher(Role):
    name: str = "laowang"
    profile: str = "Teacher"
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.set_actions([ReviewPoem])
        self._watch([WritePoem])
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        todo = self.rc.todo
        msg = self.get_memories()  # 获取所有记忆
        poem_text = await ReviewPoem().run(msg)
        logger.info(f'teacher : {poem_text}')
        msg = Message(content=poem_text, role=self.profile,
                      cause_by=type(todo))
        return msg
async def main(topic: str, n_round=3):
    classroom.add_roles([Student(), Teacher()])
    classroom.publish_message(
        Message(role="Human", content=topic, cause_by=UserRequirement,
                send_to='' or MESSAGE_ROUTE_TO_ALL),
        peekable=False,
    )
    while n_round > 0:
        # self._save()
        n_round -= 1
        logger.debug(f"max {n_round=} left.")
        await classroom.run()
    return classroom.history
asyncio.run(main(topic='wirte a poem about moon'))
  • 运行结果

2. 总结

总结一下整个流程吧,画了个图。

从最外围环境 classroom开始,用户输入的信息通过 publish_message 发送到所有Role中,通过Role的 put_message 放到自身的 msg_buffer中。

当Role运行run时,_observemsg_buffer中提取信息,然后只过滤自己关心的消息,例如Teacher只过滤出来自WritePoem的,其它消息虽然在msg_buffer中,但不处理。同时,msg_buffer中的消息会存入memory中。

run完即执行完相应动作后,通过publish_message将结果消息发布到环境中,环境的publish_message又将这个消息发送个全部的Role,这时候所有的Role的msg_buffer中都有了这个消息。完成了智能体间信息的一个交互闭环。


站内文章一览

相关文章
|
1天前
|
开发框架 API 决策智能
ModelScope-Agent框架再升级!新增一键配置多人聊天,配套开源多智能体数据集和训练
ModelScope-Agent是魔搭社区推出的适配开源大语言模型(LLM)的AI Agent(智能体)开发框架,借助ModelScope-Agent,所有开发者都可基于开源 LLM 搭建属于自己的智能体应用。在最新升级完Assistant API和Tool APIs之后,我们又迎来了多智能体聊天室的升级,通过几分钟快速配置即可搭建一个全新的聊天室。
|
1天前
|
机器学习/深度学习 人工智能 自然语言处理
Agent AI智能体的未来角色、发展路径及其面临的挑战
Agent AI智能体的未来角色、发展路径及其面临的挑战
|
2天前
|
机器学习/深度学习 人工智能 自然语言处理
构建未来:AI驱动的自适应学习系统
【5月更文挑战第22天】 随着人工智能技术的迅猛发展,教育领域正在经历一场由数据驱动的革新。本文将探讨AI技术在构建自适应学习系统中的关键作用,分析其如何通过个性化教学方案提高学习效率,并预测未来发展趋势。我们将深入研究机器学习算法如何识别学习者的需求,实时调整教学内容和难度,以及AI如何帮助教师和学生在教育过程中实现更好的互动和反馈。
22 0
|
3天前
|
机器学习/深度学习 人工智能 自然语言处理
构建未来:AI在持续学习系统中的创新应用
【5月更文挑战第21天】 随着人工智能(AI)技术的不断进步,机器学习模型正变得更加复杂和高效。然而,这些模型往往需要大量的数据和计算资源来训练,并且一旦部署,就很难适应新的数据或环境。为了解决这个问题,研究人员正在开发新的AI技术,使得机器能够进行持续学习。本文将探讨这种新兴的AI技术,并讨论其在各种领域的应用潜力。
|
3天前
|
机器学习/深度学习 传感器 人工智能
构建未来:AI驱动的自适应交通管理系统
【5月更文挑战第21天】 在本文中,我们将探讨一个由人工智能(AI)技术驱动的自适应交通管理系统的架构和实现。该系统利用机器学习算法实时分析交通数据,预测并优化交通流,从而减少拥堵,提高道路使用效率。通过与传统交通管理方法的比较,我们展示了AI技术如何提升城市交通管理的智能化水平,以及这些技术对环境、经济和社会的潜在积极影响。
11 3
|
4天前
|
机器学习/深度学习 人工智能 算法
构建未来:AI在持续学习系统中的进化
【5月更文挑战第20天】 随着人工智能(AI)技术的迅猛发展,机器学习模型正变得越来越复杂。然而,真正的智能不仅仅在于处理大量数据和解决特定问题,而在于不断学习和适应新环境。本文将探讨AI如何通过持续学习系统进化,以实现更加智能化的未来。我们将分析最新的研究进展,包括神经网络的自适应调整、增强学习的新策略以及元学习框架的开发。通过这些技术,AI能够更好地理解复杂的模式,并在不断变化的环境中保持其性能。文章还将讨论实施这些系统所面临的挑战,以及可能的解决方案。
|
4天前
|
机器学习/深度学习 人工智能 自动驾驶
构建未来:AI技术在智能交通系统中的应用
【5月更文挑战第20天】 随着人工智能技术的飞速进步,其在现代交通系统中的应用日益广泛,从智能导航到自动车辆调度,AI正逐步改变我们的出行方式和交通管理。本文深入探讨了AI技术在智能交通系统中的多种应用,分析了其提升交通效率、增强安全性及减少环境影响的潜在能力。同时,讨论了实施这些技术所面临的挑战和未来的发展方向,为读者提供了一个关于AI如何塑造未来交通网络的全面视角。
|
5天前
|
机器学习/深度学习 人工智能 运维
构建高效自动化运维系统:DevOps与AI的融合
【5月更文挑战第19天】 在数字化转型的浪潮中,企业IT运维面临着日益复杂的挑战。传统的手动运维方式已经无法满足快速迭代和高可靠性的需求。本文探讨了如何通过结合DevOps理念和人工智能(AI)技术,构建一个高效的自动化运维系统。文章首先回顾了DevOps的核心原则及其在自动化运维中的应用,接着分析了AI如何增强故障预测、智能决策和自动化流程的能力。最后,提出了一个综合DevOps与AI技术的自动化运维框架,并讨论了其在实际部署中的优势和潜在挑战。
|
5天前
|
存储 人工智能 API
文心大模型的智能体(Agent)平台
文心智能体平台是百度推出的大模型开发平台,支持各类开发者创建个性化智能体(Agent)和AI插件。平台提供零代码、低代码工具,让不同能力的开发者能轻松构建智能体,同时具备大模型的强大能力,如内容创作和多模态生成。智能体具备主动思考、理解和记忆功能,可在多个场景下与用户互动。AI插件则扩展了大模型的应用,包括信息检索、多模态交互和服务自动化等。平台还提供流量分发,助力商业闭环。了解更多:[文心智能体平台介绍](https://chatgpt.ciilii.com/show/news-715.html)。
|
9天前
|
存储 机器学习/深度学习 人工智能
新一代数据库技术:融合AI的智能数据管理系统
传统数据库管理系统在数据存储和查询方面已经取得了巨大的成就,但随着数据量的不断增长和应用场景的多样化,传统数据库已经难以满足日益增长的需求。本文将介绍一种新一代数据库技术,即融合了人工智能技术的智能数据管理系统。通过结合AI的强大能力,这种系统能够实现更高效的数据管理、更智能的数据分析和更精准的数据预测,为用户带来全新的数据管理体验。

热门文章

最新文章