【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支

简介: 【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支
  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:


上篇文章(【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互)中我们简单使用了一下AgentScope的Pipeline模块,方便地实现了一个多智能体交互应用。今天我们来深入Pipeline的源码,来看下AgentScope都提供了哪些类型的Pipeline,以及它的实现原理是什么。

0. Pipeline概念简介

我的个人简单理解:AgentScope为了方便大家对智能体间交互逻辑的编排,特地封装了 Pipeline 模块,其中包含了一系列地 Pipeline ,就像编程语言中的控制结构:顺序结构、条件分支、循环结构等。利用这些 Pipeline ,大家可以很方便地实现多智能体间的交互逻辑控制。

1. Pipeline的基类:PipelineBase

class PipelineBase(Operator):
    r"""Base interface of all pipelines.
    The pipeline is a special kind of operator that includes
    multiple operators and the interaction logic among them.
    """
    def __init__(self) -> None:
        self.participants: List[Any] = []
    @abstractmethod
    def __call__(self, x: Optional[dict] = None) -> dict:
        """Define the actions taken by this pipeline.
        Args:
            x (Optional[`dict`], optional):
                Dialog history and some environment information
        Returns:
            `dict`: The pipeline's response to the input.
        """

基类只是实现了一个Pipeline的基本框架,所有类型的Pipeline都继承自这个基类,然后重写自己的__call__函数。

从这个基类的说明中也可以看到AgentScope对Pipeline的定义:The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them.。Pipeline组合了一系列的operators和这些operators之间的交互逻辑。所谓的operators,其实就是我们所认识的Agent。

2. SequentialPipeline

这个Pipeline用来实现类似编程语言中的顺序执行结构。上篇文章中已经写过其源码了,所以在这里不再展开,感兴趣的可以去看一下:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互

3. IfElsePipeline

这个Pipeline用来实现类似编程语言中的 if-else 分支结构。

3.1 初始化参数

class IfElsePipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], bool],
        if_body_operators: Operators,
        else_body_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.if_body_operator = if_body_operators
        self.else_body_operator = else_body_operators
        self.participants = [self.if_body_operator] + [self.else_body_operator]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return ifelsepipeline(
            condition_func=self.condition_func,
            if_body_operators=self.if_body_operator,
            else_body_operators=self.else_body_operator,
            x=x,
        )

其初始化接收三个参数,比较好理解:

  • condition_func:判断条件
  • if_body_operators:满足判断条件时执行的Agent
  • else_body_operators:不满足判断条件时执行的Agent

3.2 实现原理

重写__call__函数,调用了 ifelsepipeline 函数:

def ifelsepipeline(
    condition_func: Callable,
    if_body_operators: Operators,
    else_body_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    if condition_func(x):
        return _operators(if_body_operators, x)
    else:
        return _operators(else_body_operators, x)

里面 if condition_func(x) 来判断是否满足设置的判断条件,然后选择执行 if_body_operators 还是 else_body_operators

这里的 if_body_operators 或者 else_body_operators 可以是一系列的 operators,通过 _operators函数来进行判断和执行:

def _operators(operators: Operators, x: Optional[dict] = None) -> dict:
    """Syntactic sugar for executing a single operator or a sequence of
    operators."""
    if isinstance(operators, Sequence):
        return sequentialpipeline(operators, x)
    else:
        return operators(x)

从这个函数的实现可以看到,如果operators是一系列operator,则会对它们执行 sequentialpipeline 顺序结构

4. SwitchPipeline

这个Pipeline用来实现类似编程语言中的 switch-case 分支结构。

4.1 初始化参数

class SwitchPipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], Any],
        case_operators: Mapping[Any, Operators],
        default_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.case_operators = case_operators
        self.default_operators = default_operators
        self.participants = list(self.case_operators.values()) + [
            self.default_operators,
        ]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return switchpipeline(
            condition_func=self.condition_func,
            case_operators=self.case_operators,
            default_operators=self.default_operators,
            x=x,
        )

该Pipeline的初始化接收三个参数:

  • condition_func:判断条件
  • case_operators:满足case条件时执行的Agent,注意,这里的case_operators是个Mapping映射列表,key为case条件,value为该case下需要执行的operators
  • default_operators:不满足任何一个case条件时执行的Agent

4.2 实现原理

重写__call__函数,调用了 switchpipeline 函数:

def switchpipeline(
    condition_func: Callable[[Any], Any],
    case_operators: Mapping[Any, Operators],
    default_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    target_case = condition_func(x)
    if target_case in case_operators:
        return _operators(case_operators[target_case], x)
    else:
        return _operators(default_operators, x)

首先是 target_case = condition_func(x),根据判断条件找出当前的case条件,然后根据case条件找出需要执行的operators(case_operators[target_case]),通过 _operators 函数来进行顺序执行。

5. 总结

今天这篇文章我们主要通过阅读源码,学习了AgentScope中Pipeline模块的基类、顺序Pipeline和条件Pipeline的实现。所谓的顺序Pipeline就是将Agent按顺序执行,消息按顺序传递。条件Pipeline就是用户给出判定条件,以及每种条件下应该运行的Agents,然后在满足某种条件的时候顺序执行该条件下的Agents。

如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~


  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:

相关文章
|
5天前
|
人工智能 光互联 数据中心
800G光模块面对AI发展的增长之路
光模块市场因AI驱动的算力需求增长而加速发展,800G产品需求强劲,预计未来几年市场规模将以每年约11%的复合年增长率扩张。尽管面临价格竞争和原材料供应紧张的挑战,800G光模块将于2024年迎来大规模出货,而2025年将启动1.6T光模块商用周期。随着技术迭代加快和高端产品放量,行业头部企业将受益,光通信行业将迎来黄金时期。
9 0
800G光模块面对AI发展的增长之路
|
7天前
|
人工智能
[AI 阿里 EMO] 集成进通义千问app,全民演唱,人人可用!
阿里EMO技术的最新进展,现在集成进通义千问app,允许用户通过图+音频生成任意时长的视频,实现全民演唱的互动体验。
[AI 阿里 EMO] 集成进通义千问app,全民演唱,人人可用!
|
7天前
|
人工智能 移动开发 小程序
uniapp框架——vue3+uniFilePicker+fastapi实现文件上传(搭建ai项目第二步)
uniapp框架——vue3+uniFilePicker+fastapi实现文件上传(搭建ai项目第二步)
34 2
|
7天前
|
人工智能 小程序 前端开发
uniapp框架——初始化vue3项目(搭建ai项目第一步)
uniapp框架——初始化vue3项目(搭建ai项目第一步)
23 1
|
8天前
|
人工智能 Ubuntu 机器人
AI电销机器人系统源码部署之:freeswitch安装Linux
在Linux服务器上安装FreeSWITCH的简要步骤:更新软件包,安装依赖(如build-essential,libssl-dev等),下载v1.10.7源代码,解压并配置,编译,然后运行`./bootstrap.sh -j`,`./configure`,`make`,`make install`。启动FreeSWITCH服务,配置SIP用户和路由,测试连接与通话,并确保防火墙打开SIP(5060)和RTP端口。注意,实际部署可能需按需求调整。
|
8天前
|
存储 人工智能 测试技术
【AI智能体】SuperAGI-开源AI Agent 管理平台
【4月更文挑战第9天】智能体管理平台SuperAGI简介及实践
|
10天前
|
人工智能 API 决策智能
【AI Agent系列】【阿里AgentScope框架】实战1:利用AgentScope实现动态创建Agent和自由组织讨论
【AI Agent系列】【阿里AgentScope框架】实战1:利用AgentScope实现动态创建Agent和自由组织讨论
52 2
|
10天前
|
人工智能 决策智能 C++
【AI Agent系列】【阿里AgentScope框架】5. Pipeline模块的组合使用及Pipeline模块总结
【AI Agent系列】【阿里AgentScope框架】5. Pipeline模块的组合使用及Pipeline模块总结
31 1
|
10天前
|
人工智能 决策智能
【AI Agent系列】【阿里AgentScope框架】4. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 循环结构
【AI Agent系列】【阿里AgentScope框架】4. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 循环结构
21 0
|
10天前
|
人工智能 决策智能 C++
【AI Agent教程】【MetaGPT】案例拆解:使用MetaGPT实现“狼人杀“游戏(1)- 整体框架解析
【AI Agent教程】【MetaGPT】案例拆解:使用MetaGPT实现“狼人杀“游戏(1)- 整体框架解析
104 1