助力工业物联网,工业大数据之服务域:定时调度使用【三十四】

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 助力工业物联网,工业大数据之服务域:定时调度使用【三十四】

12:定时调度使用

  • 目标掌握定时调度的使用方式
  • 实施
  • http://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

  • 方式一:内置
with DAG(
    dag_id='example_branch_operator',
    default_args=args,
    start_date=days_ago(2),
    schedule_interval="@daily",
    tags=['example', 'example2'],
) as dag:
  • 方式二:datetime.timedelta对象
timedelta(minutes=1)
timedelta(hours=3)
timedelta(days=1)
with DAG(
    dag_id='latest_only',
    schedule_interval=dt.timedelta(hours=4),
    start_date=days_ago(2),
    tags=['example2', 'example3'],
) as dag:
  • 方式三:Crontab表达式
  • 与Linux Crontab用法一致
with DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    start_date=days_ago(2),
    default_args=args,
    tags=['example'],
) as dag:
分钟    小时    日     月     周
00     00     *     *     *
05    12      1     *     *
30    8     *     *     4
  • 小结
  • 掌握定时调度的使用方式

13:Airflow常用命令

  • 目标:了解AirFlow的常用命令
  • 实施
  • 列举当前所有的dag
airflow dags list
  • 暂停某个DAG
airflow dags pause dag_name
  • 启动某个DAG
airflow dags unpause dag_name
  • 删除某个DAG
airflow dags delete dag_name
  • 执行某个DAG
airflow dags  trigger dag_name
  • 查看某个DAG的状态
airflow dags  state dag_name
  • 列举某个DAG的所有Task
airflow tasks list dag_name
  • 小结
  • 了解AirFlow的常用命令

14:邮件告警使用

  • 目标:了解AirFlow中如何实现邮件告警
  • 路径
  • step1:AirFlow配置
  • step2:DAG配置
  • 实施
  • 原理:自动发送邮件的原理:邮件第三方服务
  • 发送方账号:配置文件中配置
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com
  • 接收方账号:程序中配置
default_args = {
    'owner': 'airflow',
    'email': ['jiangzonghai@itcast.cn'],
  'email_on_failure': True,
    'email_on_retry': True,
  'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
  • AirFlow配置:airflow.cfg
# 发送邮件的代理服务器地址及认证:每个公司都不一样
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# 发送邮件的账号
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com
# 超时时间
smtp_timeout = 30
# 重试次数
smtp_retry_limit = 5
  • 关闭Airflow
# 统一杀掉airflow的相关服务进程命令
ps -ef|egrep 'scheduler|flower|worker|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
# 下一次启动之前
rm -f /root/airflow/airflow-*
  • 程序配置
default_args = {
    'email': ['jiangzonghai@itcast.cn'],
    'email_on_failure': True,
    'email_on_retry': True
}
  • 启动Airflow
airflow webserver -D
airflow scheduler -D
airflow celery flower -D
airflow celery worker -D
  • 模拟错误

  • 小结
  • 了解AirFlow中如何实现邮件告警

15:一站制造中的调度

  • 目标:了解一站制造中调度的实现
  • 实施
  • ODS层 / DWD层:定时调度:每天00:05开始运行
  • dws(11)
  • dws耗时1小时
  • 从凌晨1点30分开始执行
  • dwb(16)
  • dwb耗时1.5小时
  • 从凌晨3点开始执行
  • st(10)
  • st耗时1小时
  • 从凌晨4点30分开始执行
  • dm(1)
  • dm耗时0.5小时
  • 从凌晨5点30分开始执行
  • 小结
  • 了解一站制造中调度的实现

16:回顾:Spark核心概念

  • 什么是分布式计算?
  • 分布式程序:MapReduce、Spark、Flink程序
  • 多进程:一个程序由多个进程来共同实现,不同进程可以运行在不同机器上
  • 每个进程所负责计算的数据是不一样,都是整体数据的某一个部分
  • 自己基于MapReduce或者Spark的API开发的程序:数据处理的逻辑
  • 分逻辑
  • MR
  • ·MapTask进程:分片规则:基于处理的数据做计算
  • 判断:文件大小 / 128M > 1.1
  • 大于:按照每128M分
  • 小于:整体作为1个分片
  • 大文件:每128M作为一个分片
  • 一个分片就对应一个MapTask
  • ReduceTask进程:指定
  • Spark
  • Executor:指定
  • 分布式资源:YARN、Standalone资源容器
  • 将多台机器的物理资源:CPU、内存、磁盘从逻辑上合并为一个整体
  • YARN:ResourceManager、NodeManager【8core8GB】
  • 每个NM管理每台机器的资源
  • RM管理所有的NM
  • Standalone:Master、Worker
  • 实现统一的硬件资源管理:MR、Flink、Spark on YARN
  • Spark程序的组成结构?
  • Application:程序
  • 进程:一个Driver、多个Executor
  • 运行:多个Job、多个Stage、多个Task
  • 什么是Standalone?
  • Spark自带的集群资源管理平台
  • 为什么要用Spark on YARN?
  • 为了实现资源统一化的管理,将所有程序都提交到YARN运行
  • Master和Worker是什么?
  • 分布式主从架构:Hadoop、Hbase、Kafka、Spark……
  • 主:管理节点:Master
  • 接客
  • 管理从节点
  • 管理所有资源
  • 从:计算节点:Worker
  • 负责执行主节点分配的任务
  • Driver和Executer是什么?
  • step1:启动了分布式资源平台
  • step2:开发一个分布式计算程序
sc = SparkContext(conf)
# step1:读取数据
inputRdd = sc.textFile(hdfs_path)
#step2:转换数据
wcRdd = inputRdd.filter.map.flatMap.reduceByKey
#step3:保存结果
wcRdd.foreach
sc.stop
  • step3:提交分布式程序到分布式资源集群运行
spark-submit xxx.py
executor个数和资源
driver资源配置
  • 先启动Driver进程
  • 申请资源:启动Executor计算进程
  • Driver开始解析代码,判断每一句代码是否产生job
  • 再启动Executor进程:根据资源配置运行在Worker节点上
  • 所有Executor向Driver反向注册,等待Driver分配Task
  • Job是怎么产生的?
  • 当用到RDD中的数据时候就会触发Job的产生:所有会用到RDD数据的函数称为触发算子
  • DAGScheduler组件根据代码为当前的job构建DAG图
  • DAG是怎么生成的?
  • 算法:回溯算法:倒推
  • DAG构建过程中,将每个算子放入Stage中,如果遇到宽依赖的算子,就构建一个新的Stage
  • Stage划分:宽依赖
  • 运行Stage:按照Stage编号小的开始运行
  • 将每个Stage转换为一个TaskSet:Task集合
  • Task的个数怎么决定?
  • 一核CPU = 一个Task = 一个分区
  • 一个Stage转换成的TaskSet中有几个Task:由Stage中RDD的最大分区数来决定
  • Spark的算子分为几类?
  • 转换:Transformation
  • 返回值:RDD
  • 为lazy模式,不会触发job的产生
  • map、flatMap
  • 触发:Action
  • 返回值:非RDD
  • 触发job的产生
  • count、first


相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
1月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
63 5
|
2月前
|
监控 供应链 安全
物联网卡在工业领域的应用
物联网卡在工业领域的应用极大地推动了行业的智能化、自动化和高效化进程。以下是物联网卡在工业领域中各操作类型中的具体应用作用:
|
1月前
|
存储 数据采集 监控
大数据技术:开启智能决策与创新服务的新纪元
【10月更文挑战第5天】大数据技术:开启智能决策与创新服务的新纪元
|
3月前
|
DataWorks Kubernetes 大数据
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
|
17天前
|
存储 人工智能 大数据
物联网、大数据、云计算、人工智能之间的关系
物联网、大数据、云计算、人工智能之间的关系是紧密相连、相互促进的。这四者既有各自独立的技术特征,又能在不同层面上相互融合,共同推动信息技术的发展和应用。
127 0
|
1月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
76 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
20天前
|
机器学习/深度学习 人工智能 物联网
深度学习:物联网大数据洞察中的人工智能
深度学习:物联网大数据洞察中的人工智能
|
1月前
|
安全 物联网 数据挖掘
选择物联网卡如何筛选服务提供商呢
在选择物联网卡服务提供商时,您需要考虑多个因素以确保所选服务商能够满足您的业务需求,同时提供稳定可靠的服务。以下是一些关键步骤和建议,帮助您筛选物联网卡服务提供商:
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
53 3
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
84 3