[AirFlow]AirFlow使用指南四 DAG Operator Task

简介: 1. DAG在Airflow中,DAG或有向无环图是你运行所有任务的集合,以某种组织方式来反映所有任务之间的关系和依赖。

1. DAG

在Airflow中,DAG或有向无环图是你运行所有任务的集合,以某种组织方式来反映所有任务之间的关系和依赖。

例如,一个简单的DAG可以包括三个任务:A,B和C.可以说A必须在B运行之前成功运行,但C可以随时运行。 可以说任务A在5分钟后超时,为防止失败,B可以最多重启5次。也可以说工作流从某个特定日期开始每晚10点运行。

以这种方式,DAG描述了你如何执行工作流程; 但是请注意,我们还没有说出我们实际想要做的事情! A,B和C可以是任何东西。也许在C发送电子邮件时,A为B准备数据以进行分析。或者A监视你的位置,以便B可以打开你的车库门,而C打开房子的灯。重要的是,DAG不关心其内部任务干了什么;它的目的是确保在正确的时间或以正确的顺序干任何事情,或可以正确处理任何意想不到的问题。

DAG在标准的Python文件中定义,放置在Airflow的DAG_FOLDER中。Airflow将执行每个文件中的代码来动态构建DAG对象。你可以拥有任意数量的DAG,每个可以拥有任意数量的任务。通常,每一个应该对应于一个逻辑工作流。

1.1 作用域(scope)

Airflow将加载从DAG文件导入的任何DAG对象。最重要的是,这意味着DAG必须出现在globals()中。考虑以下两个DAG。只有dag_1将被加载;另一个只出现在局部作用域内。

dag_1 = DAG('this_dag_will_be_discovered')

def my_function()
    dag_2 = DAG('but_this_dag_will_not')

my_function()

有时这有很好的用处。例如,SubDagOperator的通用模式是在函数内定义子dag,以便Airflow不会将其作为独立DAG进行加载。

1.2 默认参数

如果将default_args字典传递给DAG,DAG将会将字典应用于其内部的任何Operator上。这很容易的将常用参数应用于多个Operator,而无需多次键入。

default_args=dict(
    start_date=datetime(2016, 1, 1),
    owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

1.3 上下文管理器(Context Manager)

备注

Airflow 1.8引入

DAG可用作上下文管理器,以自动为DAG分配新的Operator。

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag # True

2. Operators

DAG描述了如何运行工作流,Operators决定了实际如何完成。

Operator描述了工作流中的单个任务。Operator通常(但不总是)原子性的,这意味着它们可以独立存在,不需要与其他Operator共享资源。DAG将确保Operator按正确的顺序运行; 除了这些依赖之外,Operator通常独立运行。实际上,他们可能会运行在两台完全不同的机器上。

这是一个微小但非常重要的一点:一般来说,如果两个Operator需要共享信息,如文件名或少量数据,则应考虑将它们组合成一个Operator。如果绝对不可避免,Airflow确实有一个名为XCom的Operator可以交叉通信。

Airflow为Operator提供许多常见任务,包括:

  • BashOperator - 执行bash命令
  • PythonOperator - 调用任意的Python函数
  • EmailOperator - 发送邮件
  • HTTPOperator - 发送 HTTP 请求
  • SqlOperator - 执行 SQL 命令
  • Sensor - 等待一定时间,文件,数据库行,S3键等...

除了这些基本的构建块之外,还有更多的特定Operator:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator ...总之你能想到的!

airflow/contrib/目录包含更多由社区建立的Operator。这些Operator并不总是与主包(in the main distribution)中的Operator一样完整或经过很好的测试,但允许用户更轻松地向平台添加新功能。

Operators只有在分配给DAG时,才会被Airflow加载。

2.1 DAG分配

备注

在Airflow 1.8版本中引入

Operator不需要立即分配给DAG(以前dag是必需的参数)。但是一旦operator分配给DAG, 它就不能transferred或者unassigned. 当一个Operator创建时,通过延迟分配或甚至从其他Operator推断,可以让DAG得到明确的分配(DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators.).

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# 明确指定DAG
explicit_op = DummyOperator(task_id='op1', dag=dag)

# 延迟分配
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# 从其他Operator推断 (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

2.2 位移组合

备注

在Airflow 1.8版本中引入

传统上,使用set_upstream()set_downstream()方法来设置Operator之间的依赖关系。在Airflow 1.8中,可以使用Python位移操作符>><<。 以下四个语句在功能上相当:

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

当使用位移操作符去设置Operator依赖关系时,根据位移操作符指向的方向来判断Operator之间的依赖关系。例如,op1 >> op2 表示op1先运行,op2然后运行。可以组合多个Operator - 记住从左到右执行的链,总是返回最右边的对象。 例如:

op1 >> op2 >> op3 << op4

等价于:

op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)

为方便起见,位移操作符也可以与DAG一起使用。 例如:

dag >> op1 >> op2

等价于:

op1.dag = dag
op1.set_downstream(op2)

我们可以把这一切整合在一起,建立一个简单的管道:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        dag
        >> DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )

3. Task

一旦Operator被实例化,它被称为"任务"。实例化为在调用抽象Operator时定义一些特定值,参数化任务使之成为DAG中的一个节点。

4. 任务实例化

一个任务实例表示任务的一次特定运行,并且被表征为dag,任务和时间点的组合。任务实例也有指示性状态,可能是“运行”,“成功”,“失败”,“跳过”,“重试”等。

5. 工作流

现在你已经熟悉了Airflow的核心构建块。一些概念可能听起来似曾相似,但词汇可以这样概念化:

  • DAG:描述工作发生的顺序
  • Operator:执行某些工作的模板类
  • Task:Operator的参数化实例
  • TaskInstances(任务实例):1)已分配给DAG的任务,2)具有DAG特定运行相关的状态

通过组合DAGOperators来创建TaskInstances,可以构建复杂的工作流。

原文:http://airflow.incubator.apache.org/concepts.html#

目录
相关文章
|
分布式计算 DataWorks 关系型数据库
DataWorks数据源问题之脏数据如何解决
DataWorks数据源是指DataWorks中配置的用于数据集成的外部数据源;本合集将讲解如何在DataWorks中配置和管理数据源,以及处理数据源连接和集成过程中的问题。
344 2
|
存储 SQL 分布式计算
浅谈MPP数据库-Vertica
用过这块数据库3年时间,很多功能非常强大,POC做了很多数据库,查询性能可以说是最好的,推荐一下
3102 2
|
8月前
|
存储 人工智能 物联网
云计算助力医疗信息化,推动智慧医疗新模式。
云计算正深刻改变医疗行业,通过高效数据存储、管理与共享,优化医疗资源分配,推动远程医疗发展。它解决了“数据孤岛”问题,强化了数据安全与隐私保护,同时助力智慧医疗建设,降低信息化成本并提升服务效率。未来,云计算将与5G、AI等技术融合,进一步促进医疗服务智能化与规范化,为医疗行业带来全新可能。
428 7
|
5月前
|
人工智能 API 定位技术
MCP全方位扫盲
MCP(Model Context Protocol)是由Anthropic提出的协议,旨在标准化大模型与外部数据源和工具的通信方式。其核心架构包括MCP Client(客户端)和MCP Server(服务端),通过标准化接口实现解耦,支持不同LLM无缝调用工具。相比传统方法,MCP简化了Prompt工程,减少定制代码,提升复用性。实际场景中,如天气查询或支付处理,MCP可智能调用对应工具,优化用户体验。MCP的核心价值在于标准化通信、统一工具描述及动态兼容性,成为大模型与外部服务的智能桥梁。
|
存储 机器学习/深度学习 大数据
量子计算与大数据:处理海量信息的新方法
【10月更文挑战第31天】量子计算凭借其独特的量子比特和量子门技术,为大数据处理带来了革命性的变革。相比传统计算机,量子计算在计算效率、存储容量及并行处理能力上具有显著优势,能有效应对信息爆炸带来的挑战。本文探讨了量子计算如何通过量子叠加和纠缠等原理,加速数据处理过程,提升计算效率,特别是在金融、医疗和物流等领域中的具体应用案例,同时也指出了量子计算目前面临的挑战及其未来的发展方向。
|
应用服务中间件 Android开发
Server Tomcat v9.0 Server at localhost failed to start问题的解决
Server Tomcat v9.0 Server at localhost failed to start问题的解决
1334 0
|
存储 NoSQL Java
MPP数据库入门介绍及集群部署
MPP数据库入门介绍及集群部署
395 0
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错之遇到报错org.postgresql.util.psqlexception: The connection attempt failed.,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
分布式计算 Shell 调度
看看airflow怎样调度python写的spark任务吧
看看airflow怎样调度python写的spark任务吧
320 0
|
SQL 调度 数据库
Airflow的Dag
Airflow的Dag
307 0

热门文章

最新文章