助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,1000CU*H 3个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】

知识点07:Shell调度测试

  • 目标实现Shell命令的调度测试
  • 实施
  • 需求:使用BashOperator调度执行一条Linux命令
  • 代码
  • 创建
# 默认的Airflow自动检测工作流程序的文件的目录
mkdir -p /root/airflow/dags
cd /root/airflow/dags
vim first_bash_operator.py
  • 开发
# import
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
# define args
default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# define dag
dag = DAG(
    'first_airflow_dag',
    default_args=default_args,
    description='first airflow task DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['itcast_bash'],
)
# define task1
run_bash_task = BashOperator(
    task_id='first_bashoperator_task',
    bash_command='echo "hello airflow"',
    dag=dag,
)
# run the task
run_bash_task
  • 工作中使用bashOperator
bash_command='sh xxxx.sh'
  • xxxx.sh:根据需求
  • Linux命令
  • hive -f
  • spark-sql -f
  • spark-submit python | jar
  • 提交
python first_bash_operator.py 
  • 查看

  • 执行

  • 小结
  • 实现Shell命令的调度测试

知识点08:依赖调度测试

  • 目标:实现AirFlow的依赖调度测试
  • 实施
  • 需求:使用BashOperator调度执行多个Task,并构建依赖关系
  • 代码
  • 创建
cd /root/airflow/dags
vim second_bash_operator.py
  • 开发
# import
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# define args
default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# define dag
dag = DAG(
    'second_airflow_dag',
    default_args=default_args,
    description='first airflow task DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['itcast_bash'],
)
# define task1
say_hello_task = BashOperator(
    task_id='say_hello_task',
    bash_command='echo "start task"',
    dag=dag,
)
# define task2
print_date_format_task2 = BashOperator(
    task_id='print_date_format_task2',
    bash_command='date +"%F %T"',
    dag=dag,
)
# define task3
print_date_format_task3 = BashOperator(
    task_id='print_date_format_task3',
    bash_command='date +"%F %T"',
    dag=dag,
)
# define task4
end_task4 = BashOperator(
    task_id='end_task',
    bash_command='echo "end task"',
    dag=dag,
)
say_hello_task >> [print_date_format_task2,print_date_format_task3] >> end_task4
  • 提交
python second_bash_operator.py 
  • 查看

  • 小结
  • 实现AirFlow的依赖调度测试

知识点09:Python调度测试

  • 目标实现Python代码的调度测试
  • 实施
  • 需求:调度Python代码Task的运行
  • 代码
  • 创建
cd /root/airflow/dags
vim python_etl_airflow.py
  • 开发
# import package
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import json
# define args
default_args = {
    'owner': 'airflow',
}
# define the dag
with DAG(
    'python_etl_dag',
    default_args=default_args,
    description='DATA ETL DAG',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['itcast'],
) as dag:
    # function1
    def extract(**kwargs):
        ti = kwargs['ti']
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22, "1004": 606.65, "1005": 777.03}'
        ti.xcom_push('order_data', data_string)
    # function2
    def transform(**kwargs):
        ti = kwargs['ti']
        extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
        order_data = json.loads(extract_data_string)
        total_order_value = 0
        for value in order_data.values():
            total_order_value += value
        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push('total_order_value', total_value_json_string)
    # function3
    def load(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
        total_order_value = json.loads(total_value_string)
        print(total_order_value)
    # task1
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
  # task2
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    transform_task.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
  # task3
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )
    load_task.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
# run
extract_task >> transform_task >> load_task
  • 提交
python python_etl_airflow.py
  • 查看

  • 小结
  • 实现Python代码的调度测试

知识点10:Oracle与MySQL调度方法

  • 目标:了解Oracle与MySQL的调度方法
  • 实施
  • Oracle调度:参考《oracle任务调度详细操作文档.md》
  • step1:本地安装Oracle客户端
  • step2:安装AirFlow集成Oracle库
  • step3:创建Oracle连接
  • step4:开发测试
query_oracle_task = OracleOperator(
    task_id = 'oracle_operator_task',
    sql = 'select * from ciss4.ciss_base_areas',
    oracle_conn_id = 'oracle-airflow-connection',
    autocommit = True,
    dag=dag
)
  • MySQL调度:《MySQL任务调度详细操作文档.md》
  • step1:本地安装MySQL客户端
  • step2:安装AirFlow集成MySQL库
  • step3:创建MySQL连接
  • step4:开发测试
  • 方式一:指定SQL语句
query_table_mysql_task = MySqlOperator(
    task_id='query_table_mysql', 
    mysql_conn_id='mysql_airflow_connection', 
    sql=r"""select * from test.test_airflow_mysql_task;""",
    dag=dag
)
  • 方式二:指定SQL文件
query_table_mysql_task = MySqlOperator(
    task_id='query_table_mysql_second', 
    mysql_conn_id='mysql-airflow-connection', 
    sql='test_airflow_mysql_task.sql',
    dag=dag
)
  • 方式三:指定变量
insert_sql = r"""
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task3');
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task4');
INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task5');
"""
insert_table_mysql_task = MySqlOperator(
    task_id='mysql_operator_insert_task', 
    mysql_conn_id='mysql-airflow-connection', 
    sql=insert_sql,
    dag=dag
)
  • 小结
  • 了解Oracle与MySQL的调度方法

知识点11:大数据组件调度方法

  • 目标:了解大数据组件调度方法
  • 实施
  • AirFlow支持的类型
  • HiveOperator
  • PrestoOperator
  • SparkSqlOperator
  • 需求:Sqoop、MR、Hive、Spark、Flink
  • 解决:统一使用BashOperator或者PythonOperator,将对应程序封装在脚本中
  • Sqoop
run_sqoop_task = BashOperator(
    task_id='sqoop_task',
    bash_command='sqoop --options-file xxxx.sqoop',
    dag=dag,
)
  • Hive
run_hive_task = BashOperator(
    task_id='hive_task',
    bash_command='hive -f xxxx.sql',
    dag=dag,
)
  • Spark
run_spark_task = BashOperator(
    task_id='spark_task',
    bash_command='spark-sql -f xxxx.sql',
    dag=dag,
)
  • Flink
run_flink_task = BashOperator(
    task_id='flink_task',
    bash_command='flink run /opt/flink-1.12.2/examples/batch/WordCount.jar',
    dag=dag,
)
  • 小结
  • 了解大数据组件调度方法


相关实践学习
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
1月前
|
存储 数据采集 搜索推荐
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
本篇文章探讨了 Java 大数据在智慧文旅景区中的创新应用,重点分析了如何通过数据采集、情感分析与可视化等技术,挖掘游客情感需求,进而优化景区服务。文章结合实际案例,展示了 Java 在数据处理与智能推荐等方面的强大能力,为文旅行业的智慧化升级提供了可行路径。
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
|
2月前
|
分布式计算 搜索推荐 算法
Java 大视界 -- Java 大数据在智慧养老服务需求分析与个性化服务匹配中的应用(186)
本篇文章探讨了Java大数据技术在智慧养老服务需求分析与个性化服务匹配中的应用。通过整合老年人健康数据与行为数据,结合机器学习与推荐算法,实现对老年人健康风险的预测及个性化服务推荐,提升养老服务的智能化与精准化水平,助力智慧养老高质量发展。
|
2月前
|
SQL 缓存 监控
大数据之路:阿里巴巴大数据实践——实时技术与数据服务
实时技术通过流式架构实现数据的实时采集、处理与存储,支持高并发、低延迟的数据服务。架构涵盖数据分层、多流关联,结合Flink、Kafka等技术实现高效流计算。数据服务提供统一接口,支持SQL查询、数据推送与定时任务,保障数据实时性与可靠性。
|
8月前
|
Dubbo 应用服务中间件 API
使用 Apifox、Postman 测试 Dubbo 服务,Apache Dubbo OpenAPI 即将发布
Apache Dubbo 3.3.3(即将发布)实现了与 OpenAPI 的深度集成,通过与 OpenAPI 的深度集成,用户能够体验到从文档生成到接口调试、测试和优化的全流程自动化支持。不论是减少手动工作量、提升开发效率,还是支持多语言和多环境,Dubbo 3.3.3 都展现了其对开发者体验的极大关注。结合强大的 Mock 数据生成和自动化测试能力,这一版本为开发者提供了极具竞争力的服务治理解决方案。如果你正在寻找高效、易用的微服务框架,Dubbo 3.3.3 将是你不容错过的选择。
716 257
|
3月前
|
测试技术 Python
Python接口自动化测试中Mock服务的实施。
总结一下,Mock服务在接口自动化测试中的应用,可以让我们拥有更高的灵活度。而Python的 `unittest.mock`库为我们提供强大的支持。只要我们正确使用Mock服务,那么在任何情况下,无论是接口是否可用,都可以进行准确有效的测试。这样,就大大提高了自动化测试的稳定性和可靠性。
177 0
|
11月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
218 5
|
11月前
|
存储 数据采集 监控
大数据技术:开启智能决策与创新服务的新纪元
【10月更文挑战第5天】大数据技术:开启智能决策与创新服务的新纪元
|
10月前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
315 3
|
11月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
228 4
|
11月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
333 1

热门文章

最新文章