助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】

知识点07:Shell调度测试

  • 目标实现Shell命令的调度测试
  • 实施
  • 需求:使用BashOperator调度执行一条Linux命令
  • 代码
  • 创建
# 默认的Airflow自动检测工作流程序的文件的目录
mkdir -p /root/airflow/dags
cd /root/airflow/dags
vim first_bash_operator.py
  • 开发
# import
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
# define args
default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# define dag
dag = DAG(
    'first_airflow_dag',
    default_args=default_args,
    description='first airflow task DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['itcast_bash'],
)
# define task1
run_bash_task = BashOperator(
    task_id='first_bashoperator_task',
    bash_command='echo "hello airflow"',
    dag=dag,
)
# run the task
run_bash_task
  • 工作中使用bashOperator
bash_command='sh xxxx.sh'
  • xxxx.sh:根据需求
  • Linux命令
  • hive -f
  • spark-sql -f
  • spark-submit python | jar
  • 提交
python first_bash_operator.py 
  • 查看

  • 执行

  • 小结
  • 实现Shell命令的调度测试

知识点08:依赖调度测试

  • 目标:实现AirFlow的依赖调度测试
  • 实施
  • 需求:使用BashOperator调度执行多个Task,并构建依赖关系
  • 代码
  • 创建
cd /root/airflow/dags
vim second_bash_operator.py
  • 开发
# import
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# define args
default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# define dag
dag = DAG(
    'second_airflow_dag',
    default_args=default_args,
    description='first airflow task DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['itcast_bash'],
)
# define task1
say_hello_task = BashOperator(
    task_id='say_hello_task',
    bash_command='echo "start task"',
    dag=dag,
)
# define task2
print_date_format_task2 = BashOperator(
    task_id='print_date_format_task2',
    bash_command='date +"%F %T"',
    dag=dag,
)
# define task3
print_date_format_task3 = BashOperator(
    task_id='print_date_format_task3',
    bash_command='date +"%F %T"',
    dag=dag,
)
# define task4
end_task4 = BashOperator(
    task_id='end_task',
    bash_command='echo "end task"',
    dag=dag,
)
say_hello_task >> [print_date_format_task2,print_date_format_task3] >> end_task4
  • 提交
python second_bash_operator.py 
  • 查看

  • 小结
  • 实现AirFlow的依赖调度测试

知识点09:Python调度测试

  • 目标实现Python代码的调度测试
  • 实施
  • 需求:调度Python代码Task的运行
  • 代码
  • 创建
cd /root/airflow/dags
vim python_etl_airflow.py
  • 开发
# import package
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import json
# define args
default_args = {
    'owner': 'airflow',
}
# define the dag
with DAG(
    'python_etl_dag',
    default_args=default_args,
    description='DATA ETL DAG',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['itcast'],
) as dag:
    # function1
    def extract(**kwargs):
        ti = kwargs['ti']
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22, "1004": 606.65, "1005": 777.03}'
        ti.xcom_push('order_data', data_string)
    # function2
    def transform(**kwargs):
        ti = kwargs['ti']
        extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
        order_data = json.loads(extract_data_string)
        total_order_value = 0
        for value in order_data.values():
            total_order_value += value
        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push('total_order_value', total_value_json_string)
    # function3
    def load(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
        total_order_value = json.loads(total_value_string)
        print(total_order_value)
    # task1
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
  # task2
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    transform_task.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
  # task3
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )
    load_task.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
# run
extract_task >> transform_task >> load_task
  • 提交
python python_etl_airflow.py
  • 查看

  • 小结
  • 实现Python代码的调度测试

知识点10:Oracle与MySQL调度方法

  • 目标:了解Oracle与MySQL的调度方法
  • 实施
  • Oracle调度:参考《oracle任务调度详细操作文档.md》
  • step1:本地安装Oracle客户端
  • step2:安装AirFlow集成Oracle库
  • step3:创建Oracle连接
  • step4:开发测试
query_oracle_task = OracleOperator(
    task_id = 'oracle_operator_task',
    sql = 'select * from ciss4.ciss_base_areas',
    oracle_conn_id = 'oracle-airflow-connection',
    autocommit = True,
    dag=dag
)
  • MySQL调度:《MySQL任务调度详细操作文档.md》
  • step1:本地安装MySQL客户端
  • step2:安装AirFlow集成MySQL库
  • step3:创建MySQL连接
  • step4:开发测试
  • 方式一:指定SQL语句
query_table_mysql_task = MySqlOperator(
    task_id='query_table_mysql', 
    mysql_conn_id='mysql_airflow_connection', 
    sql=r"""select * from test.test_airflow_mysql_task;""",
    dag=dag
)
  • 方式二:指定SQL文件
query_table_mysql_task = MySqlOperator(
    task_id='query_table_mysql_second', 
    mysql_conn_id='mysql-airflow-connection', 
    sql='test_airflow_mysql_task.sql',
    dag=dag
)
  • 方式三:指定变量
insert_sql = r"""
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task3');
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task4');
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task5');
"""
insert_table_mysql_task = MySqlOperator(
    task_id='mysql_operator_insert_task', 
    mysql_conn_id='mysql-airflow-connection', 
    sql=insert_sql,
    dag=dag
)
  • 小结
  • 了解Oracle与MySQL的调度方法

知识点11:大数据组件调度方法

  • 目标:了解大数据组件调度方法
  • 实施
  • AirFlow支持的类型
  • HiveOperator
  • PrestoOperator
  • SparkSqlOperator
  • 需求:Sqoop、MR、Hive、Spark、Flink
  • 解决:统一使用BashOperator或者PythonOperator,将对应程序封装在脚本中
  • Sqoop
run_sqoop_task = BashOperator(
    task_id='sqoop_task',
    bash_command='sqoop --options-file xxxx.sqoop',
    dag=dag,
)
  • Hive
run_hive_task = BashOperator(
    task_id='hive_task',
    bash_command='hive -f xxxx.sql',
    dag=dag,
)
  • Spark
run_spark_task = BashOperator(
    task_id='spark_task',
    bash_command='spark-sql -f xxxx.sql',
    dag=dag,
)
  • Flink
run_flink_task = BashOperator(
    task_id='flink_task',
    bash_command='flink run /opt/flink-1.12.2/examples/batch/WordCount.jar',
    dag=dag,
)
  • 小结
  • 了解大数据组件调度方法


相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
安全 Linux Shell
Linux SSH(Secure Shell)服务
Linux SSH提供安全网络协议,使用公钥加密技术确保远程服务传输安全。OpenSSH是实现SSH服务的免费开源工具,允许用户加密连接远程登录Linux服务器执行任务。SSH比Telnet更安全,防止数据被截获。SSH还支持端口转发和隧道,广泛应用于系统管理和网络维护,是安全远程访问服务器的重要工具。
147 1
|
Java 关系型数据库 MySQL
Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
【4月更文挑战第12天】Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
940 3
|
3月前
|
Linux Shell
在Linux、CentOS7中设置shell脚本开机自启动服务
以上就是在CentOS 7中设置shell脚本开机自启动服务的全部步骤。希望这个指南能帮助你更好地管理你的Linux系统。
184 25
|
5月前
|
传感器 物联网 大数据
物联网与大数据:揭秘万物互联的新纪元
物联网与大数据:揭秘万物互联的新纪元
188 7
|
8月前
|
存储 人工智能 大数据
物联网、大数据、云计算、人工智能之间的关系
物联网、大数据、云计算、人工智能之间的关系是紧密相连、相互促进的。这四者既有各自独立的技术特征,又能在不同层面上相互融合,共同推动信息技术的发展和应用。
2459 0
|
11月前
|
传感器 物联网 测试技术
未来科技浪潮中的领航者:区块链、物联网与虚拟现实的融合与创新探索自动化测试之美——以Selenium为例
【8月更文挑战第30天】本文深入探讨了当前最前沿的技术趋势——区块链、物联网和虚拟现实,并分析了它们各自的发展脉络及相互之间的融合可能性。我们将通过具体应用场景描绘这些技术如何塑造未来社会的面貌,同时提供代码示例以加深理解。文章旨在为读者揭示这些技术背后的巨大潜力,以及它们将如何影响我们的工作和生活方式。
|
11月前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
11月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
11月前
|
Ubuntu Linux Shell
在Linux中,如何使用shell脚本判断某个服务是否正在运行?
在Linux中,如何使用shell脚本判断某个服务是否正在运行?
|
12月前
|
分布式计算 大数据 Shell
MaxCompute产品使用合集之odps shell如何将ech变量的结果集合写入文件,并且指定服务器的位置
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
100 10

热门文章

最新文章