1. Overview
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. A workflow is modeled as a directed acyclic graph (DAG) of tasks, and Airflow executes the tasks in dependency order. It ships with a rich command-line interface for administering the system, and its web UI makes it equally convenient to manage scheduled jobs and watch task state in real time, which greatly simplifies day-to-day operations and maintenance.
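To make the DAG idea concrete, here is a minimal sketch of a DAG definition file written against the Airflow 1.10 API used throughout this article; the dag_id, task ids, and shell commands are all illustrative, not taken from a real deployment:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical two-task DAG: sleep_task runs only after print_date succeeds
dag = DAG(
    dag_id='example_hello',           # illustrative name
    start_date=datetime(2019, 9, 1),
    schedule_interval='@daily',
)

print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
sleep_task = BashOperator(task_id='sleep_task', bash_command='sleep 5', dag=dag)

print_date >> sleep_task  # declare the dependency edge

Placing such a file in the DAGs folder (e.g. the /home/bigdata/airflow/dags directory seen in the log output below) makes it visible to the scheduler and the web UI.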
2. How It Works
// TODO
3. Common Commands
a). Run the webserver as a daemon
Command: airflow webserver -p port -D
(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8383
Timeout: 120
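The session above runs the webserver in the foreground; with -D appended (airflow webserver -p 8383 -D) the process detaches and runs as a daemon instead. In daemon mode the PID is typically recorded in a file under $AIRFLOW_HOME (airflow-webserver.pid, assuming the defaults have not been changed), which is how you later find and stop the process.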
b). Run the scheduler as a daemon
Command: airflow scheduler or airflow scheduler -D
(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>
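Note that this session backgrounds the scheduler with the shell's &, which ties it to the login session; airflow scheduler -D daemonizes it properly, with the PID typically written to $AIRFLOW_HOME/airflow-scheduler.pid (assuming the default configuration), so it keeps running after the terminal closes.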
c). Run a worker as a daemon
Command: airflow worker -D
d). Run a Celery worker as a daemon with task concurrency limited to 1
Command: airflow worker -c 1 -D
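Both worker forms only apply when Airflow is configured with the CeleryExecutor; the sessions in this article run the SequentialExecutor (note the "Using executor SequentialExecutor" lines above), under which no separate worker is started. Assuming airflow.cfg has been switched to executor = CeleryExecutor with a broker_url configured (an assumption, as that setup is not shown here), a daemonized worker limited to one concurrent task would be started with:
airflow worker -c 1 -D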
e). Pause a DAG
Command: airflow pause dag_id
(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True
f). Unpause a DAG, equivalent to switching its Off/On toggle to On in the web UI
Command: airflow unpause dag_id
(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False
g). List the tasks in a DAG
Command: airflow list_tasks dag_id
(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
condition_is_False
condition_is_True
false_1
false_2
true_1
true_2
(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
puller
push
push_by_returning
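list_tasks can also print the tasks as a dependency tree rather than a flat list via its -t/--tree flag, e.g. airflow list_tasks example_xcom --tree.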
h). Clear task instances
Command: airflow clear dag_id
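No session is shown for clear, so here is a hedged example against the example_xcom DAG from above (the task regex and dates are illustrative):
airflow clear example_xcom -t puller -s 2019-09-01 -e 2019-09-06
-t restricts the clear to task ids matching a regex, and -s/-e bound the execution-date range; the command asks for confirmation before resetting the matched task instances so they can be re-run.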
i). Trigger a run of an entire DAG
Command: airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE
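For example, to trigger example_xcom with an explicit run id and execution date (both values illustrative):
airflow trigger_dag example_xcom -r manual_20190906 -e 2019-09-06
-r sets the run_id displayed in the web UI and -e sets the execution date; both may be omitted, in which case Airflow generates defaults.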
j). Run a single task instance
Command: airflow run dag_id task_id execution_date
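For example, to run the push task of example_xcom for a given execution date (the date is illustrative):
airflow run example_xcom push 2019-09-06T00:00:00
To exercise a task's logic without checking dependencies or recording state in the metadata database, airflow test takes the same three arguments (see the help output below).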
k). Help
Command: airflow -h or airflow --help
(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
{resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
...
positional arguments:
{resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
sub-command help
resetdb Burn down and rebuild the metadata database
render Render a task instance's template(s)
variables CRUD operations on variables
delete_user Delete an account for the Web UI
connections List/Add/Delete connections
create_user Create an account for the Web UI (FAB-based)
rotate_fernet_key Rotate all encrypted connection credentials and
variables; see
https://airflow.readthedocs.io/en/stable/howto/secure-
connections.html#rotating-encryption-keys.
pause Pause a DAG
sync_perm Update existing role's permissions.
task_failed_deps Returns the unmet dependencies for a task instance
from the perspective of the scheduler. In other words,
why a task instance doesn't get scheduled and then
queued by the scheduler, and then run by an executor).
version Show the version
trigger_dag Trigger a DAG run
initdb Initialize the metadata database
test Test a task instance. This will run a task without
checking for dependencies or recording its state in
the database.
unpause Resume a paused DAG
list_dag_runs List dag runs given a DAG id. If state option is
given, it will onlysearch for all the dagruns with the
given state. If no_backfill option is given, it will
filter outall backfill dagruns for given dag id.
dag_state Get the status of a dag run
run Run a single task instance
list_tasks List the tasks within a DAG
backfill Run subsections of a DAG for a specified date range.
If reset_dag_run option is used, backfill will first
prompt users whether airflow should clear all the
previous dag_run and task_instances within the
backfill date range. If rerun_failed_tasks is used,
backfill will auto re-run the previous failed task
instances within the backfill date range.
list_dags List all the DAGs
kerberos Start a kerberos ticket renewer
worker Start a Celery worker node
webserver Start a Airflow webserver instance
flower Start a Celery Flower
scheduler Start a scheduler instance
task_state Get the status of a task instance
pool CRUD operations on pools
serve_logs Serve logs generate by worker
clear Clear a set of task instance, as if they never ran
list_users List accounts for the Web UI
next_execution Get the next execution datetime of a DAG.
upgradedb Upgrade the metadata database to latest version
delete_dag Delete all DB records related to the specified DAG
optional arguments:
-h, --help show this help message and exit