助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】

简介: 助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】

知识点05:AirFlow的架构组件

  • 目标:了解AirFlow的架构组件
  • 路径
  • step1:架构
  • step2:组件
  • 实施
  • 架构
  • Client:开发AirFlow调度的程序的客户端,用于开发AirFlow的Python程序
  • Master:分布式架构中的主节点,负责运行WebServer和Scheduler
  • Worker:负责运行Execution执行提交的工作流中的Task
  • 组件
A scheduler, which handles both triggering scheduled workflows, and submitting Tasks to the executor to run.
An executor, which handles running tasks. In the default Airflow installation, this runs everything inside the scheduler, but most production-suitable executors actually push task execution out to workers.
A webserver, which presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.
A folder of DAG files, read by the scheduler and executor (and any workers the executor has)
A metadata database, used by the scheduler, executor and webserver to store state.
  • WebServer:提供交互界面和监控,让开发者调试和监控所有Task的运行
  • Scheduler:负责解析和调度Task任务提交到Execution中运行
  • Executor:执行组件,负责运行Scheduler分配的Task,运行在Worker中
  • DAG Directory:DAG程序的目录,将自己开发的程序放入这个目录,AirFlow的WebServer和Scheduler会自动读取
  • airflow将所有程序放在一个目录中
  • 自动检测这个目录有么有新的程序
  • MetaData DataBase:AirFlow的元数据存储数据库,记录所有DAG程序的信息
  • 小结
  • 了解AirFlow的架构组件

知识点06:AirFlow的开发规则

  • 目标掌握AirFlow的开发规则
  • 路径
  • step1:开发Python调度程序
  • step2:提交Python调度程序
  • 实施
  • 官方文档
  • 示例:http://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
  • 开发Python调度程序
  • 开发一个Python程序,程序文件中需要包含以下几个部分
  • 注意:该文件的运行不支持utf8编码,不能写中文
  • step1:导包
# 必选:导入airflow的DAG工作流
from airflow import DAG
# 必选:导入具体的TaskOperator类型
from airflow.operators.bash import BashOperator
# 可选:导入定时工具的包
from airflow.utils.dates import days_ago
  • step2:定义DAG及配置
# 当前工作流的基础配置
default_args = {
    # 当前工作流的所有者
    'owner': 'airflow',
    # 当前工作流的邮件接受者邮箱
    'email': ['airflow@example.com'],
    # 工作流失败是否发送邮件告警
    'email_on_failure': True,
    # 工作流重试是否发送邮件告警
    'email_on_retry': True,
    # 重试次数
    'retries': 2,
    # 重试间隔时间
    'retry_delay': timedelta(minutes=1),
}
# 定义当前工作流的DAG对象
dagName = DAG(
    # 当前工作流的名称,唯一id
    'airflow_name',
    # 使用的参数配置
    default_args=default_args,
    # 当前工作流的描述
    description='first airflow task DAG',
    # 当前工作流的调度周期:定时调度【可选】
    schedule_interval=timedelta(days=1),
    # 工作流开始调度的时间
    start_date=days_ago(1),
    # 当前工作流属于哪个组
    tags=['itcast_bash'],
)
  • 构建一个DAG工作流的实例和配置
  • step3:定义Tasks
  • 执行Linux命令
  • 执行Python代码
  • 发送邮件的
  • 其他
  • BashOperator:定义一个Shell命令的Task
# 导入BashOperator
from airflow.operators.bash import BashOperator
# 定义一个Task的对象
t1 = BashOperator(
  # 指定唯一的Task的名称
    task_id='first_bashoperator_task',
  # 指定具体要执行的Linux命令
    bash_command='echo "hello airflow"',
  # 指定属于哪个DAG对象
    dag=dagName
)
  • PythonOperator:定义一个Python代码的Task
# 导入PythonOperator
from airflow.operators.python import PythonOperator
# 定义需要执行的代码逻辑
def sayHello():
    print("this is a programe")
#定义一个Task对象
t2 = PythonOperator(
    # 指定唯一的Task的名称
    task_id='first_pyoperator_task',
    # 指定调用哪个Python函数
    python_callable=sayHello,
    # 指定属于哪个DAG对象
    dag=dagName
)
  • step4:运行Task并指定依赖关系
  • 定义Task
Task1:runme_0
Task2:runme_1
Task3:runme_2
Task4:run_after_loop
Task5:also_run_this
Task6:this_will_skip
Task7:run_this_last
  • 需求
  • Task1、Task2、Task3并行运行,结束以后运行Task4
  • Task4、Task5、Task6并行运行,结束以后运行Task7

  • 代码
task1 >> task4
task2 >> task4
task3 >> task4
task4 >> task7
task5 >> task7
task6 >> task7
  • 如果只有一个Task,只要直接写上Task对象名称即可
task1
  • 提交Python调度程序
  • 哪种提交都需要等待一段时间
  • 自动提交:需要等待自动检测
  • 将开发好的程序放入AirFlow的DAG Directory目录中
  • 默认路径为:/root/airflow/dags
  • 手动提交:手动运行文件让airflow监听加载
python xxxx.py
  • 调度状态
  • No status (scheduler created empty task instance):调度任务已创建,还未产生任务实例
  • Scheduled (scheduler determined task instance needs to run):调度任务已生成任务实例,待运行
  • Queued (scheduler sent task to executor to run on the queue):调度任务开始在executor执行前,在队列中
  • Running (worker picked up a task and is now running it):任务在worker节点上执行中
  • Success (task completed):任务执行成功完成
  • 小结
  • 掌握AirFlow的开发规则


相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
15天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
37 2
|
30天前
|
消息中间件 网络协议 物联网
MQTT常见问题之物联网设备端申请动态注册时MQTT服务不可用如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
1月前
|
存储 Kubernetes 负载均衡
Kubernetes的“厨房”:架构是菜谱,组件是厨具,资源对象是食材(下)
本文深入探讨了Kubernetes(K8s)的架构、核心组件以及资源对象。Kubernetes作为一个开源的容器编排系统,通过其独特的架构设计和丰富的组件,实现了对容器化应用程序的高效管理和扩展。通过本文的介绍,读者可以深入了解Kubernetes的架构、核心组件以及资源对象,从而更好地应用和管理容器化应用程序。Kubernetes的灵活性和可扩展性使得它成为容器编排领域的领先者,为企业提供了强大的容器运行环境。
|
2月前
|
SQL 存储 缓存
MySQL - 一文了解MySQL的基础架构及各个组件的作用
MySQL - 一文了解MySQL的基础架构及各个组件的作用
|
28天前
|
设计模式 安全 Java
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
33 0
|
29天前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(二)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
15 0
|
15天前
|
存储 数据库 Android开发
构建高效安卓应用:采用Jetpack架构组件优化用户体验
【4月更文挑战第12天】 在当今快速发展的数字时代,Android 应用程序的流畅性与响应速度对用户满意度至关重要。为提高应用性能并降低维护成本,开发者需寻求先进的技术解决方案。本文将探讨如何利用 Android Jetpack 中的架构组件 — 如 LiveData、ViewModel 和 Room — 来构建高质量的安卓应用。通过具体实施案例分析,我们将展示这些组件如何协同工作以实现数据持久化、界面与逻辑分离,以及确保数据的即时更新,从而优化用户体验并提升应用的可维护性和可测试性。
|
1月前
|
SpringCloudAlibaba Java 持续交付
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(一)基础知识+各个组件介绍+聚合父工程创建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(一)基础知识+各个组件介绍+聚合父工程创建
96 1
|
1月前
|
Kubernetes API 调度
Kubernetes的“厨房”:架构是菜谱,组件是厨具,资源对象是食材(上)
本文深入探讨了Kubernetes(K8s)的架构、核心组件以及资源对象。Kubernetes作为一个开源的容器编排系统,通过其独特的架构设计和丰富的组件,实现了对容器化应用程序的高效管理和扩展。通过本文的介绍,读者可以深入了解Kubernetes的架构、核心组件以及资源对象,从而更好地应用和管理容器化应用程序。Kubernetes的灵活性和可扩展性使得它成为容器编排领域的领先者,为企业提供了强大的容器运行环境。
|
1月前
uni-app 4.12开发弹出层组件(一)基础架构
uni-app 4.12开发弹出层组件(一)基础架构
19 0