简介
Airflow是一个以编程方式创作、调度和监控工作流的平台。
使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。
当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
主要特点
- 动态的:Airflow工作流是使用代码(Python)的形式进行配置,允许动态工作流(DAG)生成。并允许编写动态实例化工作流的代码。
- 可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
- 优雅的:设计简洁优雅。使用强大的 Jinja 模板引擎将参数化脚本内置到 Airflow 的核心中。
- 可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。
架构
Airflow 通常由以下组件组成:
- 一个调度器:它处理触发工作流调度,并将任务提交给执行器运行。
- 一个执行器:它处理正在运行的任务。在默认的 Airflow 安装中,它运行调度器内的所有内容,但大多数适合生产的执行器实际上将任务执行推送给workers。
- 一个WEB服务器:它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的运行情况。
- 一个包含DAG文件的文件夹:由调度器和执行器(以及执行程序拥有的任何workers)读取
- 一个元数据数据库:供调度器、执行器和WEB服务器用来存储状态
Airflow安装及初始化
安装Airflow
# AirFlow 需要一个HOME目录,默认是~/airflow目录,你也可以设置到其他地方 export AIRFLOW_HOME=~/airflow # 安装依赖库 AIRFLOW_VERSION=2.1.2 PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)" # For example: 3.6 CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" # For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.6.txt pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" 复制代码
初始化数据库,并创建用户
# 初始化数据库 airflow db init # 创建用户和密码 airflow users create \ --username admin \ --firstname Peter \ --lastname Parker \ --role Admin \ --email spiderman@superhero.org 复制代码
启动WEB服务及调度器
# 启动 web 服务,默认端口是 8080 airflow webserver --port 8080 # 启动调度器 airflow scheduler # 在浏览器中浏览 10.247.128.69:8080,并在 home 页开启 example dag 复制代码
运行官网Demo
# 运行第一个任务实例 # run your first task instance airflow tasks run example_bash_operator runme_0 2015-01-01 # 运行两天的任务回填 # run a backfill over 2 days airflow dags backfill example_bash_operator \ --start-date 2015-01-01 \ --end-date 2015-01-02 复制代码
示例
定义工作流
~/airflow/dags/tutorial.py
from datetime import timedelta from textwrap import dedent # The DAG object; we'll need this to instantiate a DAG from airflow import DAG # Operators; we need this to operate! from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago # 定义默认参数 # 这些参数会传递给每个operator # 您可以在operator初始化期间基于每个任务重写它们 default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' } # 实例化一个DAG # 我们需要一个 DAG 对象来嵌套我们的任务。 # 在这里,我们传递一个定义 dag_id 的字符串,该字符串用作 DAG 的唯一标识符。 # 我们还传递了刚刚定义的默认参数字典, # 并为 DAG 定义了调度间隔时间(schedule_interval )为1天 。 with DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), start_date=days_ago(2), tags=['example'], ) as dag: # 实例化Operator对象时会生成任务。 # 从Operator实例化的对象称为任务。 第一个参数task_id充当任务的唯一标识符。 # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', ) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, ) # 添加工作流和任务的问题 t1.doc_md = dedent( """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """ ) dag.doc_md = __doc__ # 前提是您在DAG开始时有一个文档字符串 dag.doc_md = """ This is a documentation placed anywhere """ # 否则, type it like this # Jinja模板 templated_command = dedent( """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ ) t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, ) # 设置任务依赖 t1 >> [t2, t3] 复制代码
注意: 在执行您的脚本时,Airflow如果在您的DAG中发现循环或多次引用依赖项时,抛出异常。
测试
运行脚本
首先,让我们确保成功解析工作流。
python ~/airflow/dags/tutorial.py 复制代码
如果脚本没有抛出异常,则意味着您没有做任何极严重的错误,并且您的 Airflow 环境看起来是完好的。
命令行元数据验证
# initialize the database tables airflow db init # print the list of active DAGs airflow dags list # prints the list of tasks in the "tutorial" DAG airflow tasks list tutorial # prints the hierarchy of tasks in the "tutorial" DAG airflow tasks list tutorial --tree 复制代码
测试任务及Jinja模板任务
让我们通过运行特定日期的实际任务实例来进行测试。在上下文中,通过被称为execution_date的字段指定日期。这是逻辑日期,它模拟调度程序在特定日期和时间运行您的任务或 dag,使它现在(或在满足其依赖关系时)实际运行。
# command layout: command subcommand dag_id task_id date # testing print_date airflow tasks test tutorial print_date 2015-06-01 # testing sleep airflow tasks test tutorial sleep 2015-06-01 复制代码
# testing templated # 显示详细的事件日志,最终您的bash命令行运行并打印结果 airflow tasks test tutorial templated 2015-06-01 复制代码
注意:airflow任务测试命令在本地运行任务实例,将它们的日志输出到标准输出(在屏幕上),不影响依赖关系,并且不会将状态(运行、成功、失败……)传递给数据库。 它只是进行测试单个任务实例。
这同样适用于在 DAG 级别上airflow dags test [dag_id] [execution_date]
。 它对给定的 DAG id 执行一次 DAG 运行。 虽然它确实考虑了任务依赖性,但没有在数据库中注册状态。 考虑到这一点,在本地测试 DAG 的完整运行很方便。 如果您的DAG中一项任务需要某个位置的数据,则该数据是可获得的。
回填
回填将按照您的依赖关系,将日志发送到文件中并与数据库交互以记录状态。
如果您有一个web服务,您将能够跟踪进度。如果您有兴趣在回填过程中直观地跟踪进度,airflow webserver将启动一个web服务。
# 可选的, 在后台以Debug模式启动一个WEB服务 # airflow webserver --debug & # start your backfill on a date range airflow dags backfill tutorial \ --start-date 2015-06-01 \ --end-date 2015-06-07 复制代码
注意:
如果您使用depends_on_past=True,则单个任务实例将取决于其前一个任务实例(即根据 execution_date 的前一个)的成功。具有 execution_date==start_date 的任务实例将忽略此依赖性,因为不会为它们创建过去的任务实例。
在使用depends_on_past=True 时,您可能还需要考虑wait_for_downstream=True。虽然depends_on_past=True 导致任务实例依赖于其前一个任务实例的成功,但wait_for_downstream=True 将导致任务实例也等待前一个任务实例下游的所有任务实例成功。
总结
Apache Airflow 允许一个工作流的task在多台worker上同时执行;并以有向无环图的方式构建任务依赖关系;同时,工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务。
总而言之,Apache Airflow 既是最受欢迎的工作流工具,也是功能最广泛的工作流工具。