概述
Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控工作流(Workflows)。它最初由 Airbnb 开发,并于 2016 年捐赠给 Apache 软件基金会。Airflow 的核心理念是通过代码来定义工作流,使得工作流的管理和维护更加灵活和可扩展, github社区地址见链接。
整体架构
Apache Airflow的架构主要包括以下核心组件:
- Scheduler(调度器):负责根据定义的DAG(Directed Acyclic Graph,有向无环图)图,计划和触发任务的执行。调度器将任务按照依赖关系组织成可执行的工作流程,并将其分发给可用的执行器。
- Executor(执行器):执行器负责执行调度器分发的任务。Airflow支持多种执行器,包括本地执行器(SequentialExecutor)、Celery执行器和Dask执行器等。执行器将任务实际执行在相应的工作节点上,并将执行结果返回。
- Web Server(Web服务器):提供Web用户界面,用于监控和管理工作流的状态、任务的执行情况、查看日志以及触发任务的手动运行等。通过Web界面,用户可以直观地了解工作流的整体情况。
- Database(元数据库):元数据库存储了Airflow的元数据,包括DAG的定义、任务实例的状态、任务执行日志等。这允许用户在不同的任务和工作流之间共享信息,并支持任务的重试、回溯和监控。
- Worker(工作节点):执行器通过工作节点在集群或计算资源上执行任务。工作节点可以是单个服务器或集群,具体取决于所选的执行器类型。
容器部署airflow方式
手动部署
手动部署可以参考Airflow官方教程,这里面介绍了怎么通过helm chart进行airflow部署,这里就不再详述了。不过由于docker.io被墙了,部署的时候需要想办法把海外的镜像拉取下来,或者直接部署到海外地域的集群中。
计算巢一键部署
计算巢提供了免费的airflow社区版服务,支持一键部署,既可以部署到已有容器集群,也支持新建容器集群,同时镜像也都使用的阿里云托管的镜像,不会存在镜像拉取不下来的问题,具体部署方式可以查看服务中的部署文档。
使用方式
Dags文件加载到scheduler调度器中
上面在k8s集群上部署好airflow以后,那么怎么运行我们定义好的DAG工作流呢,这里面主要有三种方式:
- 配置git仓库同步,从git仓库中加载要运行的Dags文件,这是最推荐的方式,可以很容易的更新要运行的Dags文件,计算巢部署版本默认使用这种方式,需要在Values.yaml中配置对应的git-sync配置。
- 在airflow-scheduler pod中对应airflow容器中,直接copy或者写入要执行的Dags文件,这种方式更适合临时测试
- 直接将Dags文件放到对应的pvc里,然后挂载到airflow-scheduler pod中对应airflow容器,这种使用起来也不太方便。
下面我们主要介绍第一种方式,使用git仓库去做同步, 我们可以把写好的DAG文件提交到git仓库中,然后airflow-scheduler组件会进行同步, web上就能看到我们定义好的DAG文件,然后点击run按钮就可以运行DAG文件了。在计算巢服务实例部署中,部署的时候需要填入对应的git仓库信息,手动部署的情况下需要手动修改对应的values.yaml,对helm chart做升级部署。
示例演示
下面以一个简单的DAG文件为例,展示如何在airflow中进行运行DAG。
- 在git仓库中创建DAG文件,文件名为
hello_world_dag.py
,里面有三个任务,会依次执行:
- 打印"Hello"
- 打印"World"
- 休眠300秒
import time
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# 定义默认参数
default_args = {
'owner': 'airflow', # DAG 的所有者
'start_date': days_ago(1), # DAG 的开始时间(1 天前)
'retries': 1, # 任务失败时的重试次数
'retry_delay': timedelta(minutes=5), # 重试间隔
}
# 定义 DAG 对象
with DAG(
dag_id='hello_world_dag', # DAG 的唯一标识符
default_args=default_args, # 使用默认参数
schedule_interval='@daily', # 每天运行一次
catchup=False, # 是否补跑历史任务
) as dag:
# 定义第一个任务:打印 "Hello"
def print_hello():
print("Hello")
task_hello = PythonOperator(
task_id='print_hello', # 任务的唯一标识符
python_callable=print_hello, # 调用的 Python 函数
)
// 定义第二个任务:打印 "World"
def print_world():
print("World")
task_world = PythonOperator(
task_id='print_world',
python_callable=print_world,
)
# 定义一个休眠任务
def sleep_task():
print("Task is sleeping for 300 seconds...")
time.sleep(300) # 休眠 300 秒
print("Task woke up!")
sleep_operator = PythonOperator(
task_id='sleep_task',
python_callable=sleep_task,
)
# 设置任务依赖关系
task_hello >> task_world >> sleep_operator
AI 代码解读
- 提交DAG文件到git仓库中,然后去web端查看,可以看到对应的DAG,这个过程会有延时,默认是每10s同步一次。
- 执行这个DAG,点击run按钮,点击进行,可以看到执行记录,点击Graph, 可以看到具体执行步骤,
可以看到print_hello和print_world都已经执行完了,sleep_task还在执行中,这个功能确实很强大。 - 点击还在执行中的sleep_task,可以在Logs里看到输出信息,里面输出了会sleep 300秒,可见在正常执行。
总结
通过上面这个示例,可以看出airflow整体功能还是很强大的,并且图形化做的很好,交互能力很强大,可以直接在页面上进行Dag运行,并且可以清楚的看到DAG的执行情况,并且将每一步的执行过程都以图形化的方式显示出来,里面还有执行时间和日志,用来做工作流还是很好用的。
同时Dag支持python直接进行编码,也提供了很强的扩展性,在工作流中可以做各种业务操作,整体来说还是非常强大的。