1.airflow简介
Apache Airflow是一个开源工作流管理平台。它可以帮助您实现数据管道和ML管道的自动化,并在行业中广泛应用。您可以进行摄入、执行ETL、执行ML任务,并将日常工作自动化。
大家都知道,很多机器学习的算法,是需要不断的迭代更新参数的,不是一次性实现的,在面对许多这样的机器学习工程时,我们总不能每个工程都要定时去运行,调度,airflow这个平台,就是将我们所有的机器学习工程整合起来,成为不同的工程流水线,统一调度,配置
Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。
1.1 DAG
有向无环图指的是一个无回路的有向图。如果有一个非有向无环图,且A点出发向B经C可回到A,形成一个环。将从C到A的边方向改为从A到C,则变成有向无环图。有向无环图的生成树个数等于入度非零的节点的入度积。
2. airflow架构
先用一张图看一个airlflow的工作流程图:
从图中看,我们可以写很多的机器学习代码,分别为worker1、worker…。这些就是airflow平台需要执行的任务进程。
Executor:执行器,用来执行这些机器学习进程的组件
Scheduler:调度器,用来定义这些机器学习进程,执行的先后顺序,比如机器学习先做数据预处理,再特征处理
Web Server:就是一个前端服务器,最后执行完这样一个airflow进程,我们所有的机器学习工程都可以在airflow平台的WebUI上进行操作,监控进程,触发进程等。
meta data:保存我们执行进程的一些信息
这里有个地方需要知道,进程就是我们一个机器学习任务中不同的步骤,多个进程结合起来成为一个完整的机器学习任务,在airflow架构中,需要对进程进行排序前后处理,对任务也要进行前后处理。
进程的排序是由Scheduler完成,而任务是由Dags完成。
3.airflow核心模块
3.1 模块
- DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
Tasks:Task 是Operator的一个实例,也就是DAGs中的一个node。 Task Instance:task的一次运行。Web界面中可以看到task instance 有自己的状态,包括"running", “success”,“failed”,“skipped”, "up for retry"等。
Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >>Task2,表明Task2依赖于Task2了。 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的工作流(workflow)
3.2 Operators模块
DAG 定义一个作业流,Operators 则定义了实际需要执行的作业。airflow 提供了许多 - Operators 来指定我们需要执行的作业:
BashOperator - 执行 bash 命令或脚本。
SSHOperator - 执行远程 bash 命令或脚本(原理同paramiko 模块)。
PythonOperator - 执行 Python 函数。
EmailOperator - 发送Email。
HTTPOperator - 发送一个 HTTP 请求。
MySqlOperator, SqliteOperator,PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行SQL 任务。
DockerOperator, HiveOperator, S3FileTransferOperator,
PrestoToMysqlOperator, SlackOperator 你懂得。除了以上这些 Operators 还可以方便的自定义
Operators 满足个性化的任务需求。
我们最常用的就是BashOperator。像这样导入模块
from airflow import DAG from airflow.operators.bash_operator import BashOperator
4.airflow安装与使用
😎最好在虚拟环境下安装哈
4.1 安装airflow
pip install apache-airflow==1.10.10 • 1
4.2 修改默认路径
默认安装的配置就是 ~/airflow
临时修改 AIRFLOW_HOME 环境变量, 这里的 /home/airflow 可以替换成你想要的文件夹目录
export AIRFLOW_HOME=/home/airflow
4.3 修改默认数据库
找到配置文件
vi /home/airflow/airflow.cfg
修改sql配置
sql_alchemy_conn = mysql://root:name@localhost:3306/airflow • 1
这二步可以不用
4.4 初始化数据库
# initialize the database airflow db init
4.5 添加用户
airflow users create \ --username admin \ --firstname 123 \ --lastname 123 \ --role Admin \ --email 123456789@qq.com
创建的用户密码为:quant
4.6 启动web服务
# start the web server, default port is 8080 airflow webserver --port 8080 • 1 • 2
4.7 启动定时任务
airflow scheduler
如果看到跳出这个界面 那恭喜你👍
4.8 编写airflow自动化代码模版
实现第一个 Data Pipeline
DAGs 用 Python 编写,文件储存在 DAG_FOLDER 里(默认在 ~/airflow/dags)。比较重要的参数:
dag_id
description
start_date
schedule_interval:定义 DAG 运行的频率。
depend_on_past:上一次运行成功了,才会运行。
default_args:所有 operators 实例化的默认参数。
Operators类型:
Action operator:执行动作,例如:BashOperator,PythonOperation,EmailOperator 等。
Transfer operator:传输数据,例如:PrestoToMysqlOperator,SftpOperator 等。
Sensor operator:等待数据到达。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'Déborah Mesquita', 'start_date': days_ago(1) } # Defining the DAG using Context Manager with DAG( 'extract-meeting-activities', default_args=default_args, schedule_interval=None, ) as dag: t1 = BashOperator( task_id = 'Data preprocessing', bash_command = 'Data_preprocessing.py {{ dag_run.conf["working_path"] if dag_run else "" }}', ) t1 = BashOperator( task_id = 'model structing', bash_command = 'model_struct.py {{ dag_run.conf["working_path"] if dag_run else "" }}', ) t1 >> t2 # Defining the task dependencies