【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

简介: 【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第三篇笔记。主要是对课程刚开始环境搭建成功后跑通的第一个MetaGPT多智能体案例 - 多智能体辩论,进行学习,并拆解其中的实现步骤和原理。

系列笔记

多智能体辩论需求。假设是两个智能体进行辩论。

0. 辩论Action的定义

辩论进行的动作都一样,所以Action可以共用一个。除了 Prompt 需要费点功夫想,其它的流程都和之前一样,平平无奇。

class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp

1. 辩论智能体的定义

智能体的职能也一样,所以可以共用一个Role的构造。

1.1 让两个智能体交替运行的重点

它的Action就是上面的辩论Action,通过 self.set_actions([SpeakAloud]) 设置。

它的动作时机通过 self._watch([UserRequirement, SpeakAloud]) 来设置,可以看到它观察环境中出现 UserRequirementSpeakAloud 来源的消息时,会有所动作。

那么问题来了,现在辩论的智能体Role和Action都只有一个,Role1说完之后放到环境中的信息来源是 SpeakAloud,Role2说完后放到环境中的也是 SpeakAloud,如果还是之前的代码,那当环境中出现 SpeakAloud时,Role1 和 Role2 都会触发动作了。这是不对的。

我们需要的是Role1等待Role2说完再说,Role2等待Role1说完后再说,交替着表达。

怎么实现这一点呢?回想我们之前学的单智能体的运行周期,_observe —> _react(_think + _act) —> publish_message,能阻止智能体不动作的是 _observe 函数。

1.2 重写 _observe 函数

先来看下原_observe函数的实现:

async def _observe(self, ignore_memory=False) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered:
        news = [self.latest_observed_msg] if self.latest_observed_msg else []
    if not news:
        news = self.rc.msg_buffer.pop_all()
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)
    # Filter out messages of interest.
    self.rc.news = [
        n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg
    # Design Rules:
    # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
    # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    return len(self.rc.news)

它的结果 len(self.rc.news) 只要大于0,这个智能体就会执行 _react,开始行动。self.rc.news怎么来的?重点看源码中的这句:

# Filter out messages of interest.
self.rc.news = [
    n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]

self.rc.news 是从 news 中过滤出了该智能体所关心的消息 n.cause_by in self.rc.watch 和 指定发送给该智能体的消息 self.name in n.send_to。而 news 其实就是 msg_buffermsg_buffer 来自环境的 publish_message

再来看下环境 publish_message 的代码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
        """
        Distribute the message to the recipients.
        In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
        in RFC 113 for the entire system, the routing information in the Message is only responsible for
        specifying the message recipient, without concern for where the message recipient is located. How to
        route the message to the message recipient is a problem addressed by the transport framework designed
        in RFC 113.
        """
        logger.debug(f"publish_message: {message.dump()}")
        found = False
        # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
        for role, addrs in self.member_addrs.items():
            if is_send_to(message, addrs):
                role.put_message(message)
                found = True
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history += f"\n{message}"  # For debug
        return True

看里面的 is_send_to 函数:

def is_send_to(message: "Message", addresses: set):
    """Return whether it's consumer"""
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True
    for i in addresses:
        if i in message.send_to:
            return True
    return False

如果指定了 message.send_to,则环境只会将该消息发送给指定的Role

光看上面的代码和我的文字可能不是很好理解,容易绕晕,我给你们画了个图:

如上图的流程,这对于我们的辩论智能体来说其实就够了。Role1执行完动作后,将结果消息指定发送给Role2(通过message的send_to参数),这样Role1的msg_buffer中就不会出现它自身的这个消息,下次run时msg_buffer就直接是空的,就不运行了,等待Role2的消息。同样的,Role2执行完后将消息指定发送给Role1。这样就实现了交互行动,辩论的过程。

教程中用的0.6.6版本,不知道源码是不是这样。但是它为了实现这个辩论过程,还重写了 _observe 函数:

async def _observe(self) -> int:
    await super()._observe()
    # accept messages sent (from opponent) to self, disregard own messages from the last round
    self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # 第二次筛选
    return len(self.rc.news)

可以参考下。但是我用的0.7.2版本,看源码和实践证明,不用重写_observe了。

1.3 重写 _act 函数

这里为什么要重写 _act 函数,最主要的原因是要填执行完结果消息中的 send_to 参数(源码中是没有填的)。还有就是组装辩论的上下文(对手说了什么 context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories))。

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    todo = self.rc.todo  # An instance of SpeakAloud
    memories = self.get_memories()
    context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
    print(context)
    rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
    msg = Message(
        content=rsp,
        role=self.profile,
        cause_by=type(todo),
        sent_from=self.name,
        send_to=self.opponent_name,
    )
    self.rc.memory.add(msg)
    return msg

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name,

这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。

给大家再看一下组装的上下文的示例,有个更直观的认识(Trump发言时,收到的上文是Biden及其之前发言的内容):

1.4 最终Role的代码

class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
  ## 0.7.2 版本可以不用重写 _observe,具体原因分析见上文
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg

2. 实例化智能体

实例化两个智能体,Team组件的用法前面已经学习过,可参考我的上篇文章

其中值得注意的是,team.run_project(idea, send_to="Biden") 通过 send_to 参数指定谁先发言。

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)

3. 完整代码及运行结果

完整代码:

import asyncio
import platform
from typing import Any
import fire
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team
class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp
class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg
async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)
def main(idea: str, investment: float = 3.0, n_round: int = 10):
    """
    :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"
                 or "Trump: Climate change is a hoax"
    :param investment: contribute a certain dollar amount to watch the debate
    :param n_round: maximum rounds of the debate
    :return:
    """
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(debate(idea, investment, n_round))
if __name__ == "__main__":
    fire.Fire(main("Topic: The U.S. should commit more in climate change fighting"))

运行结果(交替发言):

4. 总结

这节内容最主要的是让我们了解和实践了多智能体间更多的消息交互和订阅机制,主要是 send_to 参数的使用。

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, 这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。


站内文章一览

相关文章
|
9月前
|
机器学习/深度学习 人工智能 算法
AI 基础知识从 0.6 到 0.7—— 彻底拆解深度神经网络训练的五大核心步骤
本文以一个经典的PyTorch手写数字识别代码示例为引子,深入剖析了简洁代码背后隐藏的深度神经网络(DNN)训练全过程。
1392 56
|
7月前
|
人工智能 监控 安全
员工使用第三方AI办公的风险与解决方案:从三星案例看AI的数据防泄漏
生成式AI提升办公效率,也带来数据泄露风险。三星、迪士尼案例揭示敏感信息外泄隐患。AI-FOCUS团队建议构建“流式网关+DLP”防护体系,实现分级管控、全程审计,平衡安全与创新。
|
8月前
|
数据采集 存储 人工智能
拆解AI-Agentforce企业级智能体中台:如何让企业AI落地从“噱头”到“实效”
在GDMS峰会上,迈富时集团尹思源指出41.3%中国企业尚未布局AI Agent,已应用者亦陷“Demo化、孤岛化”困局。其发布的AI-Agentforce智能体中台,以“冰山模型”重构架构,打通认知、价值、能力三重鸿沟,覆盖内容、获客、销售、陪练、分析五大场景,助力企业实现AI从“工具”到“数字员工”的全链路协同升级。
|
9月前
|
人工智能 自然语言处理 搜索推荐
上下文学习的神奇魔法:轻松理解AI如何无师自通
你有没有想过,为什么给GPT几个例子,它就能学会新任务?这就像魔法一样!本文用轻松幽默的方式解密上下文学习的原理,通过「智能客服训练」场景,带你理解AI如何像人类一样从示例中学习,无需额外训练就能掌握新技能。
368 28
|
8月前
|
存储 人工智能 搜索推荐
一种专为AI代理设计的内存层,能够在交互过程中记忆、学习和进化
Mem0 是专为 AI 代理设计的内存层,支持记忆、学习与进化。提供多种记忆类型,可快速集成,适用于开源与托管场景,助力 AI 代理高效交互与成长。
787 123
一种专为AI代理设计的内存层,能够在交互过程中记忆、学习和进化
|
8月前
|
机器学习/深度学习 人工智能 运维
运维告警别乱飞了!AI智能报警案例解析
运维告警别乱飞了!AI智能报警案例解析
791 0
|
8月前
|
机器学习/深度学习 人工智能 自然语言处理
迁移学习:让小数据也能驱动AI大模型
迁移学习:让小数据也能驱动AI大模型
452 99
|
9月前
|
数据采集 人工智能 前端开发
AI智能体如何从错误中学习:反思机制详解
探索AI智能体的反思能力:从哲学思考到技术实现,看AI如何像人类一样从错误中学习和成长。通过轻松有趣的方式,深入了解Reflexion和ReAct等前沿框架,掌握让AI更智能的核心秘密。
611 0
|
8月前
|
数据采集 Web App开发 人工智能
如何让AI“看懂”网页?拆解 Browser-Use 的三大核心技术模块
Browser-Use 是一种基于大语言模型(LLM)的浏览器自动化技术,通过融合视觉理解、DOM解析和动作预测等模块,实现对复杂网页任务的自主操作。它突破了传统固定选择器和流程编排的限制,具备任务规划与语义理解能力,可完成注册、比价、填报等多步骤操作。其核心功能包括视觉与HTML融合解析、多标签管理、元素追踪、自定义动作、自纠错机制,并支持任意LLM模型。Browser-Use标志着浏览器自动化从“规则驱动”向“认知驱动”的跃迁,大幅降低维护成本,提升复杂任务的处理效率与适应性。
4375 29
|
7月前
|
人工智能 JavaScript 前端开发
GenSX (不一样的AI应用框架)架构学习指南
GenSX 是一个基于 TypeScript 的函数式 AI 工作流框架,以“函数组合替代图编排”为核心理念。它通过纯函数组件、自动追踪与断点恢复等特性,让开发者用自然代码构建可追溯、易测试的 LLM 应用。支持多模型集成与插件化扩展,兼具灵活性与工程化优势。
597 6

热门文章

最新文章