Apache Airflow: An Introduction

1. Overview

The Airflow platform is a tool for describing, executing, and monitoring workflows. Built around directed acyclic graphs (DAGs), Airflow lets you define a set of interdependent tasks and runs them in dependency order. It ships with a rich command-line interface for administering the system, and its web UI makes it just as easy to manage and schedule tasks and monitor their runtime state in real time, which greatly simplifies day-to-day operations and maintenance.
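
As a concrete first taste of that CLI, a typical bootstrap sequence on a fresh installation looks like the sketch below (all three subcommands appear in the help output in section 3; the port is illustrative):

# sketch: first-run sequence, assuming Airflow is already installed
airflow initdb                # initialize the metadata database
airflow webserver -p 8383     # start the web UI on port 8383
airflow scheduler             # start the scheduler that triggers DAG runs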

2. How It Works

// TODO

3. Common Commands

a) Run the webserver as a daemon

Command: airflow webserver -p <port> -D (-D daemonizes the process; the session below was started in the foreground, without -D)

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8383
Timeout: 120
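
A practical aside (based on Airflow 1.10 defaults rather than anything shown above): when started with -D, the webserver records its pid under $AIRFLOW_HOME, so stopping the daemon typically looks like:

# assumes the default pid file location under $AIRFLOW_HOME
kill $(cat $AIRFLOW_HOME/airflow-webserver.pid)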

b) Run the scheduler as a daemon

Command: airflow scheduler or airflow scheduler -D (the session below backgrounds the process with the shell's & instead of -D)

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>

c) Run a worker as a daemon

Command: airflow worker -D

d) Run a Celery worker as a daemon with task concurrency limited to 1

Command: airflow worker -c 1 -D
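
These two subcommands only have an effect when Airflow runs with the CeleryExecutor; the sessions above use the SequentialExecutor, which executes tasks in-process and needs no separate worker. A minimal sketch of the airflow.cfg settings that switching would require (the broker and result-backend URLs are placeholder assumptions, not values from this environment):

# airflow.cfg (sketch; connection URLs are placeholders)
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+mysql://airflow:airflow@localhost/airflow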

e) Pause a DAG

Command: airflow pause dag_id

(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True

f) Unpause a DAG; equivalent to flipping the DAG's on/off toggle in the web UI

Command: airflow unpause dag_id

(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False

g) List the tasks in a DAG

Command: airflow list_tasks dag_id

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
condition_is_False
condition_is_True
false_1
false_2
true_1
true_2

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
puller
push
push_by_returning

h) Clear task instances

Command: airflow clear dag_id
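
For example, to clear the task instances of the example_xcom DAG used earlier within a date window (an illustrative invocation; -s and -e bound the window by start and end date, and the command prompts for confirmation before resetting state):

airflow clear example_xcom -s 2019-09-01 -e 2019-09-06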

i) Trigger a run of an entire DAG

Command: airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE
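
An illustrative invocation against the example_xcom DAG (the run id and execution date are made-up values):

airflow trigger_dag example_xcom -r manual_test_1 -e 2019-09-06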

j) Run a single task instance

Command: airflow run dag_id task_id execution_date
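
For instance, to run the push task of example_xcom (task ids taken from the list_tasks output above; the execution date is illustrative):

airflow run example_xcom push 2019-09-06T00:00:00

The related test subcommand (see the help output below) runs a task without checking dependencies or recording state in the database, which is often more convenient for debugging.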

k) Help

Command: airflow -h or airflow --help

(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               
               {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
               ...

positional arguments:
  {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
                        sub-command help
    resetdb             Burn down and rebuild the metadata database
    render              Render a task instance's template(s)
    variables           CRUD operations on variables
    delete_user         Delete an account for the Web UI
    connections         List/Add/Delete connections
    create_user         Create an account for the Web UI (FAB-based)
    rotate_fernet_key   Rotate all encrypted connection credentials and
                        variables; see
                        https://airflow.readthedocs.io/en/stable/howto/secure-
                        connections.html#rotating-encryption-keys.
    pause               Pause a DAG
    sync_perm           Update existing role's permissions.
    task_failed_deps    Returns the unmet dependencies for a task instance
                        from the perspective of the scheduler. In other words,
                        why a task instance doesn't get scheduled and then
                        queued by the scheduler, and then run by an executor).
    version             Show the version
    trigger_dag         Trigger a DAG run
    initdb              Initialize the metadata database
    test                Test a task instance. This will run a task without
                        checking for dependencies or recording its state in
                        the database.
    unpause             Resume a paused DAG
    list_dag_runs       List dag runs given a DAG id. If state option is
                        given, it will onlysearch for all the dagruns with the
                        given state. If no_backfill option is given, it will
                        filter outall backfill dagruns for given dag id.
    dag_state           Get the status of a dag run
    run                 Run a single task instance
    list_tasks          List the tasks within a DAG
    backfill            Run subsections of a DAG for a specified date range.
                        If reset_dag_run option is used, backfill will first
                        prompt users whether airflow should clear all the
                        previous dag_run and task_instances within the
                        backfill date range. If rerun_failed_tasks is used,
                        backfill will auto re-run the previous failed task
                        instances within the backfill date range.
    list_dags           List all the DAGs
    kerberos            Start a kerberos ticket renewer
    worker              Start a Celery worker node
    webserver           Start a Airflow webserver instance
    flower              Start a Celery Flower
    scheduler           Start a scheduler instance
    task_state          Get the status of a task instance
    pool                CRUD operations on pools
    serve_logs          Serve logs generate by worker
    clear               Clear a set of task instance, as if they never ran
    list_users          List accounts for the Web UI
    next_execution      Get the next execution datetime of a DAG.
    upgradedb           Upgrade the metadata database to latest version
    delete_dag          Delete all DB records related to the specified DAG

optional arguments:
  -h, --help            show this help message and exit