Taskflow 有兴趣了解一下?

简介: Taskflow 有兴趣了解一下?

最近在工作中会经常使用到Taskflow这个东西,看起来虽然不是很难,但是遇到各种重写的时候看起来还是有点烦的,这时候就必须来了解一下taskflow这样一个东西了。

声明:

请原谅我自己手工作的图!



TaskFlow是OpenStack开源的Python库,它帮助使任务执行变得简单、一致、可伸缩和可靠。它允许创建轻量级任务对象或函数,这些对象或函数以声明的方式组合到Flow中。它包括以一种可以停止、恢复和安全地恢复的方式运行这些Flow的引擎。使用这个库实现的项目可以享受额外的状态弹性、自然的声明式构造、更容易测试(因为任务只做一件事)、工作流可插拔性、容错和简化的崩溃恢复/容错(以及更多)。



简而言之就是:TaskFlow支持创建不同的 task,并以声明的方式集成到一个 flow 中,这些 flow 会通过 engine 执行、停止、继续和恢复。


Taskflow,先有task,再有flow,那我们先来看看task吧!

Task很简单,就是把你想执行的任务放到一个类(这个类要继承自task类)里面的execute方法里就可以了。

具体来说就是:


from taskflow import task
class TaskA(task.Task):
    def execute(self,str,*args,**kwargs):
        print('Hello,I am {0}'.format(str))


Task说完了,创建的Task可以是任何不同的Task,他们既可以是顺序的,也可以是毫无关联的,还可以是微相关的。为什么可以这么做呢?

因为在Flow里,一共提供三种Flow的执行方式来解决task关联性的问题。

例如,我现在要做一件事,我需要先在A表中去查到数据a,之后再依靠a作为条件去B表中查询到b,最后再把a,b处理成结果c,那么这时候就很显然看到我们有三个task,并且这三个task是顺序执行的:

640.png

这个时候我们就需要顺序执行这三个task了,我们使用线性流(linear_flow)


from taskflow.patterns import linear_flow
linear_flow.Flow('linear').add(
    taskA(),
    taskB(),
    taskC()
)


具体实现代码就是这样了。


那如果我们想做的三个task A,B,C没有依赖关系呢?那我们的三个任务是不是就完全可以并行执行?

640.png

回答:当然是!

taskflow里面又为我们提供了一种流叫无序流(unordered_flow)

它可以让这些task并行执行,就像python里面的多线程一样,谁先抢到资源谁就先执行,等到三个都执行完毕了,这个流就结束了。


from taskflow.patterns import unordered_flow
unordered_flow.Flow('linear').add(
    taskA(),
    taskB(),
    taskC()
)


最后还有一种流是图流(graph_flow),官方是这么解释的:

所包含的流/任务将根据它们的依赖关系执行,这些依赖关系将通过使用流/任务提供的和需要的映射来解决,或者通过遵循手动创建的依赖关系链接来解决。


根据依赖关系构建有向图。如果它有边A -> B,这意味着B依赖于A(并且B的执行必须等到A完成执行,而恢复意味着A的恢复必须等到B完成恢复)。

https://docs.openstack.org/taskflow/latest/user/patterns.html#module-taskflow.patterns.graph_flow


这个意思和之前的顺序是不一样的,就是说,比如我有taskA和taskB,在执行A的时候我里面可以会用到一些和B挂钩的事情,执行B的时候也同样,Task A,B之间存在依赖关系,那我们这个时候就可以使用图流了:

640.png


实现:


from taskflow.patterns import graph_flow
graph_flow.Flow('linear').add(
    taskA(),
    taskB()
)



以上就是三种流的执行方式,那么如果在flow的执行过程中,某一条task执行出现错误了,那会怎么办呢?


这个时候就task里面的revert方法就体现出了很厉害的作用了。

这里的机制其实和SQL事务里面的机制是类似的,当我在flow里面的某条task执行有问题的时候,整个flow也出现了问题。这个时候我就全部回滚,让这些task都回到一开始的初始状态。


可以看一下官方文档的例子:


Conceptual example


This pseudocode illustrates what how a flow would work for those who are familiar with SQL transactions.


START TRANSACTION
   task1: call nova API to launch a server || ROLLBACK
   task2: when task1 finished, call cinder API to attach block storage to the server || ROLLBACK
   ...perform other tasks...
COMMIT

The above flow could be used by Heat (for example) as part of an orchestration to add a server with block storage attached. It may launch several of these in parallel to prepare a number of identical servers (or do other work depending on the desired request).



对了,还有个东西忘了讲了,我们有了task,构造了flow,那我们怎么去执行呢?这个时候就需要用到engine方法了,作为启动flow的引擎是绝对不能丢的!


engine里面自带一个run方法,run方法需要穿两个参数,第一个参数是flow,第二个参数就是flow里面的task所需要的参数,默认是以字典的形式传入的。


我们可以来看一个简单的例子:


import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
class TaskA(task.Task):
    def execute(self,str1,*args,**kwargs):
        print('Hello,I am {0}'.format(str1))
class TaskB(task.Task):
    def execute(self,str2,*args,**kwargs):
        print('Hello,I am {0} years old'.format(str2))
if __name__ == "__main__":
    flow = lf.Flow('simple-linear').add(TaskA(), TaskB())
    taskflow.engines.run(flow,store = dict(str1='john', str2="23"))


640.png


最后附上一张官网的工作图:

640.jpg


over!

相关文章
|
23天前
|
人工智能 Ubuntu 数据可视化
【详细教程】如何在Ubuntu上本地部署Dify?
Dify 是一个开源的大语言模型应用开发平台,支持低代码/无代码开发,提供多模型接入、Agent框架、RAG检索增强生成等功能,助力快速构建AI应用。支持本地部署,提供详尽文档与可视化界面,适用于聊天助手、文本生成、自动化任务等多种场景。
488 124
|
数据可视化 关系型数据库 MySQL
【MySQL】MySQL8.0 创建用户及授权 - 看这篇就足够了
本文介绍了在MySQL 8.0+版本中创建和管理用户的详细步骤,包括通过命令行进入MySQL、创建数据库、用户及授权等操作,并提供了具体命令示例。适合初学者参考学习,帮助实现系统的权限管理和安全控制。
6413 3
【MySQL】MySQL8.0 创建用户及授权 - 看这篇就足够了
|
SQL 存储 开发框架
数据库必知词汇:用户定义函数(UDF)
用户定义函数(UDF) 由一个或多个SQL语句组成的子程序,可用于封装代码以便重新使用。通常情况下不将用户限制在定义为SQL语言一部分的内置函数上,而是允许用户创建自己的用户定义函数。
2593 0
|
2月前
|
人工智能 自然语言处理 安全
Milvus x n8n :自动化拆解Github文档,零代码构建领域知识智能问答
本文介绍了在构建特定技术领域问答机器人时面临的四大挑战:知识滞后性、信息幻觉、领域术语理解不足和知识库维护成本高。通过结合Milvus向量数据库和n8n低代码平台,提出了一种高效的解决方案。该方案利用Milvus的高性能向量检索和n8n的工作流编排能力,构建了一个可自动更新、精准回答技术问题的智能问答系统,并介绍了部署过程中的可观测性和安全性实现方法。
|
10月前
|
人工智能 小程序 数据处理
uni-app开发AI康复锻炼小程序,帮助肢体受伤患者康复!
近期,多家康复机构咨询AI运动识别插件是否适用于肢力运动受限患者的康复锻炼。本文介绍该插件在康复锻炼中的应用场景,包括康复运动指导、运动记录、恢复程度记录及过程监测。插件集成了人体检测、姿态识别等功能,支持微信小程序平台,使用便捷,安全可靠,帮助康复治疗更加高效精准。
|
12月前
|
人工智能 算法 C语言
详解树状数组(C/C++)
详解树状数组(C/C++)
|
机器学习/深度学习 自然语言处理 算法框架/工具
"揭秘高性能开源模型服务之谜:SGLang Runtime如何助力智能问答飞越性能瓶颈?"
【8月更文挑战第20天】随着AI技术的发展,开源模型成为行业创新的关键。本文通过一个智能问答系统的案例,展示了SGLang Runtime在优化模型服务性能方面的优势。SGLang Runtime是一款高性能的开源框架,支持多种深度学习框架,具备异构计算能力、简洁API及可扩展性。通过模型转换、部署和服务调用等步骤,并结合性能优化措施如调整批处理大小、模型剪枝和量化,显著提升了服务质量。此案例为开发者提供了实用指南,助力AI技术的有效应用。
489 1
|
12月前
|
数据采集 前端开发 Python
Python pygame 实现游戏 彩色 五子棋 详细注释 附源码 单机版
Python pygame 实现游戏 彩色 五子棋 详细注释 附源码 单机版
235 0
|
监控 Linux 编译器
Linux C++ 定时器任务接口深度解析: 从理论到实践
Linux C++ 定时器任务接口深度解析: 从理论到实践
388 2
|
弹性计算 监控 网络协议
分析网络超时问题的最佳实践
对于云上的用户来说,业务日志里面报超时问题处理起来往往比价棘手,因为1) 问题点可能在云基础设施层,也有可能在业务软件层,需要排查的范围非常广;2) 这类问题往往是不可复现问题,抓到现场比较难。在本文里就分析下如何来分辨和排查这类问题的根本原因。
分析网络超时问题的最佳实践