ModelScope 联合呜哩WULI团队重磅开源 Flowra —— FlowBench 背后的核心引擎!
Flowra 是 FlowBench 的核心图执行引擎和节点包开发工具。它为开发者提供了一套完整的工具链,用于创建、测试、调试和发布自定义节点包。借助 Flowra,您可以轻松封装机器学习模型、图像处理算法。
开源地址:
Github:
https://github.com/modelscope/flowra
FlowBench客户端下载地址:
https://modelscope.cn/flowbench/download
Flowra是什么?
一句话概括:魔搭 FlowBench 背后的核心图执行引擎 + 一站式节点包开发工具链。
传统 AI 工作流开发常常面临这些痛点:
| 痛点 | 具体表现 |
| 模型集成繁琐 | 依赖管理混乱,环境配置耗时费力 |
| 多模态数据难统一 | 图像、音频、视频、3D 模型各自为战,缺乏统一处理范式 |
| 调试效率低下 | 缺少可视化工具,问题定位全靠"盲猜" |
| 分布式扩展困难 | 高性能计算门槛高,难以规模化部署 |
Flowra 正是为解决这些问题而生。
核心亮点:从类型系统到分布式执行
1. 完善的类型系统
Flowra 内置了丰富的 组件类型(ComponentType) 和 值类型(ValueType):
- 支持图像(Image)、视频(Video)、音频(Audio)、3D网格(Mesh)等多种多媒体数据类型
- 提供数字输入(Number)、选择器(Select)、占位符(Placeholder)等前端交互组件
- 类型安全,运行时自动验证
这意味着开发者可以在统一的框架下处理各种复杂数据,无需为不同数据类型编写不同的处理逻辑。
class MyNode(Node): class InputArgs(BaseArgs): image: Placeholder(value_type=AnyImage, description="输入图像") threshold: Number(value_type=float, description="阈值", default=0.5) class OutputArgs(BaseArgs): result: Placeholder(value_type=AnyImage, description="处理结果")
2. DAG执行引擎
Flowra 采用 有向无环图(DAG) 组织工作流,具备以下核心特性:
- 智能缓存:节点执行结果自动缓存,避免重复计算
- 并行调度:自动识别可并行执行的节点,最大化资源利用
- 分布式支持:天然支持分布式执行,可轻松扩展到多机环境
- 错误恢复:执行失败时可从断点恢复,无需重新运行整个流程
核心架构采用三层设计:
flowra/ ├── dag/ # DAG执行引擎 │ ├── context/ # 执行上下文 │ ├── plan/ # 逻辑计划/物理计划 │ └── graph.py # 图结构定义 ├── execute/ # 执行器和调度器 │ └── session/ # 会话管理 └── type/ # 类型系
3. 完整的开发工具链
Flowra不仅是执行引擎,更是一套完整的全流程开发工具:
| 命令 | 功能 |
| flowra create <name> | 创建新的节点包项目 |
| flowra project | 管理节点组和节点 |
| flowra build | 构建节点包为.nodebin文件 |
| flowra debug | 本地调试节点 |
| flowra config | 配置开发环境 |
从项目创建到打包发布,全流程覆盖。
4. 无缝集成ModelScope
Flowra内置了对ModelScope的支持,可以一行代码下载和管理AI模型:
- 自动下载模型文件
- 统一的模型管理接口
- 支持模型缓存和版本控制
这意味着开发者可以快速集成千余种开源模型,无需手动处理模型文件。
5. 灵活的存储后端
支持多种对象存储服务,适配不同部署场景:
- 阿里云OSS
- MinIO
- 本地文件系统
大文件、中间结果可以自动上传到云端,节省本地存储空间,同时支持分布式环境下的数据共享。
5分钟上手:从创建到部署
让我们看看如何用Flowra开发一个图像处理节点:
第一步:安装
conda create -n flowra python=3.10 conda activate flowra pip install flowr
第二步:创建项目
flowra create my_image_processor cd my_image_processo
第三步:开发节点
from flowra.dag.node import Node, BaseArgs from flowra.dag.context import RunContext from flowra.type.component_type import Placeholder, Number from flowra.type.value_type import AnyImage class ImageFilterNode(Node): class InitArgs(BaseArgs): model_path: str class InputArgs(InitArgs): image: Placeholder(value_type=AnyImage) strength: Number(value_type=float, default=1.0) class OutputArgs(BaseArgs): filtered: Placeholder(value_type=AnyImage) def init(self, ctx: RunContext, args: InitArgs): # 加载模型(只在节点初始化时执行一次) self.model = load_model(args.model_path) def run(self, ctx: RunContext, args: InputArgs) -> OutputArgs: # 执行推理 result = self.model.apply(args.image, args.strength) return self.OutputArgs(filtered=result)
第四步:构建和部署
flowra build
构建成功后,在`dist/`目录下会生成`.nodebin`文件,可以直接导入FlowBench客户端使用。
实战案例:YOLO目标检测工作流
Flowra 官方提供了完整的示例代码,其中包括一个基于 YOLO 的目标检测工作流。整个流程只需四步:
第一步:图像输入节点 —— 导入待检测的图像
第二步:YOLO 检测节点 —— 调用 ModelScope 模型进行目标检测
第三步:结果可视化节点 —— 在图像上标注检测框和类别
第四步:图像展示节点 —— 输出最终结果
整个流程通过可视化 DAG 编排,所见即所得。同时支持批量处理、参数调优等高级功能,轻松应对复杂场景。
技术细节:智能缓存与分布式调度
节点缓存机制
重复计算是工作流效率的大敌。Flowra 采用基于 SHA256 的智能缓存机制:
@property def sha256(self) -> str: return hashlib.sha256( json.dumps(self.model_dump(mode="json"), sort_keys=True).encode() ).hexdigest(
当节点的输入参数完全相同时,直接返回缓存结果,跳过重复计算。对于包含大量重复子任务的工作流,这一机制能带来大幅效率提升。
分布式调度
Flowra 的调度器(Scheduler)为大规模任务而生,具备以下能力:
多 Worker 并行执行 —— 充分利用多核/多机算力
节点级别资源隔离 —— 不同节点互不干扰,稳定可控
动态任务分配 —— 根据负载智能调度,避免资源闲置
故障自动重试 —— 单点失败不影响全局,自动恢复执行
无论是单机调试还是集群部署,Flowra 都能平滑切换,开发体验一致。
写在最后
AI 工作流的开发,不应该是少数人的专利。
Flowra 通过简洁的 API、完善的工具链和高性能的执行引擎,让每一位开发者都能快速构建自己的 AI 应用。
无论你是想:
快速验证一个 AI 想法 —— Flowra 帮你从零到一,几行代码跑通原型
构建复杂的多模态处理管道 —— 图像、视频、音频、3D,统一框架轻松搞定
部署生产级的 AI 服务 —— 分布式调度、智能缓存、断点恢复,稳定可靠
Flowra 都能提供强有力的支持。
现在就开始,用 Flowra 构建你的第一个 AI 工作流吧!