airflow使用指南-机器学习工程自动化

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: airflow使用指南-机器学习工程自动化

1.airflow简介


Apache Airflow是一个开源工作流管理平台。它可以帮助您实现数据管道和ML管道的自动化,并在行业中广泛应用。您可以进行摄入、执行ETL、执行ML任务,并将日常工作自动化。


大家都知道,很多机器学习的算法,是需要不断的迭代更新参数的,不是一次性实现的,在面对许多这样的机器学习工程时,我们总不能每个工程都要定时去运行,调度,airflow这个平台,就是将我们所有的机器学习工程整合起来,成为不同的工程流水线,统一调度,配置


Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。


1.1 DAG


有向无环图指的是一个无回路的有向图。如果有一个非有向无环图,且A点出发向B经C可回到A,形成一个环。将从C到A的边方向改为从A到C,则变成有向无环图。有向无环图的生成树个数等于入度非零的节点的入度积。

160812939440491aadf3aac861ade0ca.png

2. airflow架构


先用一张图看一个airlflow的工作流程图:



从图中看,我们可以写很多的机器学习代码,分别为worker1、worker…。这些就是airflow平台需要执行的任务进程。


Executor:执行器,用来执行这些机器学习进程的组件

Scheduler:调度器,用来定义这些机器学习进程,执行的先后顺序,比如机器学习先做数据预处理,再特征处理

Web Server:就是一个前端服务器,最后执行完这样一个airflow进程,我们所有的机器学习工程都可以在airflow平台的WebUI上进行操作,监控进程,触发进程等。

meta data:保存我们执行进程的一些信息


这里有个地方需要知道,进程就是我们一个机器学习任务中不同的步骤,多个进程结合起来成为一个完整的机器学习任务,在airflow架构中,需要对进程进行排序前后处理,对任务也要进行前后处理。


进程的排序是由Scheduler完成,而任务是由Dags完成。


3.airflow核心模块


3.1 模块


  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。

Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。

Tasks:Task 是Operator的一个实例,也就是DAGs中的一个node。 Task Instance:task的一次运行。Web界面中可以看到task instance 有自己的状态,包括"running", “success”,“failed”,“skipped”, "up for retry"等。

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >>Task2,表明Task2依赖于Task2了。 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的工作流(workflow)


3.2 Operators模块


DAG 定义一个作业流,Operators 则定义了实际需要执行的作业。airflow 提供了许多 - Operators 来指定我们需要执行的作业:


BashOperator - 执行 bash 命令或脚本。

SSHOperator - 执行远程 bash 命令或脚本(原理同paramiko 模块)。

PythonOperator - 执行 Python 函数。

EmailOperator - 发送Email。

HTTPOperator - 发送一个 HTTP 请求。

MySqlOperator, SqliteOperator,PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行SQL 任务。

DockerOperator, HiveOperator, S3FileTransferOperator,

PrestoToMysqlOperator, SlackOperator 你懂得。除了以上这些 Operators 还可以方便的自定义

Operators 满足个性化的任务需求。


我们最常用的就是BashOperator。像这样导入模块


from airflow import DAG
from airflow.operators.bash_operator import BashOperator


4.airflow安装与使用


😎最好在虚拟环境下安装哈


4.1 安装airflow


pip install apache-airflow==1.10.10
• 1


4.2 修改默认路径


默认安装的配置就是 ~/airflow

临时修改 AIRFLOW_HOME 环境变量, 这里的 /home/airflow 可以替换成你想要的文件夹目录


export AIRFLOW_HOME=/home/airflow


4.3 修改默认数据库


找到配置文件


vi /home/airflow/airflow.cfg


修改sql配置

sql_alchemy_conn = mysql://root:name@localhost:3306/airflow
• 1


这二步可以不用


4.4 初始化数据库


# initialize the database
airflow db init


4.5 添加用户


airflow users create \
    --username admin \
    --firstname 123 \
    --lastname 123 \
    --role Admin \
    --email 123456789@qq.com


创建的用户密码为:quant


4.6 启动web服务


# start the web server, default port is 8080
airflow webserver --port 8080
• 1
• 2


4.7 启动定时任务


airflow scheduler


如果看到跳出这个界面 那恭喜你👍



4.8 编写airflow自动化代码模版


实现第一个 Data Pipeline

DAGs 用 Python 编写,文件储存在 DAG_FOLDER 里(默认在 ~/airflow/dags)。比较重要的参数:


dag_id

description

start_date

schedule_interval:定义 DAG 运行的频率。

depend_on_past:上一次运行成功了,才会运行。

default_args:所有 operators 实例化的默认参数。

Operators类型:


Action operator:执行动作,例如:BashOperator,PythonOperation,EmailOperator 等。

Transfer operator:传输数据,例如:PrestoToMysqlOperator,SftpOperator 等。

Sensor operator:等待数据到达。


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'Déborah Mesquita',
    'start_date': days_ago(1)
}
# Defining the DAG using Context Manager
with DAG(
        'extract-meeting-activities',
        default_args=default_args,
        schedule_interval=None,
        ) as dag:
        t1 = BashOperator(
                task_id = 'Data preprocessing',
                bash_command = 'Data_preprocessing.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 = BashOperator(
                task_id = 'model structing',
                bash_command = 'model_struct.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 >> t2  # Defining the task dependencies
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
11月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
【10月更文挑战第6天】如何使用机器学习模型来自动化评估数据质量?
|
6月前
|
机器学习/深度学习 人工智能 运维
机器学习+自动化运维:让服务器自己修Bug,运维变轻松!
机器学习+自动化运维:让服务器自己修Bug,运维变轻松!
273 14
|
10月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
如何使用机器学习模型来自动化评估数据质量?
|
10月前
|
机器学习/深度学习 数据采集 运维
智能化运维:机器学习在故障预测和自动化响应中的应用
智能化运维:机器学习在故障预测和自动化响应中的应用
172 4
|
10月前
|
机器学习/深度学习
自动化机器学习研究MLR-Copilot:利用大型语言模型进行研究加速
【10月更文挑战第21天】在科技快速发展的背景下,机器学习研究面临诸多挑战。为提高研究效率,研究人员提出了MLR-Copilot系统框架,利用大型语言模型(LLM)自动生成和实施研究想法。该框架分为研究想法生成、实验实施和实施执行三个阶段,通过自动化流程显著提升研究生产力。实验结果显示,MLR-Copilot能够生成高质量的假设和实验计划,并显著提高任务性能。然而,该系统仍需大量计算资源和人类监督。
148 4
|
10月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
如何使用机器学习模型来自动化评估数据质量?
|
10月前
|
机器学习/深度学习 数据采集 人工智能
自动化测试的未来:AI与机器学习的融合之路
【10月更文挑战第41天】随着技术的快速发展,软件测试领域正经历一场由人工智能和机器学习驱动的革命。本文将探讨这一趋势如何改变测试流程、提高测试效率以及未来可能带来的挑战和机遇。我们将通过具体案例分析,揭示AI和ML在自动化测试中的应用现状及其潜力。
199 0
|
4月前
|
机器学习/深度学习 数据采集 人工智能
20分钟掌握机器学习算法指南
在短短20分钟内,从零开始理解主流机器学习算法的工作原理,掌握算法选择策略,并建立对神经网络的直观认识。本文用通俗易懂的语言和生动的比喻,帮助你告别算法选择的困惑,轻松踏入AI的大门。
|
10月前
|
机器学习/深度学习 算法 数据挖掘
K-means聚类算法是机器学习中常用的一种聚类方法,通过将数据集划分为K个簇来简化数据结构
K-means聚类算法是机器学习中常用的一种聚类方法,通过将数据集划分为K个簇来简化数据结构。本文介绍了K-means算法的基本原理,包括初始化、数据点分配与簇中心更新等步骤,以及如何在Python中实现该算法,最后讨论了其优缺点及应用场景。
1026 6
|
5月前
|
机器学习/深度学习 存储 Kubernetes
【重磅发布】AllData数据中台核心功能:机器学习算法平台
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。

热门文章

最新文章