1. 概述
需求:日常在E-MapReduce集群中进行相关测试,验证一些切换或变更是否会影响业务的运行导致任务failed。所以需要在测试集群中运行指定资源数(vcore及memory)或者指定运行时间的任务。
目前用到MapReduce和spark任务两种,其余的持续更新补充中……
2. mapreduce
官方提供的test jar,可以通过如下命令找到该测试jar包
# 找到环境变量中的hadoop homeenv |grep -i hadoop # 找到测试jarcd$HADOOP_HOME/share/hadoop/mapreduce/ # pwd# /opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/mapreduce
2.1. 用法
# 查看jar包中的类 可以看到有现成的sleep可以使用hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar
# 查看sleep类的用法# hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar <类名>hadoop jar ./hadoop-mapreduce-client-jobclient-2.8.5-tests.jar sleep# 可以看到如下图所示,在-m -r 指定mapper和reducer的数量# -mt 和 -rt指定在map 和reduce过程中sleep的时长,单位为msec
hadoop jar hadoop-mapreduce-client-jobclient-2.8.5-tests.jar sleep-m10-r10-mt1000-rt300000
3. spark
spark home下暂时没找到可以直接配参数调用的脚本,用来控制资源+执行时长的类似sleep测试jar包或代码,所以以apache spark中样例代码PI的计算为基础,做了下改动。
3.1. 用法
以pyspark api为基础,使用spark-submit起任务,脚本中的`--sleep`参数可以用纸在map task时sleep的时长。
spark-submit \ --deploy-mode cluster \ --master yarn \ --executor-memory 5g \ --conf spark.executor.instances=6 \ --conf spark.dynamicAllocation.enabled=false \ ./pi_sleep.py \ --sleep30
3.2. 样例代码
importtimefromrandomimportrandomfromoperatorimportaddfrompyspark.sqlimportSparkSessionimportargparseif__name__=="__main__": """ Usage: spark-submit \ --deploy-mode cluster \ --master yarn \ --conf spark.executor.instances=6 \ --conf spark.dynamicAllocation.enabled=false \ ./pi_sleep.py \ --sleep 30 """parser=argparse.ArgumentParser() parser.add_argument("--partition", default=4, type=int, help="number of spark rdd partition, default 4") parser.add_argument("--sleep", default=0, type=int, help="seconds of map task sleep time, default 0") args=parser.parse_args() spark=SparkSession \ .builder \ .appName("PythonPi") \ .getOrCreate() start=time.time() # partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2partitions=int(args.partition) n=100000*partitionst_sleep=args.sleep/100000deff(_: int) ->float: x=random() *2-1y=random() *2-1time.sleep(t_sleep) return1ifx**2+y**2<=1else0count=spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add) print("Pi is roughly %f"% (4.0*count/n)) end=time.time() print(f'calc execute time: {end-start} s') spark.stop()