airflow使用指南-机器学习工程自动化

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: airflow使用指南-机器学习工程自动化

1.airflow简介


Apache Airflow是一个开源工作流管理平台。它可以帮助您实现数据管道和ML管道的自动化,并在行业中广泛应用。您可以进行摄入、执行ETL、执行ML任务,并将日常工作自动化。


大家都知道,很多机器学习的算法,是需要不断的迭代更新参数的,不是一次性实现的,在面对许多这样的机器学习工程时,我们总不能每个工程都要定时去运行,调度,airflow这个平台,就是将我们所有的机器学习工程整合起来,成为不同的工程流水线,统一调度,配置


Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。


1.1 DAG


有向无环图指的是一个无回路的有向图。如果有一个非有向无环图,且A点出发向B经C可回到A,形成一个环。将从C到A的边方向改为从A到C,则变成有向无环图。有向无环图的生成树个数等于入度非零的节点的入度积。

160812939440491aadf3aac861ade0ca.png

2. airflow架构


先用一张图看一个airlflow的工作流程图:



从图中看,我们可以写很多的机器学习代码,分别为worker1、worker…。这些就是airflow平台需要执行的任务进程。


Executor:执行器,用来执行这些机器学习进程的组件

Scheduler:调度器,用来定义这些机器学习进程,执行的先后顺序,比如机器学习先做数据预处理,再特征处理

Web Server:就是一个前端服务器,最后执行完这样一个airflow进程,我们所有的机器学习工程都可以在airflow平台的WebUI上进行操作,监控进程,触发进程等。

meta data:保存我们执行进程的一些信息


这里有个地方需要知道,进程就是我们一个机器学习任务中不同的步骤,多个进程结合起来成为一个完整的机器学习任务,在airflow架构中,需要对进程进行排序前后处理,对任务也要进行前后处理。


进程的排序是由Scheduler完成,而任务是由Dags完成。


3.airflow核心模块


3.1 模块


  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。

Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。

Tasks:Task 是Operator的一个实例,也就是DAGs中的一个node。 Task Instance:task的一次运行。Web界面中可以看到task instance 有自己的状态,包括"running", “success”,“failed”,“skipped”, "up for retry"等。

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >>Task2,表明Task2依赖于Task2了。 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的工作流(workflow)


3.2 Operators模块


DAG 定义一个作业流,Operators 则定义了实际需要执行的作业。airflow 提供了许多 - Operators 来指定我们需要执行的作业:


BashOperator - 执行 bash 命令或脚本。

SSHOperator - 执行远程 bash 命令或脚本(原理同paramiko 模块)。

PythonOperator - 执行 Python 函数。

EmailOperator - 发送Email。

HTTPOperator - 发送一个 HTTP 请求。

MySqlOperator, SqliteOperator,PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行SQL 任务。

DockerOperator, HiveOperator, S3FileTransferOperator,

PrestoToMysqlOperator, SlackOperator 你懂得。除了以上这些 Operators 还可以方便的自定义

Operators 满足个性化的任务需求。


我们最常用的就是BashOperator。像这样导入模块


from airflow import DAG
from airflow.operators.bash_operator import BashOperator


4.airflow安装与使用


😎最好在虚拟环境下安装哈


4.1 安装airflow


pip install apache-airflow==1.10.10
• 1


4.2 修改默认路径


默认安装的配置就是 ~/airflow

临时修改 AIRFLOW_HOME 环境变量, 这里的 /home/airflow 可以替换成你想要的文件夹目录


export AIRFLOW_HOME=/home/airflow


4.3 修改默认数据库


找到配置文件


vi /home/airflow/airflow.cfg


修改sql配置

sql_alchemy_conn = mysql://root:name@localhost:3306/airflow
• 1


这二步可以不用


4.4 初始化数据库


# initialize the database
airflow db init


4.5 添加用户


airflow users create \
    --username admin \
    --firstname 123 \
    --lastname 123 \
    --role Admin \
    --email 123456789@qq.com


创建的用户密码为:quant


4.6 启动web服务


# start the web server, default port is 8080
airflow webserver --port 8080
• 1
• 2


4.7 启动定时任务


airflow scheduler


如果看到跳出这个界面 那恭喜你👍



4.8 编写airflow自动化代码模版


实现第一个 Data Pipeline

DAGs 用 Python 编写,文件储存在 DAG_FOLDER 里(默认在 ~/airflow/dags)。比较重要的参数:


dag_id

description

start_date

schedule_interval:定义 DAG 运行的频率。

depend_on_past:上一次运行成功了,才会运行。

default_args:所有 operators 实例化的默认参数。

Operators类型:


Action operator:执行动作,例如:BashOperator,PythonOperation,EmailOperator 等。

Transfer operator:传输数据,例如:PrestoToMysqlOperator,SftpOperator 等。

Sensor operator:等待数据到达。


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'Déborah Mesquita',
    'start_date': days_ago(1)
}
# Defining the DAG using Context Manager
with DAG(
        'extract-meeting-activities',
        default_args=default_args,
        schedule_interval=None,
        ) as dag:
        t1 = BashOperator(
                task_id = 'Data preprocessing',
                bash_command = 'Data_preprocessing.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 = BashOperator(
                task_id = 'model structing',
                bash_command = 'model_struct.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )
        t1 >> t2  # Defining the task dependencies
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
机器学习/深度学习 数据采集 监控
如何使用机器学习模型来自动化评估数据质量?
如何使用机器学习模型来自动化评估数据质量?
|
12天前
|
机器学习/深度学习 人工智能 测试技术
自动化测试的未来:AI与机器学习的融合之路
【9月更文挑战第15天】在软件测试领域,自动化一直被视为提高效率和精确度的关键。随着人工智能(AI)和机器学习(ML)技术的不断进步,它们已经开始改变自动化测试的面貌。本文将探讨AI和ML如何赋能自动化测试,提升测试用例的智能生成、优化测试流程,并预测未来趋势。我们将通过实际代码示例来揭示这些技术如何被集成到现有的测试框架中,以及开发人员如何利用它们来提高软件质量。
48 15
|
30天前
|
机器学习/深度学习 人工智能 运维
自动化测试的未来:AI与机器学习的融合
【8月更文挑战第29天】随着技术的快速发展,自动化测试正在经历一场革命。本文将探讨AI和机器学习如何改变软件测试领域,提供代码示例,并讨论未来趋势。
|
1月前
|
数据采集 机器学习/深度学习 算法
"揭秘数据质量自动化的秘密武器:机器学习模型如何精准捕捉数据中的‘隐形陷阱’,让你的数据分析无懈可击?"
【8月更文挑战第20天】随着大数据成为核心资源,数据质量直接影响机器学习模型的准确性和效果。传统的人工审查方法效率低且易错。本文介绍如何运用机器学习自动化评估数据质量,解决缺失值、异常值等问题,提升模型训练效率和预测准确性。通过Python和scikit-learn示例展示了异常值检测的过程,最后强调在自动化评估的同时结合人工审查的重要性。
55 2
|
1月前
|
机器学习/深度学习 自然语言处理 算法
利用机器学习算法进行自动化测试
利用机器学习算法进行自动化测试
|
1月前
|
机器学习/深度学习 人工智能 算法
探索自动化测试的未来:AI与机器学习的融合
在软件测试领域,自动化一直是提高效率和准确性的关键。随着人工智能(AI)和机器学习(ML)技术的飞速发展,它们正在逐步改变自动化测试的面貌。本文将探讨AI和ML如何增强自动化测试的能力,提高其智能性、预测性和适应性,并分析这些技术为测试实践带来的潜在变化和挑战。
|
1月前
|
机器学习/深度学习 数据采集 测试技术
利用Python实现简单的机器学习模型软件测试的艺术与科学:探索自动化测试框架的奥秘
【8月更文挑战第27天】在本文中,我们将一起探索如何通过Python编程语言创建一个简单的机器学习模型。我们将使用scikit-learn库中的线性回归模型作为示例,并通过一个实际的数据集来训练我们的模型。文章将详细解释每一步的过程,包括数据预处理、模型训练和预测结果的评估。最后,我们会用代码块展示整个过程,确保读者能够跟随步骤实践并理解每个阶段的重要性。
|
4月前
|
机器学习/深度学习 存储 搜索推荐
利用机器学习算法改善电商推荐系统的效率
电商行业日益竞争激烈,提升用户体验成为关键。本文将探讨如何利用机器学习算法优化电商推荐系统,通过分析用户行为数据和商品信息,实现个性化推荐,从而提高推荐效率和准确性。
209 14
|
4月前
|
机器学习/深度学习 算法 数据可视化
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
95 1
|
4月前
|
机器学习/深度学习 算法 搜索推荐
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)