【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支

简介: 【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支
  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:


上篇文章(【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互)中我们简单使用了一下AgentScope的Pipeline模块,方便地实现了一个多智能体交互应用。今天我们来深入Pipeline的源码,来看下AgentScope都提供了哪些类型的Pipeline,以及它的实现原理是什么。

0. Pipeline概念简介

我的个人简单理解:AgentScope为了方便大家对智能体间交互逻辑的编排,特地封装了 Pipeline 模块,其中包含了一系列地 Pipeline ,就像编程语言中的控制结构:顺序结构、条件分支、循环结构等。利用这些 Pipeline ,大家可以很方便地实现多智能体间的交互逻辑控制。

1. Pipeline的基类:PipelineBase

class PipelineBase(Operator):
    r"""Base interface of all pipelines.
    The pipeline is a special kind of operator that includes
    multiple operators and the interaction logic among them.
    """
    def __init__(self) -> None:
        self.participants: List[Any] = []
    @abstractmethod
    def __call__(self, x: Optional[dict] = None) -> dict:
        """Define the actions taken by this pipeline.
        Args:
            x (Optional[`dict`], optional):
                Dialog history and some environment information
        Returns:
            `dict`: The pipeline's response to the input.
        """

基类只是实现了一个Pipeline的基本框架,所有类型的Pipeline都继承自这个基类,然后重写自己的__call__函数。

从这个基类的说明中也可以看到AgentScope对Pipeline的定义:The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them.。Pipeline组合了一系列的operators和这些operators之间的交互逻辑。所谓的operators,其实就是我们所认识的Agent。

2. SequentialPipeline

这个Pipeline用来实现类似编程语言中的顺序执行结构。上篇文章中已经写过其源码了,所以在这里不再展开,感兴趣的可以去看一下:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互

3. IfElsePipeline

这个Pipeline用来实现类似编程语言中的 if-else 分支结构。

3.1 初始化参数

class IfElsePipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], bool],
        if_body_operators: Operators,
        else_body_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.if_body_operator = if_body_operators
        self.else_body_operator = else_body_operators
        self.participants = [self.if_body_operator] + [self.else_body_operator]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return ifelsepipeline(
            condition_func=self.condition_func,
            if_body_operators=self.if_body_operator,
            else_body_operators=self.else_body_operator,
            x=x,
        )

其初始化接收三个参数,比较好理解:

  • condition_func:判断条件
  • if_body_operators:满足判断条件时执行的Agent
  • else_body_operators:不满足判断条件时执行的Agent

3.2 实现原理

重写__call__函数,调用了 ifelsepipeline 函数:

def ifelsepipeline(
    condition_func: Callable,
    if_body_operators: Operators,
    else_body_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    if condition_func(x):
        return _operators(if_body_operators, x)
    else:
        return _operators(else_body_operators, x)

里面 if condition_func(x) 来判断是否满足设置的判断条件,然后选择执行 if_body_operators 还是 else_body_operators

这里的 if_body_operators 或者 else_body_operators 可以是一系列的 operators,通过 _operators函数来进行判断和执行:

def _operators(operators: Operators, x: Optional[dict] = None) -> dict:
    """Syntactic sugar for executing a single operator or a sequence of
    operators."""
    if isinstance(operators, Sequence):
        return sequentialpipeline(operators, x)
    else:
        return operators(x)

从这个函数的实现可以看到,如果operators是一系列operator,则会对它们执行 sequentialpipeline 顺序结构

4. SwitchPipeline

这个Pipeline用来实现类似编程语言中的 switch-case 分支结构。

4.1 初始化参数

class SwitchPipeline(PipelineBase):
    def __init__(
        self,
        condition_func: Callable[[dict], Any],
        case_operators: Mapping[Any, Operators],
        default_operators: Operators = placeholder,
    ) -> None:
        self.condition_func = condition_func
        self.case_operators = case_operators
        self.default_operators = default_operators
        self.participants = list(self.case_operators.values()) + [
            self.default_operators,
        ]
    def __call__(self, x: Optional[dict] = None) -> dict:
        return switchpipeline(
            condition_func=self.condition_func,
            case_operators=self.case_operators,
            default_operators=self.default_operators,
            x=x,
        )

该Pipeline的初始化接收三个参数:

  • condition_func:判断条件
  • case_operators:满足case条件时执行的Agent,注意,这里的case_operators是个Mapping映射列表,key为case条件,value为该case下需要执行的operators
  • default_operators:不满足任何一个case条件时执行的Agent

4.2 实现原理

重写__call__函数,调用了 switchpipeline 函数:

def switchpipeline(
    condition_func: Callable[[Any], Any],
    case_operators: Mapping[Any, Operators],
    default_operators: Operators = placeholder,
    x: Optional[dict] = None,
) -> dict:
    target_case = condition_func(x)
    if target_case in case_operators:
        return _operators(case_operators[target_case], x)
    else:
        return _operators(default_operators, x)

首先是 target_case = condition_func(x),根据判断条件找出当前的case条件,然后根据case条件找出需要执行的operators(case_operators[target_case]),通过 _operators 函数来进行顺序执行。

5. 总结

今天这篇文章我们主要通过阅读源码,学习了AgentScope中Pipeline模块的基类、顺序Pipeline和条件Pipeline的实现。所谓的顺序Pipeline就是将Agent按顺序执行,消息按顺序传递。条件Pipeline就是用户给出判定条件,以及每种条件下应该运行的Agents,然后在满足某种条件的时候顺序执行该条件下的Agents。

如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~


  • 大家好,我是 同学小张,日常分享AI知识和实战案例
  • 欢迎 点赞 + 关注 👏,持续学习持续干货输出
  • +v: jasper_8017 一起交流💬,一起进步💪。
  • 微信公众号也可搜【同学小张】 🙏

本站文章一览:

相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
406 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
2月前
|
人工智能 测试技术 API
构建AI智能体:二、DeepSeek的Ollama部署FastAPI封装调用
本文介绍如何通过Ollama本地部署DeepSeek大模型,结合FastAPI实现API接口调用。涵盖Ollama安装、路径迁移、模型下载运行及REST API封装全过程,助力快速构建可扩展的AI应用服务。
682 6
|
2月前
|
人工智能 运维 安全
加速智能体开发:从 Serverless 运行时到 Serverless AI 运行时
在云计算与人工智能深度融合的背景下,Serverless 技术作为云原生架构的集大成者,正加速向 AI 原生架构演进。阿里云函数计算(FC)率先提出并实践“Serverless AI 运行时”概念,通过技术创新与生态联动,为智能体(Agent)开发提供高效、安全、低成本的基础设施支持。本文从技术演进路径、核心能力及未来展望三方面解析 Serverless AI 的突破性价值。
|
2月前
|
存储 人工智能 Java
AI 超级智能体全栈项目阶段四:学术分析 AI 项目 RAG 落地指南:基于 Spring AI 的本地与阿里云知识库实践
本文介绍RAG(检索增强生成)技术,结合Spring AI与本地及云知识库实现学术分析AI应用,利用阿里云Qwen-Plus模型提升回答准确性与可信度。
1053 90
AI 超级智能体全栈项目阶段四:学术分析 AI 项目 RAG 落地指南:基于 Spring AI 的本地与阿里云知识库实践
|
2月前
|
人工智能 搜索推荐 数据可视化
当AI学会“使用工具”:智能体(Agent)如何重塑人机交互
当AI学会“使用工具”:智能体(Agent)如何重塑人机交互
350 115
|
2月前
|
人工智能 自然语言处理 安全
从工具到伙伴:AI代理(Agent)是下一场革命
从工具到伙伴:AI代理(Agent)是下一场革命
284 117
|
2月前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
3641 48
|
2月前
|
人工智能 缓存 运维
【智造】AI应用实战:6个agent搞定复杂指令和工具膨胀
本文介绍联调造数场景下的AI应用演进:从单Agent模式到多Agent协同的架构升级。针对复杂指令执行不准、响应慢等问题,通过意图识别、工具引擎、推理执行等多Agent分工协作,结合工程化手段提升准确性与效率,并分享了关键设计思路与实践心得。
479 20
【智造】AI应用实战:6个agent搞定复杂指令和工具膨胀
|
2月前
|
人工智能 API 开发工具
构建AI智能体:一、初识AI大模型与API调用
本文介绍大模型基础知识及API调用方法,涵盖阿里云百炼平台密钥申请、DashScope SDK使用、Python调用示例(如文本情感分析、图像文字识别),助力开发者快速上手大模型应用开发。
1197 16
构建AI智能体:一、初识AI大模型与API调用
|
2月前
|
存储 机器学习/深度学习 人工智能
构建AI智能体:三、Prompt提示词工程:几句话让AI秒懂你心
本文深入浅出地讲解Prompt原理及其与大模型的关系,系统介绍Prompt的核心要素、编写原则与应用场景,帮助用户通过精准指令提升AI交互效率,释放大模型潜能。
507 5