使用Airflow在k8s集群上轻松搭建企业级工作流

简介: Apache Airflow 是一个开源工作流管理平台,支持编写、调度与监控复杂任务流。其核心通过代码定义工作流(DAG),结合 Scheduler、Executor、Web Server 等组件实现灵活的任务管理和执行。Airflow 支持容器化部署,如通过 Helm Chart 手动部署或使用阿里云计算巢一键部署,简化运维复杂度。实际使用中,可通过 Git 仓库同步 DAG 文件至 Scheduler,支持任务依赖编排与日志跟踪。示例展示了简单的 Hello World 工作流从代码到运行的全流程,验证了其强大的图形化交互和业务扩展能力。

概述

Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控工作流(Workflows)。它最初由 Airbnb 开发,并于 2016 年捐赠给 Apache 软件基金会。Airflow 的核心理念是通过代码来定义工作流,使得工作流的管理和维护更加灵活和可扩展, github社区地址见链接

整体架构


Apache Airflow的架构主要包括以下核心组件:

  1. Scheduler(调度器):负责根据定义的DAG(Directed Acyclic Graph,有向无环图)图,计划和触发任务的执行。调度器将任务按照依赖关系组织成可执行的工作流程,并将其分发给可用的执行器。
  2. Executor(执行器):执行器负责执行调度器分发的任务。Airflow支持多种执行器,包括本地执行器(SequentialExecutor)、Celery执行器和Dask执行器等。执行器将任务实际执行在相应的工作节点上,并将执行结果返回。
  3. Web Server(Web服务器):提供Web用户界面,用于监控和管理工作流的状态、任务的执行情况、查看日志以及触发任务的手动运行等。通过Web界面,用户可以直观地了解工作流的整体情况。
  4. Database(元数据库):元数据库存储了Airflow的元数据,包括DAG的定义、任务实例的状态、任务执行日志等。这允许用户在不同的任务和工作流之间共享信息,并支持任务的重试、回溯和监控。
  5. Worker(工作节点):执行器通过工作节点在集群或计算资源上执行任务。工作节点可以是单个服务器或集群,具体取决于所选的执行器类型。

容器部署airflow方式

手动部署

手动部署可以参考Airflow官方教程,这里面介绍了怎么通过helm chart进行airflow部署,这里就不再详述了。不过由于docker.io被墙了,部署的时候需要想办法把海外的镜像拉取下来,或者直接部署到海外地域的集群中。

计算巢一键部署

计算巢提供了免费的airflow社区版服务,支持一键部署,既可以部署到已有容器集群,也支持新建容器集群,同时镜像也都使用的阿里云托管的镜像,不会存在镜像拉取不下来的问题,具体部署方式可以查看服务中的部署文档。

使用方式

Dags文件加载到scheduler调度器中

上面在k8s集群上部署好airflow以后,那么怎么运行我们定义好的DAG工作流呢,这里面主要有三种方式:

  1. 配置git仓库同步,从git仓库中加载要运行的Dags文件,这是最推荐的方式,可以很容易的更新要运行的Dags文件,计算巢部署版本默认使用这种方式,需要在Values.yaml中配置对应的git-sync配置。
  2. 在airflow-scheduler pod中对应airflow容器中,直接copy或者写入要执行的Dags文件,这种方式更适合临时测试
  3. 直接将Dags文件放到对应的pvc里,然后挂载到airflow-scheduler pod中对应airflow容器,这种使用起来也不太方便。

下面我们主要介绍第一种方式,使用git仓库去做同步, 我们可以把写好的DAG文件提交到git仓库中,然后airflow-scheduler组件会进行同步, web上就能看到我们定义好的DAG文件,然后点击run按钮就可以运行DAG文件了。在计算巢服务实例部署中,部署的时候需要填入对应的git仓库信息,手动部署的情况下需要手动修改对应的values.yaml,对helm chart做升级部署。

示例演示

下面以一个简单的DAG文件为例,展示如何在airflow中进行运行DAG。

  1. 在git仓库中创建DAG文件,文件名为hello_world_dag.py,里面有三个任务,会依次执行:
  • 打印"Hello"
  • 打印"World"
  • 休眠300秒
import time
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# 定义默认参数
default_args = {
   
    'owner': 'airflow',              # DAG 的所有者
    'start_date': days_ago(1),       # DAG 的开始时间(1 天前)
    'retries': 1,                    # 任务失败时的重试次数
    'retry_delay': timedelta(minutes=5),  # 重试间隔
}

# 定义 DAG 对象
with DAG(
    dag_id='hello_world_dag',        # DAG 的唯一标识符
    default_args=default_args,       # 使用默认参数
    schedule_interval='@daily',      # 每天运行一次
    catchup=False,                   # 是否补跑历史任务
) as dag:

    # 定义第一个任务:打印 "Hello"
    def print_hello():
        print("Hello")

    task_hello = PythonOperator(
        task_id='print_hello',        # 任务的唯一标识符
        python_callable=print_hello,  # 调用的 Python 函数
    )

    // 定义第二个任务:打印 "World"
    def print_world():
        print("World")

    task_world = PythonOperator(
        task_id='print_world',
        python_callable=print_world,
    )

    # 定义一个休眠任务
    def sleep_task():
        print("Task is sleeping for 300 seconds...")
        time.sleep(300)  # 休眠 300 秒
        print("Task woke up!")

    sleep_operator = PythonOperator(
        task_id='sleep_task',
        python_callable=sleep_task,
    )

    # 设置任务依赖关系
    task_hello >> task_world >> sleep_operator
AI 代码解读
  1. 提交DAG文件到git仓库中,然后去web端查看,可以看到对应的DAG,这个过程会有延时,默认是每10s同步一次。
  2. 执行这个DAG,点击run按钮,点击进行,可以看到执行记录,点击Graph, 可以看到具体执行步骤,
    可以看到print_hello和print_world都已经执行完了,sleep_task还在执行中,这个功能确实很强大。

  3. 点击还在执行中的sleep_task,可以在Logs里看到输出信息,里面输出了会sleep 300秒,可见在正常执行。

总结

通过上面这个示例,可以看出airflow整体功能还是很强大的,并且图形化做的很好,交互能力很强大,可以直接在页面上进行Dag运行,并且可以清楚的看到DAG的执行情况,并且将每一步的执行过程都以图形化的方式显示出来,里面还有执行时间和日志,用来做工作流还是很好用的。
同时Dag支持python直接进行编码,也提供了很强的扩展性,在工作流中可以做各种业务操作,整体来说还是非常强大的。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
打赏
0
8
8
2
24
分享
相关文章
Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
【2月更文挑战第6天】Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
856 0
新一代 Cron-Job分布式任务调度平台 部署指南
简单易用、超低延迟,支持用户权限管理、多语言客户端和多租户接入的分布式任务调度平台。 支持任何Cron表达式的任务调度,支持常用的分片和随机策略;支持失败丢弃、失败重试的失败策略;支持动态任务参数。
133 29
基于HPC场景的集群任务调度系统LSF/SGE/Slurm/PBS
在HPC场景中,集群任务调度系统是资源管理和作业调度的核心工具。LSF、SGE、Slurm和PBS是主流调度系统。LSF适合大规模企业级集群,提供高可靠性和混合云支持;SGE为经典开源系统,适用于中小规模集群;Slurm成为HPC领域事实标准,支持多架构和容器化;PBS兼具商业和开源版本,擅长拓扑感知调度。选型建议:超大规模科研用Slurm,企业生产环境用LSF/PBS Pro,混合云需求选LSF/PBS Pro,传统小型集群用SGE/Slurm。当前趋势显示Slurm在TOP500系统中占比超60%,而商业系统在金融、制造等领域保持优势。
533 32
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
使用Apache Airflow进行工作流编排:技术详解与实践
【6月更文挑战第5天】Apache Airflow是开源的工作流编排平台,用Python定义复杂数据处理管道,提供直观DAGs、强大调度、丰富插件、易扩展性和实时监控。本文深入介绍Airflow基本概念、特性,阐述安装配置、工作流定义、调度监控的步骤,并通过实践案例展示如何构建数据获取、处理到存储的工作流。Airflow简化了复杂数据任务管理,适应不断发展的数据技术需求。
1720 3
面试分享:Airflow工作流调度系统架构与使用指南
【4月更文挑战第10天】Apache Airflow是关键的工作流调度系统,本文结合面试经验,深入探讨其核心架构和使用技巧。重点包括:1) Airflow的Scheduler、Web Server、Worker和Metadata Database组件;2) DAG、Task和Operator的概念;3) DAG编写、调度及错误处理策略;4) 监控与扩展性,如自定义Operator和最佳实践。通过学习,助你在面试中应对Airflow相关问题,并提升实际工作中的数据工程能力。
757 5
使用Airflow管理大数据工作流:自动化任务调度与依赖
【4月更文挑战第8天】Apache Airflow是一款开源的工作流管理工具,用于高效组织和调度大数据任务。它基于DAG(有向无环图)定义任务依赖,通过Operators(如BashOperator、PythonOperator)执行不同工作,并通过Scheduler和Executor协调任务执行。Web UI提供监控界面,Metadata DB存储元数据。创建DAG涉及定义DAG属性、Task及依赖关系,然后部署到Airflow环境。进阶功能包括Variables和Connections管理、XCom跨Task通信、自定义Operator及Plugin、高级调度与告警设置。
812 0
如何在云平台创建一个 FC——用 Serverless 协调工作流
如何在云平台创建一个 FC——用 Serverless 协调工作流自制脑图
176 0
如何在云平台创建一个 FC——用 Serverless 协调工作流
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等