Getting Started with Apache Airflow

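1. Overview

The Airflow platform is a tool for describing, executing, and monitoring workflows. Built on directed acyclic graphs (DAGs), Airflow lets you define a set of interdependent tasks and runs them in dependency order. It ships a rich command-line interface for administering the system, and its web UI offers the same control over scheduled tasks along with real-time monitoring of task state, which simplifies operations and maintenance.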
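
To make "runs them in dependency order" concrete, here is a minimal sketch that orders a small task graph with Python's standard-library `graphlib` (Python 3.9+). It is not Airflow's API or its scheduler; the task names (`extract`, `transform`, `load`, `report`) and both helper functions are hypothetical and only illustrate the DAG idea.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(deps):
    """Return one valid run order in which every task follows its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return True if the dependency graph has no cycle, i.e. it is a valid DAG."""
    try:
        TopologicalSorter(deps).prepare()
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
print(is_acyclic({"a": {"b"}, "b": {"a"}}))
```

The second call shows why the graph must be acyclic: two tasks that wait on each other have no valid run order.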


a). Run the webserver as a daemon

Command: airflow webserver -p port -D

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Timeout: 120


b). Run the scheduler as a daemon

Command: airflow scheduler (foreground) or airflow scheduler -D (daemon)

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>
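
The log lines above share one layout: a bracketed timestamp, the source file and line number in braces, a level, and a message. As a rough sketch, assuming that layout holds for the lines you want to inspect, they can be split into fields with a regular expression. The `LogRecord` type and `parse_log_line` function are hypothetical helpers, not part of Airflow.

```python
import re
from datetime import datetime
from typing import NamedTuple, Optional


class LogRecord(NamedTuple):
    timestamp: datetime
    source: str
    lineno: int
    level: str
    message: str


# Layout assumed from the sample output: [timestamp] {file:line} LEVEL - message
LOG_PATTERN = re.compile(
    r"\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] "
    r"\{(?P<source>[^:}]+):(?P<lineno>\d+)\} "
    r"(?P<level>[A-Z]+) - (?P<message>.*)$"
)


def parse_log_line(line: str) -> Optional[LogRecord]:
    """Parse one log line; return None for lines that do not fit the layout."""
    match = LOG_PATTERN.search(line)
    if match is None:
        return None
    return LogRecord(
        timestamp=datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S,%f"),
        source=match["source"],
        lineno=int(match["lineno"]),
        level=match["level"],
        message=match["message"],
    )


record = parse_log_line(
    "[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler"
)
print(record)
```

Lines without this layout, such as the banner or `Workers: 4 sync`, return `None`, so they are easy to filter out.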


c). Run the Celery worker as a daemon

Command: airflow worker -D

d). Run the Celery worker as a daemon with task concurrency set to 1

Command: airflow worker -c 1 -D


Command: airflow pause dag_id

(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True


Command: airflow unpause dag_id

(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False


Command: airflow list_tasks dag_id

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags


Command: airflow clear dag_id


Command: airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE
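
When triggering runs from a script, it can help to assemble the argument list in code rather than by string concatenation. The sketch below only builds and prints the command. `build_trigger_command` is a hypothetical helper, the `-r` and `-e` flags are the ones shown above, and the run ID `manual_run_1` is made up. Passing the execution date in ISO 8601 form is an assumption about what your Airflow version accepts.

```python
import shlex
from datetime import datetime


def build_trigger_command(dag_id, run_id=None, exec_date=None):
    """Build the argv list for `airflow trigger_dag`, adding only the options given."""
    argv = ["airflow", "trigger_dag", dag_id]
    if run_id is not None:
        argv += ["-r", run_id]
    if exec_date is not None:
        # Assumption: the CLI accepts an ISO 8601 timestamp for -e.
        argv += ["-e", exec_date.isoformat()]
    return argv


argv = build_trigger_command(
    "example_xcom",
    run_id="manual_run_1",  # hypothetical run ID
    exec_date=datetime(2019, 9, 6),
)
# Print a shell-safe rendering; pass argv to subprocess.run(argv) to execute it.
print(" ".join(shlex.quote(arg) for arg in argv))
```

Keeping the arguments as a list avoids quoting problems if a DAG ID or run ID contains characters the shell treats specially.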


Command: airflow run dag_id task_id execution_date


Command: airflow -h or airflow --help

(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]

positional arguments:
                        sub-command help
    resetdb             Burn down and rebuild the metadata database
    render              Render a task instance's template(s)
    variables           CRUD operations on variables
    delete_user         Delete an account for the Web UI
    connections         List/Add/Delete connections
    create_user         Create an account for the Web UI (FAB-based)
    rotate_fernet_key   Rotate all encrypted connection credentials and
                        variables; see
    pause               Pause a DAG
    sync_perm           Update existing role's permissions.
    task_failed_deps    Returns the unmet dependencies for a task instance
                        from the perspective of the scheduler. In other words,
                        why a task instance doesn't get scheduled and then
                        queued by the scheduler, and then run by an executor).
    version             Show the version
    trigger_dag         Trigger a DAG run
    initdb              Initialize the metadata database
    test                Test a task instance. This will run a task without
                        checking for dependencies or recording its state in
                        the database.
    unpause             Resume a paused DAG
    list_dag_runs       List dag runs given a DAG id. If state option is
                        given, it will onlysearch for all the dagruns with the
                        given state. If no_backfill option is given, it will
                        filter outall backfill dagruns for given dag id.
    dag_state           Get the status of a dag run
    run                 Run a single task instance
    list_tasks          List the tasks within a DAG
    backfill            Run subsections of a DAG for a specified date range.
                        If reset_dag_run option is used, backfill will first
                        prompt users whether airflow should clear all the
                        previous dag_run and task_instances within the
                        backfill date range. If rerun_failed_tasks is used,
                        backfill will auto re-run the previous failed task
                        instances within the backfill date range.
    list_dags           List all the DAGs
    kerberos            Start a kerberos ticket renewer
    worker              Start a Celery worker node
    webserver           Start a Airflow webserver instance
    flower              Start a Celery Flower
    scheduler           Start a scheduler instance
    task_state          Get the status of a task instance
    pool                CRUD operations on pools
    serve_logs          Serve logs generate by worker
    clear               Clear a set of task instance, as if they never ran
    list_users          List accounts for the Web UI
    next_execution      Get the next execution datetime of a DAG.
    upgradedb           Upgrade the metadata database to latest version
    delete_dag          Delete all DB records related to the specified DAG

optional arguments:
  -h, --help            show this help message and exit
