助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】

知识点05:AirFlow的架构组件

  • 目标:了解AirFlow的架构组件
  • 路径
  • step1:架构
  • step2:组件
  • 实施
  • 架构
  • Client:开发AirFlow调度的程序的客户端,用于开发AirFlow的Python程序
  • Master:分布式架构中的主节点,负责运行WebServer和Scheduler
  • Worker:负责运行Execution执行提交的工作流中的Task
  • 组件
A scheduler, which handles both triggering scheduled workflows, and submitting Tasks to the executor to run.
An executor, which handles running tasks. In the default Airflow installation, this runs everything inside the scheduler, but most production-suitable executors actually push task execution out to workers.
A webserver, which presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.
A folder of DAG files, read by the scheduler and executor (and any workers the executor has)
A metadata database, used by the scheduler, executor and webserver to store state.
  • WebServer:提供交互界面和监控,让开发者调试和监控所有Task的运行
  • Scheduler:负责解析和调度Task任务提交到Execution中运行
  • Executor:执行组件,负责运行Scheduler分配的Task,运行在Worker中
  • DAG Directory:DAG程序的目录,将自己开发的程序放入这个目录,AirFlow的WebServer和Scheduler会自动读取
  • airflow将所有程序放在一个目录中
  • 自动检测这个目录有么有新的程序
  • MetaData DataBase:AirFlow的元数据存储数据库,记录所有DAG程序的信息
  • 小结
  • 了解AirFlow的架构组件

知识点06:AirFlow的开发规则

  • 目标掌握AirFlow的开发规则
  • 路径
  • step1:开发Python调度程序
  • step2:提交Python调度程序
  • 实施
  • 官方文档
  • 示例:http://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
  • 开发Python调度程序
  • 开发一个Python程序,程序文件中需要包含以下几个部分
  • 注意:该文件的运行不支持utf8编码,不能写中文
  • step1:导包
# 必选:导入airflow的DAG工作流
from airflow import DAG
# 必选:导入具体的TaskOperator类型
from airflow.operators.bash import BashOperator
# 可选:导入定时工具的包
from airflow.utils.dates import days_ago
  • step2:定义DAG及配置
# 当前工作流的基础配置
default_args = {
    # 当前工作流的所有者
    'owner': 'airflow',
    # 当前工作流的邮件接受者邮箱
    'email': ['airflow@example.com'],
    # 工作流失败是否发送邮件告警
    'email_on_failure': True,
    # 工作流重试是否发送邮件告警
    'email_on_retry': True,
    # 重试次数
    'retries': 2,
    # 重试间隔时间
    'retry_delay': timedelta(minutes=1),
}
# 定义当前工作流的DAG对象
dagName = DAG(
    # 当前工作流的名称,唯一id
    'airflow_name',
    # 使用的参数配置
    default_args=default_args,
    # 当前工作流的描述
    description='first airflow task DAG',
    # 当前工作流的调度周期:定时调度【可选】
    schedule_interval=timedelta(days=1),
    # 工作流开始调度的时间
    start_date=days_ago(1),
    # 当前工作流属于哪个组
    tags=['itcast_bash'],
)
  • 构建一个DAG工作流的实例和配置
  • step3:定义Tasks
  • 执行Linux命令
  • 执行Python代码
  • 发送邮件的
  • 其他
  • BashOperator:定义一个Shell命令的Task
# 导入BashOperator
from airflow.operators.bash import BashOperator
# 定义一个Task的对象
t1 = BashOperator(
  # 指定唯一的Task的名称
    task_id='first_bashoperator_task',
  # 指定具体要执行的Linux命令
    bash_command='echo "hello airflow"',
  # 指定属于哪个DAG对象
    dag=dagName
)
  • PythonOperator:定义一个Python代码的Task
# 导入PythonOperator
from airflow.operators.python import PythonOperator
# 定义需要执行的代码逻辑
def sayHello():
    print("this is a programe")
#定义一个Task对象
t2 = PythonOperator(
    # 指定唯一的Task的名称
    task_id='first_pyoperator_task',
    # 指定调用哪个Python函数
    python_callable=sayHello,
    # 指定属于哪个DAG对象
    dag=dagName
)
  • step4:运行Task并指定依赖关系
  • 定义Task
Task1:runme_0
Task2:runme_1
Task3:runme_2
Task4:run_after_loop
Task5:also_run_this
Task6:this_will_skip
Task7:run_this_last
  • 需求
  • Task1、Task2、Task3并行运行,结束以后运行Task4
  • Task4、Task5、Task6并行运行,结束以后运行Task7

  • 代码
task1 >> task4
task2 >> task4
task3 >> task4
task4 >> task7
task5 >> task7
task6 >> task7
  • 如果只有一个Task,只要直接写上Task对象名称即可
task1
  • 提交Python调度程序
  • 哪种提交都需要等待一段时间
  • 自动提交:需要等待自动检测
  • 将开发好的程序放入AirFlow的DAG Directory目录中
  • 默认路径为:/root/airflow/dags
  • 手动提交:手动运行文件让airflow监听加载
python xxxx.py
  • 调度状态
  • No status (scheduler created empty task instance):调度任务已创建,还未产生任务实例
  • Scheduled (scheduler determined task instance needs to run):调度任务已生成任务实例,待运行
  • Queued (scheduler sent task to executor to run on the queue):调度任务开始在executor执行前,在队列中
  • Running (worker picked up a task and is now running it):任务在worker节点上执行中
  • Success (task completed):任务执行成功完成
  • 小结
  • 掌握AirFlow的开发规则


相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
大数据
【赵渝强老师】大数据主从架构的单点故障
大数据体系架构中,核心组件采用主从架构,存在单点故障问题。为提高系统可用性,需实现高可用(HA)架构,通常借助ZooKeeper来实现。ZooKeeper提供配置维护、分布式同步等功能,确保集群稳定运行。下图展示了基于ZooKeeper的HDFS HA架构。
|
3月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
220 1
|
22天前
|
存储 SQL 分布式计算
大数据时代的引擎:大数据架构随记
大数据架构通常分为四层:数据采集层、数据存储层、数据计算层和数据应用层。数据采集层负责从各种源采集、清洗和转换数据,常用技术包括Flume、Sqoop和Logstash+Filebeat。数据存储层管理数据的持久性和组织,常用技术有Hadoop HDFS、HBase和Elasticsearch。数据计算层处理大规模数据集,支持离线和在线计算,如Spark SQL、Flink等。数据应用层将结果可视化或提供给第三方应用,常用工具为Tableau、Zeppelin和Superset。
235 8
|
2月前
|
监控 前端开发 数据可视化
3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目,实现数字孪生
@icraft/player-react 是 iCraft Editor 推出的 React 组件库,旨在简化3D数字孪生场景的前端集成。它支持零配置快速接入、自定义插件、丰富的事件和方法、动画控制及实时数据接入,帮助开发者轻松实现3D场景与React项目的无缝融合。
210 8
3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目,实现数字孪生
|
2月前
|
SQL 数据采集 分布式计算
【赵渝强老师】基于大数据组件的平台架构
本文介绍了大数据平台的总体架构及各层的功能。大数据平台架构分为五层:数据源层、数据采集层、大数据平台层、数据仓库层和应用层。其中,大数据平台层为核心,负责数据的存储和计算,支持离线和实时数据处理。数据仓库层则基于大数据平台构建数据模型,应用层则利用这些模型实现具体的应用场景。文中还提供了Lambda和Kappa架构的视频讲解。
274 3
【赵渝强老师】基于大数据组件的平台架构
|
22天前
|
存储 负载均衡 监控
揭秘 Elasticsearch 集群架构,解锁大数据处理神器
Elasticsearch 是一个强大的分布式搜索和分析引擎,广泛应用于大数据处理、实时搜索和分析。本文深入探讨了 Elasticsearch 集群的架构和特性,包括高可用性和负载均衡,以及主节点、数据节点、协调节点和 Ingest 节点的角色和功能。
46 0
|
2月前
|
存储 人工智能 大数据
物联网、大数据、云计算、人工智能之间的关系
物联网、大数据、云计算、人工智能之间的关系是紧密相连、相互促进的。这四者既有各自独立的技术特征,又能在不同层面上相互融合,共同推动信息技术的发展和应用。
770 0
|
3月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
212 1
|
3月前
|
消息中间件 运维 NoSQL
基础架构组件选型及服务化
【10月更文挑战第15天】本文概述了分布式系统中常见的基础架构组件及其选型与服务化的重要性。