【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

简介: 【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第三篇笔记。主要是对课程刚开始环境搭建成功后跑通的第一个MetaGPT多智能体案例 - 多智能体辩论,进行学习,并拆解其中的实现步骤和原理。

系列笔记

多智能体辩论需求。假设是两个智能体进行辩论。

0. 辩论Action的定义

辩论进行的动作都一样,所以Action可以共用一个。除了 Prompt 需要费点功夫想,其它的流程都和之前一样,平平无奇。

class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp

1. 辩论智能体的定义

智能体的职能也一样,所以可以共用一个Role的构造。

1.1 让两个智能体交替运行的重点

它的Action就是上面的辩论Action,通过 self.set_actions([SpeakAloud]) 设置。

它的动作时机通过 self._watch([UserRequirement, SpeakAloud]) 来设置,可以看到它观察环境中出现 UserRequirementSpeakAloud 来源的消息时,会有所动作。

那么问题来了,现在辩论的智能体Role和Action都只有一个,Role1说完之后放到环境中的信息来源是 SpeakAloud,Role2说完后放到环境中的也是 SpeakAloud,如果还是之前的代码,那当环境中出现 SpeakAloud时,Role1 和 Role2 都会触发动作了。这是不对的。

我们需要的是Role1等待Role2说完再说,Role2等待Role1说完后再说,交替着表达。

怎么实现这一点呢?回想我们之前学的单智能体的运行周期,_observe —> _react(_think + _act) —> publish_message,能阻止智能体不动作的是 _observe 函数。

1.2 重写 _observe 函数

先来看下原_observe函数的实现:

async def _observe(self, ignore_memory=False) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered:
        news = [self.latest_observed_msg] if self.latest_observed_msg else []
    if not news:
        news = self.rc.msg_buffer.pop_all()
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)
    # Filter out messages of interest.
    self.rc.news = [
        n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg
    # Design Rules:
    # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
    # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    return len(self.rc.news)

它的结果 len(self.rc.news) 只要大于0,这个智能体就会执行 _react,开始行动。self.rc.news怎么来的?重点看源码中的这句:

# Filter out messages of interest.
self.rc.news = [
    n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]

self.rc.news 是从 news 中过滤出了该智能体所关心的消息 n.cause_by in self.rc.watch 和 指定发送给该智能体的消息 self.name in n.send_to。而 news 其实就是 msg_buffermsg_buffer 来自环境的 publish_message

再来看下环境 publish_message 的代码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
        """
        Distribute the message to the recipients.
        In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
        in RFC 113 for the entire system, the routing information in the Message is only responsible for
        specifying the message recipient, without concern for where the message recipient is located. How to
        route the message to the message recipient is a problem addressed by the transport framework designed
        in RFC 113.
        """
        logger.debug(f"publish_message: {message.dump()}")
        found = False
        # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
        for role, addrs in self.member_addrs.items():
            if is_send_to(message, addrs):
                role.put_message(message)
                found = True
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history += f"\n{message}"  # For debug
        return True

看里面的 is_send_to 函数:

def is_send_to(message: "Message", addresses: set):
    """Return whether it's consumer"""
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True
    for i in addresses:
        if i in message.send_to:
            return True
    return False

如果指定了 message.send_to,则环境只会将该消息发送给指定的Role

光看上面的代码和我的文字可能不是很好理解,容易绕晕,我给你们画了个图:

如上图的流程,这对于我们的辩论智能体来说其实就够了。Role1执行完动作后,将结果消息指定发送给Role2(通过message的send_to参数),这样Role1的msg_buffer中就不会出现它自身的这个消息,下次run时msg_buffer就直接是空的,就不运行了,等待Role2的消息。同样的,Role2执行完后将消息指定发送给Role1。这样就实现了交互行动,辩论的过程。

教程中用的0.6.6版本,不知道源码是不是这样。但是它为了实现这个辩论过程,还重写了 _observe 函数:

async def _observe(self) -> int:
    await super()._observe()
    # accept messages sent (from opponent) to self, disregard own messages from the last round
    self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # 第二次筛选
    return len(self.rc.news)

可以参考下。但是我用的0.7.2版本,看源码和实践证明,不用重写_observe了。

1.3 重写 _act 函数

这里为什么要重写 _act 函数,最主要的原因是要填执行完结果消息中的 send_to 参数(源码中是没有填的)。还有就是组装辩论的上下文(对手说了什么 context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories))。

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    todo = self.rc.todo  # An instance of SpeakAloud
    memories = self.get_memories()
    context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
    print(context)
    rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
    msg = Message(
        content=rsp,
        role=self.profile,
        cause_by=type(todo),
        sent_from=self.name,
        send_to=self.opponent_name,
    )
    self.rc.memory.add(msg)
    return msg

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name,

这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。

给大家再看一下组装的上下文的示例,有个更直观的认识(Trump发言时,收到的上文是Biden及其之前发言的内容):

1.4 最终Role的代码

class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
  ## 0.7.2 版本可以不用重写 _observe,具体原因分析见上文
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg

2. 实例化智能体

实例化两个智能体,Team组件的用法前面已经学习过,可参考我的上篇文章

其中值得注意的是,team.run_project(idea, send_to="Biden") 通过 send_to 参数指定谁先发言。

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)

3. 完整代码及运行结果

完整代码:

import asyncio
import platform
from typing import Any
import fire
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team
class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp
class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg
async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)
def main(idea: str, investment: float = 3.0, n_round: int = 10):
    """
    :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"
                 or "Trump: Climate change is a hoax"
    :param investment: contribute a certain dollar amount to watch the debate
    :param n_round: maximum rounds of the debate
    :return:
    """
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(debate(idea, investment, n_round))
if __name__ == "__main__":
    fire.Fire(main("Topic: The U.S. should commit more in climate change fighting"))

运行结果(交替发言):

4. 总结

这节内容最主要的是让我们了解和实践了多智能体间更多的消息交互和订阅机制,主要是 send_to 参数的使用。

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, 这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。


站内文章一览

相关文章
|
8月前
|
人工智能 IDE Java
AI Coding实践:CodeFuse + prompt 从系分到代码
在蚂蚁国际信贷业务系统建设过程中,技术团队始终面临双重考验:一方面需应对日益加速的需求迭代周期,满足严苛的代码质量规范与金融安全合规要求;另一方面,跨地域研发团队的协同效率与代码标准统一性,在传统开发模式下逐渐显现瓶颈。为突破效率制约、提升交付质量,我们积极探索人工智能辅助代码生成技术(AI Coding)的应用实践。本文基于蚂蚁国际信贷技术团队近期的实际项目经验,梳理AI辅助开发在金融级系统快速迭代场景中的实施要点并分享阶段性实践心得。
1934 25
AI Coding实践:CodeFuse + prompt 从系分到代码
|
8月前
|
人工智能 自然语言处理 安全
氛围编程陷阱:为什么AI生成代码正在制造大量"伪开发者"
AI兴起催生“氛围编程”——用自然语言生成代码,看似高效实则陷阱。它让人跳过编程基本功,沦为只会提示、不懂原理的“中间商”。真实案例显示,此类项目易崩溃、难维护,安全漏洞频出。AI是技能倍增器,非替代品;真正强大的开发者,永远是那些基础扎实、能独立解决问题的人。
940 12
氛围编程陷阱:为什么AI生成代码正在制造大量"伪开发者"
|
8月前
|
人工智能 机器人 测试技术
AI写的代码为何金玉其外败絮其中
本文分析AI编码看着好看其实很烂的现象、原因,探索行之有效的的解决方案。并从理论上延伸到如何更好的与AI协作的方式上。
316 3
|
8月前
|
人工智能 监控 Java
零代码改造 + 全链路追踪!Spring AI 最新可观测性详细解读
Spring AI Alibaba 通过集成 OpenTelemetry 实现可观测性,支持框架原生和无侵入探针两种方式。原生方案依赖 Micrometer 自动埋点,适用于快速接入;无侵入探针基于 LoongSuite 商业版,无需修改代码即可采集标准 OTLP 数据,解决了原生方案扩展性差、调用链易断链等问题。未来将开源无侵入探针方案,整合至 AgentScope Studio,并进一步增强多 Agent 场景下的观测能力。
3014 96
|
8月前
|
人工智能 安全 开发工具
C3仓库AI代码门禁通用实践:基于Qwen3-Coder+RAG的代码评审
本文介绍基于Qwen3-Coder、RAG与Iflow在C3级代码仓库落地LLM代码评审的实践,实现AI辅助人工评审。通过CI流水线自动触发,结合私域知识库与生产代码同仓管理,已成功拦截数十次高危缺陷,显著提升评审效率与质量,具备向各类代码门禁平台复用推广的价值。(239字)
1593 24
|
8月前
|
数据采集 人工智能 JSON
Prompt 工程实战:如何让 AI 生成高质量的 aiohttp 异步爬虫代码
Prompt 工程实战:如何让 AI 生成高质量的 aiohttp 异步爬虫代码
|
8月前
|
人工智能 JavaScript 前端开发
GenSX (不一样的AI应用框架)架构学习指南
GenSX 是一个基于 TypeScript 的函数式 AI 工作流框架,以“函数组合替代图编排”为核心理念。它通过纯函数组件、自动追踪与断点恢复等特性,让开发者用自然代码构建可追溯、易测试的 LLM 应用。支持多模型集成与插件化扩展,兼具灵活性与工程化优势。
681 6
|
8月前
|
机器学习/深度学习 人工智能 搜索推荐
拔俗AI学伴智能体系统:基于大模型与智能体架构的下一代个性化学习引擎
AI学伴智能体系统融合大模型、多模态理解与自主决策,打造具备思考能力的个性化学习伙伴。通过动态推理、长期记忆、任务规划与教学逻辑优化,实现千人千面的自适应教育,助力因材施教落地,推动教育公平与效率双提升。(238字)
1049 0
|
9月前
|
分布式计算 测试技术 Spark
科大讯飞开源星火化学大模型、文生音效模型
近期,科大讯飞在魔搭社区(ModelScope)和Gitcode上开源两款模型:讯飞星火化学大模型Spark Chemistry-X1-13B、讯飞文生音频模型AudioFly,助力前沿化学技术研究,以及声音生成技术和应用的探索。
764 2

热门文章

最新文章