五分钟快速了解Airflow工作流

简介: 简介Airflow是一个以编程方式创作、调度和监控工作流的平台。使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。

简介


Airflow是一个以编程方式创作、调度和监控工作流的平台。

使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。

当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。

网络异常,图片无法展示
|


主要特点


  • 动态的:Airflow工作流是使用代码(Python)的形式进行配置,允许动态工作流(DAG)生成。并允许编写动态实例化工作流的代码。
  • 可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
  • 优雅的:设计简洁优雅。使用强大的 Jinja 模板引擎将参数化脚本内置到 Airflow 的核心中。
  • 可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。


架构


网络异常,图片无法展示
|


Airflow 通常由以下组件组成:

  • 一个调度器:它处理触发工作流调度,并将任务提交给执行器运行。
  • 一个执行器:它处理正在运行的任务。在默认的 Airflow 安装中,它运行调度器内的所有内容,但大多数适合生产的执行器实际上将任务执行推送给workers。
  • 一个WEB服务器:它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的运行情况。
  • 一个包含DAG文件的文件夹:由调度器和执行器(以及执行程序拥有的任何workers)读取
  • 一个元数据数据库:供调度器、执行器和WEB服务器用来存储状态


Airflow安装及初始化


安装Airflow

# AirFlow 需要一个HOME目录,默认是~/airflow目录,你也可以设置到其他地方
export AIRFLOW_HOME=~/airflow
# 安装依赖库
AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.6.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
复制代码

初始化数据库,并创建用户

# 初始化数据库
airflow db init
# 创建用户和密码
airflow users create \
 --username admin \
 --firstname Peter \
 --lastname Parker \
 --role Admin \
 --email spiderman@superhero.org
复制代码


启动WEB服务及调度器

# 启动 web 服务,默认端口是 8080
airflow webserver --port 8080
# 启动调度器
airflow scheduler
# 在浏览器中浏览 10.247.128.69:8080,并在 home 页开启 example dag
复制代码


运行官网Demo

# 运行第一个任务实例
# run your first task instance
airflow tasks run example_bash_operator runme_0 2015-01-01
# 运行两天的任务回填
# run a backfill over 2 days
airflow dags backfill example_bash_operator \
 --start-date 2015-01-01 \
 --end-date 2015-01-02
复制代码


示例


定义工作流

~/airflow/dags/tutorial.py

from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# 定义默认参数
# 这些参数会传递给每个operator
# 您可以在operator初始化期间基于每个任务重写它们
default_args = {
 'owner': 'airflow',
 'depends_on_past': False,
 'email': ['airflow@example.com'],
 'email_on_failure': False,
 'email_on_retry': False,
 'retries': 1,
 'retry_delay': timedelta(minutes=5),
 # 'queue': 'bash_queue',
 # 'pool': 'backfill',
 # 'priority_weight': 10,
 # 'end_date': datetime(2016, 1, 1),
 # 'wait_for_downstream': False,
 # 'dag': dag,
 # 'sla': timedelta(hours=2),
 # 'execution_timeout': timedelta(seconds=300),
 # 'on_failure_callback': some_function,
 # 'on_success_callback': some_other_function,
 # 'on_retry_callback': another_function,
 # 'sla_miss_callback': yet_another_function,
 # 'trigger_rule': 'all_success'
}
# 实例化一个DAG
# 我们需要一个 DAG 对象来嵌套我们的任务。 
# 在这里,我们传递一个定义 dag_id 的字符串,该字符串用作 DAG 的唯一标识符。 
# 我们还传递了刚刚定义的默认参数字典,
# 并为 DAG 定义了调度间隔时间(schedule_interval )为1天 。
with DAG(
 'tutorial',
 default_args=default_args,
 description='A simple tutorial DAG',
 schedule_interval=timedelta(days=1),
 start_date=days_ago(2),
 tags=['example'],
) as dag:
 # 实例化Operator对象时会生成任务。 
 # 从Operator实例化的对象称为任务。 第一个参数task_id充当任务的唯一标识符。
 # t1, t2 and t3 are examples of tasks created by instantiating operators
 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
 )
 t2 = BashOperator(
     task_id='sleep',
     depends_on_past=False,
     bash_command='sleep 5',
     retries=3,
 )
 # 添加工作流和任务的问题
 t1.doc_md = dedent(
     """\
 #### Task Documentation
 You can document your task using the attributes `doc_md` (markdown),
 `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
 rendered in the UI's Task Instance Details page.
 ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
 """
 )
 dag.doc_md = __doc__  # 前提是您在DAG开始时有一个文档字符串
 dag.doc_md = """
 This is a documentation placed anywhere
 """     # 否则, type it like this
 # Jinja模板
 templated_command = dedent(
     """
 {% for i in range(5) %}
     echo "{{ ds }}"
     echo "{{ macros.ds_add(ds, 7)}}"
     echo "{{ params.my_param }}"
 {% endfor %}
 """
 )
 t3 = BashOperator(
     task_id='templated',
     depends_on_past=False,
     bash_command=templated_command,
     params={'my_param': 'Parameter I passed in'},
 )
 # 设置任务依赖
 t1 >> [t2, t3]
复制代码


注意: 在执行您的脚本时,Airflow如果在您的DAG中发现循环或多次引用依赖项时,抛出异常。

测试

运行脚本

首先,让我们确保成功解析工作流。

python ~/airflow/dags/tutorial.py
复制代码


如果脚本没有抛出异常,则意味着您没有做任何极严重的错误,并且您的 Airflow 环境看起来是完好的。

命令行元数据验证

# initialize the database tables
airflow db init
# print the list of active DAGs
airflow dags list
# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
复制代码


测试任务及Jinja模板任务

让我们通过运行特定日期的实际任务实例来进行测试。在上下文中,通过被称为execution_date的字段指定日期。这是逻辑日期,它模拟调度程序在特定日期和时间运行您的任务或 dag,使它现在(或在满足其依赖关系时)实际运行。

# command layout: command subcommand dag_id task_id date
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
复制代码


# testing templated
# 显示详细的事件日志,最终您的bash命令行运行并打印结果
airflow tasks test tutorial templated 2015-06-01
复制代码


注意:airflow任务测试命令在本地运行任务实例,将它们的日志输出到标准输出(在屏幕上),不影响依赖关系,并且不会将状态(运行、成功、失败……)传递给数据库。 它只是进行测试单个任务实例。

这同样适用于在 DAG 级别上airflow dags test [dag_id] [execution_date]。 它对给定的 DAG id 执行一次 DAG 运行。 虽然它确实考虑了任务依赖性,但没有在数据库中注册状态。 考虑到这一点,在本地测试 DAG 的完整运行很方便。 如果您的DAG中一项任务需要某个位置的数据,则该数据是可获得的。

回填

回填将按照您的依赖关系,将日志发送到文件中并与数据库交互以记录状态。

如果您有一个web服务,您将能够跟踪进度。如果您有兴趣在回填过程中直观地跟踪进度,airflow webserver将启动一个web服务。

# 可选的, 在后台以Debug模式启动一个WEB服务
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial \
 --start-date 2015-06-01 \
 --end-date 2015-06-07
复制代码


注意

如果您使用depends_on_past=True,则单个任务实例将取决于其前一个任务实例(即根据 execution_date 的前一个)的成功。具有 execution_date==start_date 的任务实例将忽略此依赖性,因为不会为它们创建过去的任务实例。

在使用depends_on_past=True 时,您可能还需要考虑wait_for_downstream=True。虽然depends_on_past=True 导致任务实例依赖于其前一个任务实例的成功,但wait_for_downstream=True 将导致任务实例也等待前一个任务实例下游的所有任务实例成功。

总结


Apache Airflow 允许一个工作流的task在多台worker上同时执行;并以有向无环图的方式构建任务依赖关系;同时,工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务。

总而言之,Apache Airflow 既是最受欢迎的工作流工具,也是功能最广泛的工作流工具。

相关文章
|
Kubernetes 调度 开发工具
使用Airflow在k8s集群上轻松搭建企业级工作流
Apache Airflow 是一个开源工作流管理平台,支持编写、调度与监控复杂任务流。其核心通过代码定义工作流(DAG),结合 Scheduler、Executor、Web Server 等组件实现灵活的任务管理和执行。Airflow 支持容器化部署,如通过 Helm Chart 手动部署或使用阿里云计算巢一键部署,简化运维复杂度。实际使用中,可通过 Git 仓库同步 DAG 文件至 Scheduler,支持任务依赖编排与日志跟踪。示例展示了简单的 Hello World 工作流从代码到运行的全流程,验证了其强大的图形化交互和业务扩展能力。
|
canal 分布式计算 Hadoop
canal针对分库分表场景的高可用架构设计与应用
canal针对分库分表场景的高可用架构设计与应用
|
存储 Kubernetes Linux
helm 简介及基本使用
helm 简介及基本使用
4098 0
helm 简介及基本使用
|
Linux
阿里云官方yum源
阿里云官方yum源
75336 0
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
1323 34
|
6月前
|
数据采集 分布式计算 DataWorks
大数据平台架构:MaxCompute+DataWorks
本文详解基于MaxCompute与DataWorks的大数据平台架构,涵盖数据湖、仓库与应用三位一体的体系,深入解析数据集成、开发、调度、质量管控与服务全链路能力,并结合用户行为分析实战案例,展现高效、稳定的数据平台构建方法,助力企业释放数据价值,推动数字化转型。(238字)
328 0
|
8月前
|
数据采集 人工智能 自然语言处理
重磅干货|《AI时代数据治理白皮书》正式发布!
AI时代,数据质量决定智能上限。阿里巴巴Dataphin推出《AI时代数据治理白皮书》,提出“好数据×好知识=真智能”,详解面向AI的四层治理体系,揭示如何以高质量数据驱动智能化转型,助力企业构建核心竞争力。
1500 0
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
负载均衡 安全 网络协议
Nginx四层负载均衡详解
Nginx四层负载均衡就是实现通过访问某个ip的端口转发至对应的服务器上,如图当访问10.0.0.5的5555端口就会跳转至web服务器172.1.16.7的22号端口,当访问10.0.0.5的6666端口就会转发到mysql服务器的3306端口,高效的保护了内网的安全。 为什么企业不再使用lvs而选择使用Nginx做负载 1.Nginx既支持四层又支持七层 2.很多企业使用云平台,但是云平台网络环境不支持lvs 3.都是用Nginx方便统一管理
2273 0
Nginx四层负载均衡详解