阿里云E-MapReduce集群不同计算引擎sleep task使用笔记

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 需求:日常在E-MapReduce集群中进行相关测试,验证一些切换或变更是否会影响业务的运行导致任务failed。所以需要在测试集群中运行指定资源数(vcore及memory)或者指定运行时间的任务。目前用到MapReduce和spark任务两种,其余的持续更新补充中……

1. 概述

需求:日常在E-MapReduce集群中进行相关测试,验证一些切换或变更是否会影响业务的运行导致任务failed。所以需要在测试集群中运行指定资源数(vcore及memory)或者指定运行时间的任务。

目前用到MapReduce和spark任务两种,其余的持续更新补充中……

2. mapreduce

官方提供的test jar,可以通过如下命令找到该测试jar包

# 找到环境变量中的hadoop homeenv |grep -i hadoop 
# 找到测试jarcd$HADOOP_HOME/share/hadoop/mapreduce/
# pwd# /opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/mapreduce

image.png

2.1. 用法

# 查看jar包中的类 可以看到有现成的sleep可以使用hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar

image.png

# 查看sleep类的用法# hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar <类名>hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar sleep# 可以看到如下图所示,在-m -r 指定mapper和reducer的数量# -mt 和 -rt指定在map 和reduce过程中sleep的时长,单位为msec

image.png

hadoop jar hadoop-mapreduce-client-jobclient-2.8.5-tests.jar sleep-m10-r10-mt1000-rt300000

3. spark

spark home下暂时没找到可以直接配参数调用的脚本,用来控制资源+执行时长的类似sleep测试jar包或代码,所以以apache spark中样例代码PI的计算为基础,做了下改动。

3.1. 用法

以pyspark api为基础,使用spark-submit起任务,脚本中的`--sleep`参数可以用纸在map task时sleep的时长。

spark-submit \
--deploy-mode cluster \
--master yarn \
--executor-memory 5g \
--conf spark.executor.instances=6 \
--conf spark.dynamicAllocation.enabled=false \
  ./pi_sleep.py \
--sleep30

image.png

3.2. 样例代码

importtimefromrandomimportrandomfromoperatorimportaddfrompyspark.sqlimportSparkSessionimportargparseif__name__=="__main__":
"""        Usage:         spark-submit \            --deploy-mode cluster \            --master yarn \            --conf spark.executor.instances=6 \            --conf spark.dynamicAllocation.enabled=false \            ./pi_sleep.py \            --sleep 30    """parser=argparse.ArgumentParser()
parser.add_argument("--partition", default=4, type=int, help="number of spark rdd partition, default 4")
parser.add_argument("--sleep", default=0, type=int, help="seconds of map task sleep time, default 0")
args=parser.parse_args()
spark=SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()
start=time.time()
# partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2partitions=int(args.partition)
n=100000*partitionst_sleep=args.sleep/100000deff(_: int) ->float:
x=random() *2-1y=random() *2-1time.sleep(t_sleep)
return1ifx**2+y**2<=1else0count=spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add)
print("Pi is roughly %f"% (4.0*count/n))
end=time.time()
print(f'calc execute time: {end-start} s')
spark.stop()
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
存储 缓存 资源调度
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
814 0
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
|
4月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
资源调度 关系型数据库 数据库
实时计算 Flink版产品使用合集之flink-cdc.sh xx.yaml提交到yarn 发现没有启动task manager的,怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
365 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
1月前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 缓存 资源调度
实时计算 Flink版产品使用问题之在Flink on Yarn模式下,如何对job作业进行指标监控
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之在DolphinScheduler调度Flink批作业时,遇到作业提交后状态立即变为成功,但实际上作业还在后台运行的情况,如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 分布式计算 Oracle
实时计算 Flink版操作报错合集之flink on yarn job manager 可以启动, 但不给分配slot ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
136 0
|
4月前
|
Oracle 关系型数据库 数据处理
实时计算 Flink版产品使用合集之新表启动后其他任务已经完成,而DAG图上的busy状态仍然为100,并且保留的task的backpressure为0,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
Java API 分布式数据库
实时计算 Flink版产品使用合集之如何解决 TaskManager和 JobManager中有大量的等待线程
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。