使用Airflow在k8s集群上轻松搭建企业级工作流

简介: Apache Airflow 是一个开源工作流管理平台,支持编写、调度与监控复杂任务流。其核心通过代码定义工作流(DAG),结合 Scheduler、Executor、Web Server 等组件实现灵活的任务管理和执行。Airflow 支持容器化部署,如通过 Helm Chart 手动部署或使用阿里云计算巢一键部署,简化运维复杂度。实际使用中,可通过 Git 仓库同步 DAG 文件至 Scheduler,支持任务依赖编排与日志跟踪。示例展示了简单的 Hello World 工作流从代码到运行的全流程,验证了其强大的图形化交互和业务扩展能力。

概述

Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控工作流(Workflows)。它最初由 Airbnb 开发,并于 2016 年捐赠给 Apache 软件基金会。Airflow 的核心理念是通过代码来定义工作流,使得工作流的管理和维护更加灵活和可扩展, github社区地址见链接

整体架构


Apache Airflow的架构主要包括以下核心组件:

  1. Scheduler(调度器):负责根据定义的DAG(Directed Acyclic Graph,有向无环图)图,计划和触发任务的执行。调度器将任务按照依赖关系组织成可执行的工作流程,并将其分发给可用的执行器。
  2. Executor(执行器):执行器负责执行调度器分发的任务。Airflow支持多种执行器,包括本地执行器(SequentialExecutor)、Celery执行器和Dask执行器等。执行器将任务实际执行在相应的工作节点上,并将执行结果返回。
  3. Web Server(Web服务器):提供Web用户界面,用于监控和管理工作流的状态、任务的执行情况、查看日志以及触发任务的手动运行等。通过Web界面,用户可以直观地了解工作流的整体情况。
  4. Database(元数据库):元数据库存储了Airflow的元数据,包括DAG的定义、任务实例的状态、任务执行日志等。这允许用户在不同的任务和工作流之间共享信息,并支持任务的重试、回溯和监控。
  5. Worker(工作节点):执行器通过工作节点在集群或计算资源上执行任务。工作节点可以是单个服务器或集群,具体取决于所选的执行器类型。

容器部署airflow方式

手动部署

手动部署可以参考Airflow官方教程,这里面介绍了怎么通过helm chart进行airflow部署,这里就不再详述了。不过由于docker.io被墙了,部署的时候需要想办法把海外的镜像拉取下来,或者直接部署到海外地域的集群中。

计算巢一键部署

计算巢提供了免费的airflow社区版服务,支持一键部署,既可以部署到已有容器集群,也支持新建容器集群,同时镜像也都使用的阿里云托管的镜像,不会存在镜像拉取不下来的问题,具体部署方式可以查看服务中的部署文档。

使用方式

Dags文件加载到scheduler调度器中

上面在k8s集群上部署好airflow以后,那么怎么运行我们定义好的DAG工作流呢,这里面主要有三种方式:

  1. 配置git仓库同步,从git仓库中加载要运行的Dags文件,这是最推荐的方式,可以很容易的更新要运行的Dags文件,计算巢部署版本默认使用这种方式,需要在Values.yaml中配置对应的git-sync配置。
  2. 在airflow-scheduler pod中对应airflow容器中,直接copy或者写入要执行的Dags文件,这种方式更适合临时测试
  3. 直接将Dags文件放到对应的pvc里,然后挂载到airflow-scheduler pod中对应airflow容器,这种使用起来也不太方便。

下面我们主要介绍第一种方式,使用git仓库去做同步, 我们可以把写好的DAG文件提交到git仓库中,然后airflow-scheduler组件会进行同步, web上就能看到我们定义好的DAG文件,然后点击run按钮就可以运行DAG文件了。在计算巢服务实例部署中,部署的时候需要填入对应的git仓库信息,手动部署的情况下需要手动修改对应的values.yaml,对helm chart做升级部署。

示例演示

下面以一个简单的DAG文件为例,展示如何在airflow中进行运行DAG。

  1. 在git仓库中创建DAG文件,文件名为hello_world_dag.py,里面有三个任务,会依次执行:
  • 打印"Hello"
  • 打印"World"
  • 休眠300秒
import time
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# 定义默认参数
default_args = {
   
    'owner': 'airflow',              # DAG 的所有者
    'start_date': days_ago(1),       # DAG 的开始时间(1 天前)
    'retries': 1,                    # 任务失败时的重试次数
    'retry_delay': timedelta(minutes=5),  # 重试间隔
}

# 定义 DAG 对象
with DAG(
    dag_id='hello_world_dag',        # DAG 的唯一标识符
    default_args=default_args,       # 使用默认参数
    schedule_interval='@daily',      # 每天运行一次
    catchup=False,                   # 是否补跑历史任务
) as dag:

    # 定义第一个任务:打印 "Hello"
    def print_hello():
        print("Hello")

    task_hello = PythonOperator(
        task_id='print_hello',        # 任务的唯一标识符
        python_callable=print_hello,  # 调用的 Python 函数
    )

    // 定义第二个任务:打印 "World"
    def print_world():
        print("World")

    task_world = PythonOperator(
        task_id='print_world',
        python_callable=print_world,
    )

    # 定义一个休眠任务
    def sleep_task():
        print("Task is sleeping for 300 seconds...")
        time.sleep(300)  # 休眠 300 秒
        print("Task woke up!")

    sleep_operator = PythonOperator(
        task_id='sleep_task',
        python_callable=sleep_task,
    )

    # 设置任务依赖关系
    task_hello >> task_world >> sleep_operator
  1. 提交DAG文件到git仓库中,然后去web端查看,可以看到对应的DAG,这个过程会有延时,默认是每10s同步一次。
  2. 执行这个DAG,点击run按钮,点击进行,可以看到执行记录,点击Graph, 可以看到具体执行步骤,
    可以看到print_hello和print_world都已经执行完了,sleep_task还在执行中,这个功能确实很强大。

  3. 点击还在执行中的sleep_task,可以在Logs里看到输出信息,里面输出了会sleep 300秒,可见在正常执行。

总结

通过上面这个示例,可以看出airflow整体功能还是很强大的,并且图形化做的很好,交互能力很强大,可以直接在页面上进行Dag运行,并且可以清楚的看到DAG的执行情况,并且将每一步的执行过程都以图形化的方式显示出来,里面还有执行时间和日志,用来做工作流还是很好用的。
同时Dag支持python直接进行编码,也提供了很强的扩展性,在工作流中可以做各种业务操作,整体来说还是非常强大的。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
消息中间件 存储 监控
五分钟快速了解Airflow工作流
简介 Airflow是一个以编程方式创作、调度和监控工作流的平台。 使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。 当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
|
5月前
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
626 34
|
11月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
存储 监控 Linux
Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
【2月更文挑战第7天】Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
1469 1
|
12月前
|
存储 Linux 持续交付
史上最全 Terraform 入门教程,助你无坑入门!
【10月更文挑战第26天】这是一个全面的 Terraform 入门教程,涵盖了 Terraform 的基本概念、安装步骤、基础配置、变量和输出变量的使用、模块的定义与使用,以及状态管理。通过实例讲解如何创建本地文件资源和 AWS S3 桶,帮助初学者快速上手并掌握 Terraform 的核心功能。
1889 8
|
监控 数据处理 调度
使用Apache Airflow进行工作流编排:技术详解与实践
【6月更文挑战第5天】Apache Airflow是开源的工作流编排平台,用Python定义复杂数据处理管道,提供直观DAGs、强大调度、丰富插件、易扩展性和实时监控。本文深入介绍Airflow基本概念、特性,阐述安装配置、工作流定义、调度监控的步骤,并通过实践案例展示如何构建数据获取、处理到存储的工作流。Airflow简化了复杂数据任务管理,适应不断发展的数据技术需求。
2396 3
|
消息中间件 存储 Apache
Apache Paimon 表模式最佳实践
Apache Paimon 表模式最佳实践
3779 57
|
SQL 存储 监控
构建端到端的开源现代数据平台
构建端到端的开源现代数据平台
617 4