创建 DAG
接下来我们将创建一个 Airflow 脚本,用于每天早上 8 点执行一个任务:打印 Hello World。
在 Linux 上,我们可以在 crontab 插入一条记录:
使用 SpringBoot,我们可以使用 @Scheduled(cron="0 0 8 * * ?") 来定时执行一个方法。
使用 Airflow,也差不多类似。
前文作者已经讲述了如何使用 Docker 安装 Airflow,安装时,已经将 Airflow 的 DAG 目录进行了映射,即:/Users/xuew/Environment/Airflow/dags,接下来我们只需要在此目录下创建 Python 脚本。
创建一个 hello.py
""" Airflow 的第一个 DAG """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime default_args = { "owner": "xuew", "start_date": datetime(2023, 8, 1) } dag = DAG("HelloWorld", description="第一个 DAG", default_args=default_args, schedule_interval='0 8 * * *') t1 = BashOperator(task_id="hello", bash_command="echo 'Hello World, today is {{ ds }}'", dag=dag)
DAG
表示一个有向无环图,一个任务链,其 ID 全局唯一。DAG 是 Airflow 的核心概念,任务装载到 DAG 中,封装成任务依赖链条。DAG 决定这些任务的执行规则,比如执行时间。这里设置为从 8 月 1 号开始,每天 8 点执行。
TASK
Task 表示具体的一个任务,其 ID 在 DAG 内唯一。Task 有不同的种类,通过各种 Operator 插件来区分任务类型。这里是一个 BashOperator,来自 Airflow 自带的插件,Airflow 自带了很多拆箱即用的插件。
Airflow Operator 是 DAG 中的任务节点,用于定义具体的任务逻辑和操作。
Airflow 提供了多种内置的 Operator,可以根据不同的需求选择适合的 Operator。以下是常用的一些 Airflow Operator:
- BashOperator:执行 Bash 命令或脚本。
- PythonOperator:执行 Python 函数。
- EmailOperator:发送电子邮件。
- SQLOperator:执行 SQL 查询或操作数据库。
- DummyOperator:仅用于创建任务依赖关系,不执行任何实际操作。
- S3FileTransformOperator:在 Amazon S3 上执行文件转换操作。
- HttpOperator:执行 HTTP 请求。
- DockerOperator:运行 Docker 容器中的任务。
- KubernetesPodOperator:在 Kubernetes 集群上运行容器化任务。
除了这些内置的 Operator,Airflow 还支持自定义 Operator,你可以根据具体需求编写自己的 Operator 类,并在 DAG 中使用。
需要根据实际情况选择合适的 Operator 来定义任务的逻辑和操作,以构建完整的 Airflow 工作流程。
ds
Airflow 内置的时间变量模板,在渲染 Operator 的时候,会注入一个当前执行日期的字符串。
部署 DAG
将上述 hello.py 上传到 DAG 目录,Airflow 会自动检测文件变化,然后解析 py 文件,导入 DAG 定义到数据库。
访问 Airflow 的 Web 页面,刷新即可看到刚刚创建的名字为 HelloWorld 的 DAG。
开启 DAG 后点击名称可以查看详情。
这样就是一个基本的 Airflow 任务单元了,这个任务每天 8 点会执行。
调度系统概念
任务定义
定义一个任务的具体内容,比如这里就是打印 Hello World,today is {{ ds }}。
任务实例
任务设定了运行时间,每次运行时会生成一个实例,即 dag-task-executiondate 标记一个任务实例。任务实例和任务当前代表的执行时间绑定,本 Demo 中,每天会生成一个任务实例。
执行日期
今天是 2023-08-20,但我们日志里打印的任务执行日期是 2023-08-19。
执行日期是任务实例运行所代表的任务时间,我们通常叫做 execute-date 或 bizdate,类似 hive 表的的分区。
为什么今天执行的任务,任务的时间变量是昨天呢?
因为任务实例是一个时间段的任务,比如计算每天的访问量,我们只有 19 号这一天过去了才能计算 19 号这一天的的总量,那这个任务最早要 20 号 0 点之后才能计算,计算 19 号 0 点到 20 号 0 点之间的访问量。所以,这个任务时间就代表任务要处理的数据时间,就是 19 号。任务真正执行时间不固定的,可以 20 号,也可以 21 号,只要任务执行计算的数据区间是 19 号就可以了。
因此,调度系统中的 ds(execution date)通常是过去的一个周期,即本周期执行上周期的任务。
任务依赖
最典型的任务模型 etl(Extract & Transformation & Loading,即数据抽取,转换,加载)最少也要分成 3 步。对于每天要统计访问量这个目标来说, 我必须要抽取访问日志,找到访问量的字段,计算累加。这 3 个任务之间有先后顺序,必须前一个执行完毕之后,后一个才可以执行。这叫任务依赖。不同的任务之间的依赖,在 Airflow 里,通过关联任务实现依赖。
还有同一个任务的时间依赖。比如,计算新增用户量。我必须知道前天的数据和昨天的数据,才能计算出增量。那么,这个任务就必须依赖于昨天的任务状态。在 Airflow 里通过设置 depends_on_past 来决定。
任务补录
Airflow 里有个功能叫 backfill,可以执行过去时间的任务。我们把这个操作叫做补录或者补数,为了计算以前没计算的数据。
我们的任务是按时间执行的,今天创建了一个任务,计算每天的用户量,那么明天会跑出今天的数据。这时候,我想知道过去 1 个月每天的用户增量怎么办?
自己写代码,只要查询日期范围的数据,然后分别计算就好。但调度任务是固定的,根据日期去执行的。我们只能创建不同日期的任务实例去执行这些任务。backfill 就是实现这种功能的。
任务重跑
即让跑过的任务再跑一次。有时候,我们的任务需要重跑。比如 etl 任务,今天突然发现昨天抽取的数据任务有问题,少抽取一个 app 的数据,那后面的计算用户量就不准确,我们就需要重新抽取、重新计算。
在 Airflow 里,通过点击任务实例的 clear 按钮,删除这个任务实例,然后调度系统会再次创建并执行这个实例。