Apache Airflow: An Introduction

1. Overview

The Airflow platform is a tool for authoring, scheduling, and monitoring workflows. A workflow is modeled as a directed acyclic graph (DAG): Airflow lets you define a set of tasks with dependencies and executes them in dependency order. Airflow ships with a rich command-line interface for administering the system, and its web UI makes it just as easy to manage scheduled tasks and monitor their run states in real time, which greatly simplifies operations and maintenance.
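
To make the DAG model concrete, here is a minimal sketch of a DAG file with two dependent tasks. The dag_id, schedule, and task names are illustrative rather than taken from this article, and the imports assume the Airflow 1.10 series used in the sessions below:

# dags/demo_dag.py -- a minimal sketch, assuming Airflow 1.10-style imports.
# Save it under $AIRFLOW_HOME/dags/ so the scheduler can discover it.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="demo_dag",                  # hypothetical DAG id
    start_date=datetime(2019, 9, 1),
    schedule_interval="@daily",         # run once per day
)

# Two tasks with an explicit dependency: "load" runs only after "extract" succeeds.
extract = BashOperator(task_id="extract", bash_command="echo extracting", dag=dag)
load = BashOperator(task_id="load", bash_command="echo loading", dag=dag)

extract >> load  # equivalent to extract.set_downstream(load)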

2. How It Works

// TODO

3. Common Commands

a) Run the webserver as a daemon

Command: airflow webserver -p <port> -D (the session below omits -D, so the server runs in the foreground)

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8383
Timeout: 120

b) Run the scheduler as a daemon

Command: airflow scheduler or airflow scheduler -D

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>

c) Run a worker as a daemon

Command: airflow worker -D (this starts a Celery worker node, so a Celery-based executor must be configured rather than the SequentialExecutor shown in the logs above)

d) Run a Celery worker as a daemon with task concurrency limited to 1

Command: airflow worker -c 1 -D

e) Pause a DAG

Command: airflow pause dag_id

(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True

f) Unpause a DAG, equivalent to flipping the DAG's on/off toggle in the web UI

Command: airflow unpause dag_id

(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False

g) List the tasks in a DAG

Command: airflow list_tasks dag_id

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
condition_is_False
condition_is_True
false_1
false_2
true_1
true_2

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
puller
push
push_by_returning
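
For reference, a DAG like example_xcom passes values between tasks via XCom. Below is a minimal sketch of that pattern; the dag_id and the task bodies are illustrative (only the task ids mirror the listing above), assuming the Airflow 1.10 PythonOperator API:

# xcom_sketch.py -- a minimal XCom sketch, assuming Airflow 1.10's PythonOperator.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="xcom_sketch", start_date=datetime(2019, 9, 1), schedule_interval=None)

def push(**context):
    # Explicitly push a value under a named key.
    context["ti"].xcom_push(key="value", value=42)

def push_by_returning(**context):
    # A plain return value is stored as an XCom under the default key.
    return 43

def puller(**context):
    ti = context["ti"]
    print(ti.xcom_pull(task_ids="push", key="value"))   # prints 42
    print(ti.xcom_pull(task_ids="push_by_returning"))   # prints 43

push_task = PythonOperator(task_id="push", python_callable=push,
                           provide_context=True, dag=dag)
return_task = PythonOperator(task_id="push_by_returning", python_callable=push_by_returning,
                             provide_context=True, dag=dag)
pull_task = PythonOperator(task_id="puller", python_callable=puller,
                           provide_context=True, dag=dag)

# puller depends on both push tasks.
[push_task, return_task] >> pull_task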

h) Clear a DAG's task instances, as if they had never run

Command: airflow clear dag_id

i) Trigger a run of an entire DAG

Command: airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE

j) Run a single task instance

Command: airflow run dag_id task_id execution_date (execution_date is an ISO-formatted timestamp, e.g. 2019-09-06)

k) Show the help documentation

Command: airflow -h or airflow --help

(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               
               {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
               ...

positional arguments:
  {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
                        sub-command help
    resetdb             Burn down and rebuild the metadata database
    render              Render a task instance's template(s)
    variables           CRUD operations on variables
    delete_user         Delete an account for the Web UI
    connections         List/Add/Delete connections
    create_user         Create an account for the Web UI (FAB-based)
    rotate_fernet_key   Rotate all encrypted connection credentials and
                        variables; see
                        https://airflow.readthedocs.io/en/stable/howto/secure-
                        connections.html#rotating-encryption-keys.
    pause               Pause a DAG
    sync_perm           Update existing role's permissions.
    task_failed_deps    Returns the unmet dependencies for a task instance
                        from the perspective of the scheduler. In other words,
                        why a task instance doesn't get scheduled and then
                        queued by the scheduler, and then run by an executor).
    version             Show the version
    trigger_dag         Trigger a DAG run
    initdb              Initialize the metadata database
    test                Test a task instance. This will run a task without
                        checking for dependencies or recording its state in
                        the database.
    unpause             Resume a paused DAG
    list_dag_runs       List dag runs given a DAG id. If state option is
                        given, it will onlysearch for all the dagruns with the
                        given state. If no_backfill option is given, it will
                        filter outall backfill dagruns for given dag id.
    dag_state           Get the status of a dag run
    run                 Run a single task instance
    list_tasks          List the tasks within a DAG
    backfill            Run subsections of a DAG for a specified date range.
                        If reset_dag_run option is used, backfill will first
                        prompt users whether airflow should clear all the
                        previous dag_run and task_instances within the
                        backfill date range. If rerun_failed_tasks is used,
                        backfill will auto re-run the previous failed task
                        instances within the backfill date range.
    list_dags           List all the DAGs
    kerberos            Start a kerberos ticket renewer
    worker              Start a Celery worker node
    webserver           Start a Airflow webserver instance
    flower              Start a Celery Flower
    scheduler           Start a scheduler instance
    task_state          Get the status of a task instance
    pool                CRUD operations on pools
    serve_logs          Serve logs generate by worker
    clear               Clear a set of task instance, as if they never ran
    list_users          List accounts for the Web UI
    next_execution      Get the next execution datetime of a DAG.
    upgradedb           Upgrade the metadata database to latest version
    delete_dag          Delete all DB records related to the specified DAG

optional arguments:
  -h, --help            show this help message and exit