一、Airflow的诞生
2014年,Airbnb创造了一套工作流调度系统:Airflow。Airflow是灵活可扩展的工作流自动化和调度系统 。2019年成为Apache的顶级项目。它可以让程序员通过编程实现:编写、调度、监控工作流,主要针对有大量数据进行传递的场景,不适合解决流式任务(如spark streaming和Flink)。
通过python代码,把任务组织成有向无环图DAG,
通过【依赖关系】在一组服务器上调度任务。
Airflow有啥用:
监控自动化工作的情况(通过web UI和各个worker上记录的执行历史)
自动处理并传输数据
为机器学习或推荐系统提供一个数据管道和使用框架
二、基于CeleryExecutor方式的系统架构
使用celery方式的系统架构图(官方推荐使用这种方式,同时支持mesos方式部署)。turing为外部系统,GDags服务帮助拼接成dag,可以忽略。
master节点webui管理dags、日志等信息。scheduler负责调度,只支持单节点,多节点启动scheduler可能会挂掉
worker负责执行具体dag中的task。这样不同的task可以在不同的环境中执行。
turing为外部系统
GDags服务帮助拼接成dag
master节点webui管理dags、日志等信息
scheduler负责调度,只支持单节点
worker负责执行具体dag中的task, worker支持多节点
三、Airflow的组成成分
Worker:用来处理和执行整个工作流的各个节点的工作任务的东西。
Scheduler:只有干活的worker是不够的,Airflow需要有一个大脑,去检查哪些任务执行了,哪些任务没有执行,以及应该在什么时间执行任务。
WebServer:Airflow是可以用视窗工具(UI)来完成工作流修改,参数修改这些任务的。这个视窗工具就是一个web页面(简单理解就是在浏览器里面打开的网页)。这个WebServer启动之后,我们就可以更方便的去修改和检测我们的工作流了。
中止、恢复、触发任务。
监控正在运行的任务,断点续跑任务。
执行 ad-hoc 命令或 SQL 语句来查询任务的状态,日志等详细信息。
配置连接,包括不限于数据库、ssh 的连接等。
四、安装
(1)安装包
yum install python-devel mysql-devel -y yum install python-devel yum install python3-devel yum install mysql-devel pip3 install mysqlclient pip3 install apache-airflow pip3 install apache-airflow[mysql]
(2)修改配置文件
# 初始化原始库 执行完以下命令后会在当前家用户目录下生成airflow目录 airflow db init # vim airflow.cfg 具体要修改的内容如下 [core] executor=LocalExecutor sql_alchemy_conn = mysql://user:password@IP:3306/airflow [smtp] smtp_host = mail.ndpmedia.com smtp_starttls = True smtp_ssl = False smtp_user = user smtp_password = pass smtp_port = 25 smtp_timeout = 30 smtp_mail_from =与user相同 smtp_retry_limit = 5 [webserver] security = Flask AppBuilder secure_mode = True rbac=True
(3)创建用户
airflow users create --username admin --firstname admin --lastname admin --role Admin --email example@XX.com airflow webserver 启动web服务 airflow scheduler 启动调度程序
五、基本命令
$ airflow webserver -D 守护进程运行webserver $ airflow scheduler -D 守护进程运行调度器 $ airflow worker -D 守护进程运行调度器 $ airflow worker -c 1 -D 守护进程运行celery worker并指定任务并发数为1 $ airflow pause dag_id 暂停任务 $ airflow unpause dag_id 取消暂停,等同于在管理界面打开off按钮 $ airflow list_tasks dag_id 查看task列表 $ airflow clear dag_id 清空任务实例 $ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 运行整个dag文件 $ airflow run dag_id task_id execution_date 运行task