airflow使用指南-机器学习工程自动化

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: airflow使用指南-机器学习工程自动化

1.airflow简介


Apache Airflow是一个开源工作流管理平台。它可以帮助您实现数据管道和ML管道的自动化,并在行业中广泛应用。您可以进行摄入、执行ETL、执行ML任务,并将日常工作自动化。


大家都知道,很多机器学习的算法,是需要不断的迭代更新参数的,不是一次性实现的,在面对许多这样的机器学习工程时,我们总不能每个工程都要定时去运行,调度,airflow这个平台,就是将我们所有的机器学习工程整合起来,成为不同的工程流水线,统一调度,配置


Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。


1.1 DAG


有向无环图指的是一个无回路的有向图。如果有一个非有向无环图,且A点出发向B经C可回到A,形成一个环。将从C到A的边方向改为从A到C,则变成有向无环图。有向无环图的生成树个数等于入度非零的节点的入度积。

160812939440491aadf3aac861ade0ca.png

2. airflow架构


先用一张图看一个airlflow的工作流程图:



从图中看,我们可以写很多的机器学习代码,分别为worker1、worker…。这些就是airflow平台需要执行的任务进程。


Executor:执行器,用来执行这些机器学习进程的组件

Scheduler:调度器,用来定义这些机器学习进程,执行的先后顺序,比如机器学习先做数据预处理,再特征处理

Web Server:就是一个前端服务器,最后执行完这样一个airflow进程,我们所有的机器学习工程都可以在airflow平台的WebUI上进行操作,监控进程,触发进程等。

meta data:保存我们执行进程的一些信息


这里有个地方需要知道,进程就是我们一个机器学习任务中不同的步骤,多个进程结合起来成为一个完整的机器学习任务,在airflow架构中,需要对进程进行排序前后处理,对任务也要进行前后处理。


进程的排序是由Scheduler完成,而任务是由Dags完成。


3.airflow核心模块


3.1 模块


  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。

Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。

Tasks:Task 是Operator的一个实例,也就是DAGs中的一个node。 Task Instance:task的一次运行。Web界面中可以看到task instance 有自己的状态,包括"running", “success”,“failed”,“skipped”, "up for retry"等。

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >>Task2,表明Task2依赖于Task2了。 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的工作流(workflow)


3.2 Operators模块


DAG 定义一个作业流,Operators 则定义了实际需要执行的作业。airflow 提供了许多 - Operators 来指定我们需要执行的作业:


BashOperator - 执行 bash 命令或脚本。

SSHOperator - 执行远程 bash 命令或脚本(原理同paramiko 模块)。

PythonOperator - 执行 Python 函数。

EmailOperator - 发送Email。

HTTPOperator - 发送一个 HTTP 请求。

MySqlOperator, SqliteOperator,PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行SQL 任务。

DockerOperator, HiveOperator, S3FileTransferOperator,

PrestoToMysqlOperator, SlackOperator 你懂得。除了以上这些 Operators 还可以方便的自定义

Operators 满足个性化的任务需求。


我们最常用的就是BashOperator。像这样导入模块


from airflow import DAG
from airflow.operators.bash_operator import BashOperator


4.airflow安装与使用


😎最好在虚拟环境下安装哈


4.1 安装airflow


pip install apache-airflow==1.10.10
• 1


4.2 修改默认路径


默认安装的配置就是 ~/airflow

临时修改 AIRFLOW_HOME 环境变量, 这里的 /home/airflow 可以替换成你想要的文件夹目录


export AIRFLOW_HOME=/home/airflow


4.3 修改默认数据库


找到配置文件


vi /home/airflow/airflow.cfg


修改sql配置

sql_alchemy_conn = mysql://root:name@localhost:3306/airflow
• 1


这二步可以不用


4.4 初始化数据库


# initialize the database
airflow db init


4.5 添加用户


airflow users create \
    --username admin \
    --firstname 123 \
    --lastname 123 \
    --role Admin \
    --email 123456789@qq.com


创建的用户密码为:quant


4.6 启动web服务


# start the web server, default port is 8080
airflow webserver --port 8080
• 1
• 2


4.7 启动定时任务


airflow scheduler


如果看到跳出这个界面 那恭喜你👍



4.8 编写airflow自动化代码模版


实现第一个 Data Pipeline

DAGs 用 Python 编写,文件储存在 DAG_FOLDER 里(默认在 ~/airflow/dags)。比较重要的参数:


dag_id

description

start_date

schedule_interval:定义 DAG 运行的频率。

depend_on_past:上一次运行成功了,才会运行。

default_args:所有 operators 实例化的默认参数。

Operators类型:


Action operator:执行动作,例如:BashOperator,PythonOperation,EmailOperator 等。

Transfer operator:传输数据,例如:PrestoToMysqlOperator,SftpOperator 等。

Sensor operator:等待数据到达。


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'Déborah Mesquita',
    'start_date': days_ago(1)
}
# Defining the DAG using Context Manager
with DAG(
        'extract-meeting-activities',
        default_args=default_args,
        schedule_interval=None,
        ) as dag:
        t1 = BashOperator(
                task_id = 'Data preprocessing',
                bash_command = 'Data_preprocessing.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 = BashOperator(
                task_id = 'model structing',
                bash_command = 'model_struct.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 >> t2  # Defining the task dependencies
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
【10月更文挑战第6天】如何使用机器学习模型来自动化评估数据质量?
|
2月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
如何使用机器学习模型来自动化评估数据质量?
|
2月前
|
机器学习/深度学习 数据采集 运维
智能化运维:机器学习在故障预测和自动化响应中的应用
智能化运维:机器学习在故障预测和自动化响应中的应用
64 4
|
2月前
|
机器学习/深度学习
自动化机器学习研究MLR-Copilot:利用大型语言模型进行研究加速
【10月更文挑战第21天】在科技快速发展的背景下,机器学习研究面临诸多挑战。为提高研究效率,研究人员提出了MLR-Copilot系统框架,利用大型语言模型(LLM)自动生成和实施研究想法。该框架分为研究想法生成、实验实施和实施执行三个阶段,通过自动化流程显著提升研究生产力。实验结果显示,MLR-Copilot能够生成高质量的假设和实验计划,并显著提高任务性能。然而,该系统仍需大量计算资源和人类监督。
42 4
|
2月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
如何使用机器学习模型来自动化评估数据质量?
|
2月前
|
机器学习/深度学习 数据采集 人工智能
自动化测试的未来:AI与机器学习的融合之路
【10月更文挑战第41天】随着技术的快速发展,软件测试领域正经历一场由人工智能和机器学习驱动的革命。本文将探讨这一趋势如何改变测试流程、提高测试效率以及未来可能带来的挑战和机遇。我们将通过具体案例分析,揭示AI和ML在自动化测试中的应用现状及其潜力。
50 0
|
3月前
|
机器学习/深度学习 供应链 搜索推荐
机器学习驱动的工厂自动化
机器学习驱动的工厂自动化是一种利用先进的机器学习技术来提升生产效率、降低成本和提高产品质量的智能制造方法。
47 2
|
8月前
|
机器学习/深度学习 存储 搜索推荐
利用机器学习算法改善电商推荐系统的效率
电商行业日益竞争激烈,提升用户体验成为关键。本文将探讨如何利用机器学习算法优化电商推荐系统,通过分析用户行为数据和商品信息,实现个性化推荐,从而提高推荐效率和准确性。
257 14
|
8月前
|
机器学习/深度学习 算法 数据可视化
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
143 1
|
8月前
|
机器学习/深度学习 算法 搜索推荐
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)