【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支

简介: 【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支
  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:


上篇文章(【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互)中我们简单使用了一下AgentScope的Pipeline模块,方便地实现了一个多智能体交互应用。今天我们来深入Pipeline的源码,来看下AgentScope都提供了哪些类型的Pipeline,以及它的实现原理是什么。

0. Pipeline概念简介

我的个人简单理解:AgentScope为了方便大家对智能体间交互逻辑的编排,特地封装了 Pipeline 模块,其中包含了一系列地 Pipeline ,就像编程语言中的控制结构:顺序结构、条件分支、循环结构等。利用这些 Pipeline ,大家可以很方便地实现多智能体间的交互逻辑控制。

1. Pipeline的基类:PipelineBase

class PipelineBase(Operator):
    r"""Base interface of all pipelines.
    The pipeline is a special kind of operator that includes
    multiple operators and the interaction logic among them.
    """
    def __init__(self) -> None:
        self.participants: List[Any] = []
    @abstractmethod
    def __call__(self, x: Optional[dict] = None) -> dict:
        """Define the actions taken by this pipeline.
        Args:
            x (Optional[`dict`], optional):
                Dialog history and some environment information
        Returns:
            `dict`: The pipeline's response to the input.
        """

基类只是实现了一个Pipeline的基本框架,所有类型的Pipeline都继承自这个基类,然后重写自己的__call__函数。

从这个基类的说明中也可以看到AgentScope对Pipeline的定义:The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them.。Pipeline组合了一系列的operators和这些operators之间的交互逻辑。所谓的operators,其实就是我们所认识的Agent。

2. SequentialPipeline

这个Pipeline用来实现类似编程语言中的顺序执行结构。上篇文章中已经写过其源码了,所以在这里不再展开,感兴趣的可以去看一下:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互

3. IfElsePipeline

这个Pipeline用来实现类似编程语言中的 if-else 分支结构。

3.1 初始化参数

class IfElsePipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], bool],
        if_body_operators: Operators,
        else_body_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.if_body_operator = if_body_operators
        self.else_body_operator = else_body_operators
        self.participants = [self.if_body_operator] + [self.else_body_operator]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return ifelsepipeline(
            condition_func=self.condition_func,
            if_body_operators=self.if_body_operator,
            else_body_operators=self.else_body_operator,
            x=x,
        )

其初始化接收三个参数,比较好理解:

  • condition_func:判断条件
  • if_body_operators:满足判断条件时执行的Agent
  • else_body_operators:不满足判断条件时执行的Agent

3.2 实现原理

重写__call__函数,调用了 ifelsepipeline 函数:

def ifelsepipeline(
    condition_func: Callable,
    if_body_operators: Operators,
    else_body_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    if condition_func(x):
        return _operators(if_body_operators, x)
    else:
        return _operators(else_body_operators, x)

里面 if condition_func(x) 来判断是否满足设置的判断条件,然后选择执行 if_body_operators 还是 else_body_operators

这里的 if_body_operators 或者 else_body_operators 可以是一系列的 operators,通过 _operators函数来进行判断和执行:

def _operators(operators: Operators, x: Optional[dict] = None) -> dict:
    """Syntactic sugar for executing a single operator or a sequence of
    operators."""
    if isinstance(operators, Sequence):
        return sequentialpipeline(operators, x)
    else:
        return operators(x)

从这个函数的实现可以看到,如果operators是一系列operator,则会对它们执行 sequentialpipeline 顺序结构

4. SwitchPipeline

这个Pipeline用来实现类似编程语言中的 switch-case 分支结构。

4.1 初始化参数

class SwitchPipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], Any],
        case_operators: Mapping[Any, Operators],
        default_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.case_operators = case_operators
        self.default_operators = default_operators
        self.participants = list(self.case_operators.values()) + [
            self.default_operators,
        ]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return switchpipeline(
            condition_func=self.condition_func,
            case_operators=self.case_operators,
            default_operators=self.default_operators,
            x=x,
        )

该Pipeline的初始化接收三个参数:

  • condition_func:判断条件
  • case_operators:满足case条件时执行的Agent,注意,这里的case_operators是个Mapping映射列表,key为case条件,value为该case下需要执行的operators
  • default_operators:不满足任何一个case条件时执行的Agent

4.2 实现原理

重写__call__函数,调用了 switchpipeline 函数:

def switchpipeline(
    condition_func: Callable[[Any], Any],
    case_operators: Mapping[Any, Operators],
    default_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    target_case = condition_func(x)
    if target_case in case_operators:
        return _operators(case_operators[target_case], x)
    else:
        return _operators(default_operators, x)

首先是 target_case = condition_func(x),根据判断条件找出当前的case条件,然后根据case条件找出需要执行的operators(case_operators[target_case]),通过 _operators 函数来进行顺序执行。

5. 总结

今天这篇文章我们主要通过阅读源码,学习了AgentScope中Pipeline模块的基类、顺序Pipeline和条件Pipeline的实现。所谓的顺序Pipeline就是将Agent按顺序执行,消息按顺序传递。条件Pipeline就是用户给出判定条件,以及每种条件下应该运行的Agents,然后在满足某种条件的时候顺序执行该条件下的Agents。

如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~


  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:

相关文章
|
26天前
|
人工智能 开发框架 决策智能
谷歌开源多智能体开发框架 Agent Development Kit:百行代码构建复杂AI代理,覆盖整个开发周期!
谷歌开源的Agent Development Kit(ADK)是首个代码优先的Python工具包,通过多智能体架构和灵活编排系统,支持开发者在百行代码内构建复杂AI代理,提供预置工具库与动态工作流定义能力。
180 3
谷歌开源多智能体开发框架 Agent Development Kit:百行代码构建复杂AI代理,覆盖整个开发周期!
|
20天前
|
人工智能 JavaScript 前端开发
领导给我3天时间汇总所有AI模块词条,结合DeepSeek,20分钟就搞定了。
本文分享了一次利用AI工具提升工作效率的实际案例。作者接到任务,需在3天内梳理公司AI模块的所有词条并以增量形式提供给项目组。为高效完成任务,作者借助DeepSeek编写了三个Node.js脚本:第一个脚本扫描所有/ai目录下的文件,提取符合“zxy.xxx”格式的词条;第二个脚本对比目标词条库与已提取的词条,生成过滤后的副本;第三个脚本将最终结果输出为Excel文档,满足领导需求。整个过程从十几分钟到二十分钟不等,大幅缩短了原本需要数天的工作量。此案例表明,在重复性工作中合理运用AI工具可显著提高效率。
177 12
|
13天前
|
机器学习/深度学习 人工智能 JSON
这个AI把arXiv变成代码工厂,快速复现顶会算法!Paper2Code:AI论文自动转代码神器,多智能体框架颠覆科研复现
Paper2Code是由韩国科学技术院与DeepAuto.ai联合开发的多智能体框架,通过规划、分析和代码生成三阶段流程,将机器学习论文自动转化为可执行代码仓库,显著提升科研复现效率。
127 18
这个AI把arXiv变成代码工厂,快速复现顶会算法!Paper2Code:AI论文自动转代码神器,多智能体框架颠覆科研复现
|
2月前
|
人工智能 网络协议 Java
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
RuoYi AI 是一个全栈式 AI 开发平台,支持本地 RAG 方案,集成多种大语言模型和多媒体功能,适合企业和个人开发者快速搭建个性化 AI 应用。
1012 21
RuoYi AI:1人搞定AI中台!开源全栈式AI开发平台,快速集成大模型+RAG+支付等模块
|
2月前
|
存储 人工智能 监控
Mahilo:多智能体实时协作框架开源!人类与AI无缝交互,复杂任务一键协同
Mahilo 是一个灵活的多智能体框架,支持创建与人类互动的多智能体系统,适用于从客户服务到紧急响应等多种场景。
160 2
Mahilo:多智能体实时协作框架开源!人类与AI无缝交互,复杂任务一键协同
|
4月前
|
人工智能 自然语言处理 语音技术
FilmAgent:多智能体共同协作制作电影,哈工大联合清华推出 AI 驱动的自动化电影制作工具
FilmAgent 是由哈工大与清华联合推出的AI电影自动化制作工具,通过多智能体协作实现从剧本生成到虚拟拍摄的全流程自动化。
1373 11
FilmAgent:多智能体共同协作制作电影,哈工大联合清华推出 AI 驱动的自动化电影制作工具
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
Agent Laboratory:AI自动撰写论文,AMD开源自动完成科研全流程的多智能体框架
Agent Laboratory 是由 AMD 和约翰·霍普金斯大学联合推出的自主科研框架,基于大型语言模型,能够加速科学发现、降低成本并提高研究质量。
448 23
Agent Laboratory:AI自动撰写论文,AMD开源自动完成科研全流程的多智能体框架
|
2月前
|
存储 人工智能 运维
少年云亮相联合国教科文组织,已向偏远地区捐赠200多所AI云教室
少年云亮相联合国教科文组织,已向偏远地区捐赠200多所AI云教室
|
5月前
|
人工智能 开发框架 算法
Qwen-Agent:阿里通义开源 AI Agent 应用开发框架,支持构建多智能体,具备自动记忆上下文等能力
Qwen-Agent 是阿里通义开源的一个基于 Qwen 模型的 Agent 应用开发框架,支持指令遵循、工具使用、规划和记忆能力,适用于构建复杂的智能代理应用。
1472 10
Qwen-Agent:阿里通义开源 AI Agent 应用开发框架,支持构建多智能体,具备自动记忆上下文等能力
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
Gemini 2.0:谷歌推出的原生多模态输入输出 + Agent 为核心的 AI 模型
谷歌最新推出的Gemini 2.0是一款原生多模态输入输出的AI模型,以Agent技术为核心,支持多种数据类型的输入与输出,具备强大的性能和多语言音频输出能力。本文将详细介绍Gemini 2.0的主要功能、技术原理及其在多个领域的应用场景。
671 20
Gemini 2.0:谷歌推出的原生多模态输入输出 + Agent 为核心的 AI 模型