[AI Agent Series] [MetaGPT Multi-Agent Learning] 5. Multi-Agent Case Study: An Agent Debate Built on MetaGPT (Full Code Included)


This series of notes follows the MetaGPT Multi-Agent Course (https://github.com/datawhalechina/hugging-multi-agent) to build a deep understanding of, and hands-on experience with, multi-agent system development.

This article is the third set of notes for Chapter 4 of the course (multi-agent development). It revisits the first MetaGPT multi-agent example we ran right after setting up the environment at the start of the course, the multi-agent debate, and breaks down its implementation steps and underlying principles.


The requirement: a multi-agent debate, in which two agents argue against each other.

0. Defining the debate Action

Both sides perform the same kind of action during the debate, so a single Action can be shared. Apart from the prompt, which takes some thought, the rest of the flow is the same as before and quite unremarkable.

class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp
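
If you want to poke at the Action on its own, here is a hedged standalone usage sketch; actually running it requires a configured MetaGPT LLM backend, and the context string is made up purely for illustration.

import asyncio

async def demo():
    action = SpeakAloud()
    reply = await action.run(
        context="Biden: Climate change is real and urgent.",  # illustrative opponent line
        name="Trump",
        opponent_name="Biden",
    )
    print(reply)

# asyncio.run(demo())  # uncomment once your LLM settings are configured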

1. Defining the debate agent

The two agents have identical duties, so they can share a single Role definition as well.

1.1 The key to making the two agents take turns

Its Action is the debate Action defined above, registered via self.set_actions([SpeakAloud]).

When it acts is controlled by self._watch([UserRequirement, SpeakAloud]): the role acts whenever messages caused by UserRequirement or SpeakAloud appear in the environment.

Here is the problem: the debate now has only one Role class and one Action. After Role1 speaks, the message it puts into the environment is caused by SpeakAloud; after Role2 speaks, its message is caused by SpeakAloud as well. With the earlier code unchanged, whenever a SpeakAloud message appears in the environment, both Role1 and Role2 would be triggered to act, which is not what we want.

What we need is for Role1 to wait until Role2 has finished speaking and vice versa, so that they speak in alternation.

How do we achieve this? Recall the single-agent run cycle we studied earlier: _observe -> _react (_think + _act) -> publish_message. The function that can stop an agent from acting is _observe.
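
For orientation, here is a simplified, paraphrased sketch of that run cycle (not the exact MetaGPT source): if _observe finds nothing new, the role returns early and stays idle for that round.

# Simplified, paraphrased sketch of a role's run cycle (not the exact MetaGPT source).
async def run_cycle(role):
    news_count = await role._observe()   # fill role.rc.news from the message buffer
    if news_count == 0:
        return                           # nothing of interest: stay idle this round
    msg = await role._react()            # _think + _act, producing a result Message
    role.publish_message(msg)            # hand the result back to the environment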

1.2 Overriding the _observe function

First, let's look at the original implementation of _observe:

async def _observe(self, ignore_memory=False) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered:
        news = [self.latest_observed_msg] if self.latest_observed_msg else []
    if not news:
        news = self.rc.msg_buffer.pop_all()
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)
    # Filter out messages of interest.
    self.rc.news = [
        n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg
    # Design Rules:
    # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
    # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    return len(self.rc.news)

As long as its return value len(self.rc.news) is greater than 0, the agent will execute _react and start acting. Where does self.rc.news come from? The key line in the source is:

# Filter out messages of interest.
self.rc.news = [
    n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]

self.rc.news is filtered out of news: it keeps the messages the agent watches (n.cause_by in self.rc.watch) plus the messages explicitly addressed to it (self.name in n.send_to). And news is essentially the contents of msg_buffer, and msg_buffer is filled by the environment's publish_message.
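
To make the filter concrete, here is a plain-data sketch; the dicts and string values stand in for real Message objects and for the class-path strings MetaGPT actually stores, so treat them as illustrative only.

watch = {"SpeakAloud", "UserRequirement"}   # what this role _watch-es (illustrative strings)
name = "Trump"                              # this role's name
news = [
    {"cause_by": "SpeakAloud", "send_to": {"Trump"}},        # the opponent's speech, addressed to Trump
    {"cause_by": "SomeOtherAction", "send_to": {"<all>"}},   # a broadcast from an action Trump ignores
]
rc_news = [n for n in news if n["cause_by"] in watch or name in n["send_to"]]
print(len(rc_news))  # 1 -> only the SpeakAloud message is of interest
# Note: this filter alone cannot stop a role from reacting to its own SpeakAloud messages;
# that is handled one step earlier, when the environment decides whose msg_buffer receives the message.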

Next, look at the environment's publish_message code:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
    """
    Distribute the message to the recipients.
    In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
    in RFC 113 for the entire system, the routing information in the Message is only responsible for
    specifying the message recipient, without concern for where the message recipient is located. How to
    route the message to the message recipient is a problem addressed by the transport framework designed
    in RFC 113.
    """
    logger.debug(f"publish_message: {message.dump()}")
    found = False
    # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
    for role, addrs in self.member_addrs.items():
        if is_send_to(message, addrs):
            role.put_message(message)
            found = True
    if not found:
        logger.warning(f"Message no recipients: {message.dump()}")
    self.history += f"\n{message}"  # For debug
    return True

Look at the is_send_to function it uses:

def is_send_to(message: "Message", addresses: set):
    """Return whether it's consumer"""
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True
    for i in addresses:
        if i in message.send_to:
            return True
    return False

If message.send_to is specified, the environment delivers the message only to the specified Role.
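
Here is a minimal, self-contained illustration of that routing rule. FakeMessage is a hypothetical stand-in for metagpt.schema.Message, and MESSAGE_ROUTE_TO_ALL mimics the broadcast address that send_to defaults to in MetaGPT.

from dataclasses import dataclass, field

MESSAGE_ROUTE_TO_ALL = "<all>"  # stand-in for MetaGPT's broadcast address constant

@dataclass
class FakeMessage:
    send_to: set = field(default_factory=lambda: {MESSAGE_ROUTE_TO_ALL})

def is_send_to(message, addresses: set) -> bool:
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True
    return any(addr in message.send_to for addr in addresses)

print(is_send_to(FakeMessage(send_to={"Trump"}), {"Trump"}))  # True  -> delivered to Trump
print(is_send_to(FakeMessage(send_to={"Trump"}), {"Biden"}))  # False -> Biden's msg_buffer stays empty
print(is_send_to(FakeMessage(), {"Biden"}))                   # True  -> a broadcast reaches everyone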

The code and my explanation alone may be hard to follow and easy to get lost in, so I have drawn a diagram of the flow:

Following the flow in the diagram above is already enough for our debate agents. After Role1 finishes its action, it addresses the resulting message specifically to Role2 (via the message's send_to parameter), so Role1's own msg_buffer never receives that message; on the next run its msg_buffer is empty, it does nothing, and it waits for Role2's message. Likewise, Role2 addresses its message back to Role1 when it finishes. This gives us the alternating, turn-taking behavior of a debate.
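
The same hand-off can be shown as a toy simulation with no MetaGPT involved; each list below stands in for a role's msg_buffer, and the fake environment delivers a message only to the role named in send_to.

buffers = {"Biden": [], "Trump": []}          # each role's pretend msg_buffer

def publish(content, send_to):
    buffers[send_to].append(content)          # deliver only to the named recipient

publish("Topic: ...", send_to="Biden")        # run_project: the topic goes only to Biden
assert buffers == {"Biden": ["Topic: ..."], "Trump": []}

buffers["Biden"].clear()                      # Biden observes the topic and speaks...
publish("Biden's argument", send_to="Trump")  # ...addressing his reply to Trump only
assert buffers == {"Biden": [], "Trump": ["Biden's argument"]}
# Biden's buffer is now empty, so he idles until Trump's reply arrives, and so on, alternating.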

The tutorial uses version 0.6.6, and I am not sure whether its source looks the same as the above. In any case, to implement the debate flow it also overrides the _observe function:

async def _observe(self) -> int:
    await super()._observe()
    # accept messages sent (from opponent) to self, disregard own messages from the last round
    self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]  # second filtering pass
    return len(self.rc.news)

You can take it as a reference. I am on version 0.7.2, however, and both the source code and my experiments show that overriding _observe is no longer necessary.

1.3 Overriding the _act function

Why override _act here? The main reason is to fill in the send_to field of the resulting message (the base implementation does not set it). The other reason is to assemble the debate context, i.e. what the opponent has said so far, via context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories).

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    todo = self.rc.todo  # An instance of SpeakAloud
    memories = self.get_memories()
    context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
    print(context)
    rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
    msg = Message(
        content=rsp,
        role=self.profile,
        cause_by=type(todo),
        sent_from=self.name,
        send_to=self.opponent_name,
    )
    self.rc.memory.add(msg)
    return msg

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name,

These three parameters describe the Message: which Action caused it, which role it was sent from, and which role it should be delivered to. This mechanism enables a subscription scheme that is more flexible than watch alone.

To give you a more intuitive picture, here is an example of the assembled context (when it is Trump's turn to speak, the context he receives contains Biden's speech and everything said before it):
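
As an illustration of the shape of that context string (the bracketed text stands in for model-generated content):

# Illustrative only: what `context` looks like when it is Trump's turn after three speeches,
# given the f"{msg.sent_from}: {msg.content}" join in _act above.
context = "\n".join([
    "Biden: <Biden's opening argument on the topic>",
    "Trump: <Trump's rebuttal from the previous round>",
    "Biden: <Biden's latest argument, which Trump must now answer>",
])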

1.4 The final Role code

class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
    # With version 0.7.2 there is no need to override _observe; see the analysis above
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg

2. Instantiating the agents

Instantiate the two agents. We have already covered how to use the Team component; see my previous article.

Note that team.run_project(idea, send_to="Biden") uses the send_to parameter to decide who speaks first.

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)
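
A minimal way to kick this off outside the CLI wrapper shown in the next section (assumes MetaGPT's LLM backend is already configured, e.g. via config2.yaml):

import asyncio

asyncio.run(debate("Topic: The U.S. should commit more in climate change fighting", n_round=4))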

3. Full code and run results

Full code:

import asyncio
import platform
from typing import Any
import fire
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team
class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""
    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"
    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)
        rsp = await self._aask(prompt)
        return rsp
class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""
    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)
    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud
        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)
        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)
        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)
        return msg
async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)
def main(idea: str, investment: float = 3.0, n_round: int = 10):
    """
    :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"
                 or "Trump: Climate change is a hoax"
    :param investment: contribute a certain dollar amount to watch the debate
    :param n_round: maximum rounds of the debate
    :return:
    """
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(debate(idea, investment, n_round))
if __name__ == "__main__":
    main("Topic: The U.S. should commit more in climate change fighting")  # or: fire.Fire(main) to pass the topic from the command line

Run result (the two agents speak in alternating turns):

4. Summary

The main point of this section is to understand and practice the richer message exchange and subscription mechanisms between multiple agents, centered on the use of the send_to parameter.

The three parameters cause_by=type(todo), sent_from=self.name, and send_to=self.opponent_name describe the Message: which Action caused it, which role it was sent from, and which role it should be delivered to. This mechanism enables a subscription scheme that is more flexible than watch alone.

