- 大家好,我是 同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览:
上篇文章(【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互)中我们简单使用了一下AgentScope的Pipeline模块,方便地实现了一个多智能体交互应用。今天我们来深入Pipeline的源码,来看下AgentScope都提供了哪些类型的Pipeline,以及它的实现原理是什么。
0. Pipeline概念简介
我的个人简单理解:AgentScope为了方便大家对智能体间交互逻辑的编排,特地封装了 Pipeline 模块,其中包含了一系列地 Pipeline ,就像编程语言中的控制结构:顺序结构、条件分支、循环结构等。利用这些 Pipeline ,大家可以很方便地实现多智能体间的交互逻辑控制。
1. Pipeline的基类:PipelineBase
class PipelineBase(Operator): r"""Base interface of all pipelines. The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them. """ def __init__(self) -> None: self.participants: List[Any] = [] @abstractmethod def __call__(self, x: Optional[dict] = None) -> dict: """Define the actions taken by this pipeline. Args: x (Optional[`dict`], optional): Dialog history and some environment information Returns: `dict`: The pipeline's response to the input. """
基类只是实现了一个Pipeline的基本框架,所有类型的Pipeline都继承自这个基类,然后重写自己的__call__
函数。
从这个基类的说明中也可以看到AgentScope对Pipeline的定义:The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them.
。Pipeline组合了一系列的operators和这些operators之间的交互逻辑。所谓的operators,其实就是我们所认识的Agent。
2. SequentialPipeline
这个Pipeline用来实现类似编程语言中的顺序执行结构。上篇文章中已经写过其源码了,所以在这里不再展开,感兴趣的可以去看一下:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互
3. IfElsePipeline
这个Pipeline用来实现类似编程语言中的 if-else
分支结构。
3.1 初始化参数
class IfElsePipeline(PipelineBase): def __init__( self, condition_func: Callable[[dict], bool], if_body_operators: Operators, else_body_operators: Operators = placeholder, ) -> None: self.condition_func = condition_func self.if_body_operator = if_body_operators self.else_body_operator = else_body_operators self.participants = [self.if_body_operator] + [self.else_body_operator] def __call__(self, x: Optional[dict] = None) -> dict: return ifelsepipeline( condition_func=self.condition_func, if_body_operators=self.if_body_operator, else_body_operators=self.else_body_operator, x=x, )
其初始化接收三个参数,比较好理解:
- condition_func:判断条件
- if_body_operators:满足判断条件时执行的Agent
- else_body_operators:不满足判断条件时执行的Agent
3.2 实现原理
重写__call__
函数,调用了 ifelsepipeline
函数:
def ifelsepipeline( condition_func: Callable, if_body_operators: Operators, else_body_operators: Operators = placeholder, x: Optional[dict] = None, ) -> dict: if condition_func(x): return _operators(if_body_operators, x) else: return _operators(else_body_operators, x)
里面 if condition_func(x)
来判断是否满足设置的判断条件,然后选择执行 if_body_operators
还是 else_body_operators
。
这里的 if_body_operators
或者 else_body_operators
可以是一系列的 operators
,通过 _operators
函数来进行判断和执行:
def _operators(operators: Operators, x: Optional[dict] = None) -> dict: """Syntactic sugar for executing a single operator or a sequence of operators.""" if isinstance(operators, Sequence): return sequentialpipeline(operators, x) else: return operators(x)
从这个函数的实现可以看到,如果operators
是一系列operator
,则会对它们执行 sequentialpipeline
顺序结构。
4. SwitchPipeline
这个Pipeline用来实现类似编程语言中的 switch-case
分支结构。
4.1 初始化参数
class SwitchPipeline(PipelineBase): def __init__( self, condition_func: Callable[[dict], Any], case_operators: Mapping[Any, Operators], default_operators: Operators = placeholder, ) -> None: self.condition_func = condition_func self.case_operators = case_operators self.default_operators = default_operators self.participants = list(self.case_operators.values()) + [ self.default_operators, ] def __call__(self, x: Optional[dict] = None) -> dict: return switchpipeline( condition_func=self.condition_func, case_operators=self.case_operators, default_operators=self.default_operators, x=x, )
该Pipeline的初始化接收三个参数:
- condition_func:判断条件
- case_operators:满足case条件时执行的Agent,注意,这里的case_operators是个Mapping映射列表,key为case条件,value为该case下需要执行的operators。
- default_operators:不满足任何一个case条件时执行的Agent
4.2 实现原理
重写__call__
函数,调用了 switchpipeline
函数:
def switchpipeline( condition_func: Callable[[Any], Any], case_operators: Mapping[Any, Operators], default_operators: Operators = placeholder, x: Optional[dict] = None, ) -> dict: target_case = condition_func(x) if target_case in case_operators: return _operators(case_operators[target_case], x) else: return _operators(default_operators, x)
首先是 target_case = condition_func(x)
,根据判断条件找出当前的case条件,然后根据case条件找出需要执行的operators(case_operators[target_case]
),通过 _operators
函数来进行顺序执行。
5. 总结
今天这篇文章我们主要通过阅读源码,学习了AgentScope中Pipeline模块的基类、顺序Pipeline和条件Pipeline的实现。所谓的顺序Pipeline就是将Agent按顺序执行,消息按顺序传递。条件Pipeline就是用户给出判定条件,以及每种条件下应该运行的Agents,然后在满足某种条件的时候顺序执行该条件下的Agents。
如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~
- 大家好,我是 同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览: