打开微信扫一扫,关注微信公众号【数据与算法联盟】
转载请注明出处: http://blog.csdn.net/gamer_gyt
博主微博: http://weibo.com/234654758
Github: https://github.com/thinkgamer
最近在搞一个价格分类模型,虽说是分类,用的是kmeans算法,求出聚类中心,对每个价格进行级别定级。虽然说起来简单,但做起来却是并没有那么容易,不只是因为数据量大,在执行任务时要不是效率问题就是shuffle报错等。但在这整个过程中对scala编程,Spark rdd 机制,以及海量数据背景下对算法的认知都有很大的提升,这一篇文章主要是总结一些Spark在shell 终端提交jar包任务的时候的相关知识,在后续文章会具体涉及到相关的”实战经历“。
对Spark的认识
由于之前接触过Hadoop,对Spark也是了解一些皮毛,但中间隔了好久才重新使用spark,期间也产生过一些错误的认识。
之前觉得MapReduce耗费时间,写一个同等效果的Spark程序很快就能执行完,很长一段时间自己都是在本地的单机环境进行测试学习,所以这种错误的认知就会更加深刻,但事实却并非如此,MR之所以慢是因为每一次操作数据都写在了磁盘上,大量的IO造成了时间和资源的浪费,但是Spark是基于内存的计算引擎,相比MR,减少的是大量的IO,但并不是说给一个Spark程序足够的资源,就可以为所欲为了,在提交一个spark程序时,不仅要考虑所在资源队列的总体情况,还要考虑代码本身的高效性,要尽量避免大量的shuffle操作和action操作,尽量使用同一个rdd。
会用spark,会调api和能用好spark是两回事,在进行开发的过程中,不仅要了解运行原理,还要了解业务,将合适的方法和业务场景合适的结合在一起,才能发挥最大的价值。
spark-submit
进入spark的home目录,执行以下命令查看帮助
bin/spark-submit --help
spark提交任务常见的两种模式
1:local/local[K]
- 本地使用一个worker线程运行spark程序
- 本地使用K个worker线程运行spark程序
此种模式下适合小批量数据在本地调试代码
2:yarn-client/yarn-cluster
- yarn-client:以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。
- yarn-cluster:以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。
注意:若使用的是本地文件需要在file路径前加:file://
在提交任务时的几个重要参数
- executor-cores —— 每个executor使用的内核数,默认为1
- num-executors —— 启动executors的数量,默认为2
- executor-memory —— executor内存大小,默认1G
- driver-cores —— driver使用内核数,默认为1
- driver-memory —— driver内存大小,默认512M
下边给一个提交任务的样式
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
如果这里通过--queue 指定了队列,那么可以免去写--master
以上就是通过spark-submit来提交一个任务
几个参数的常规设置
executor_cores*num_executors
表示的是能够并行执行Task的数目
不宜太小或太大!一般不超过总队列 cores 的 25%,比如队列总 cores 400,最大不要超过100,最小不建议低于 40,除非日志量很小。executor_cores
不宜为1!否则 work 进程中线程数过少,一般 2~4 为宜。executor_memory
一般 6~10g 为宜,最大不超过20G,否则会导致GC代价过高,或资源浪费严重。driver-memory
driver 不做任何计算和存储,只是下发任务与yarn资源管理器和task交互,除非你是 spark-shell,否则一般 1-2g
增加每个executor的内存量,增加了内存量以后,对性能的提升,有三点:
- 1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,
甚至不写入磁盘。减少了磁盘IO。 - 2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
- 3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升。
常规注意事项
- 预处理数据,丢掉一些不必要的数据
- 增加Task的数量
- 过滤掉一些容易导致发生倾斜的key
- 避免创建重复的RDD
- 尽可能复用一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle算子
- 在要使用groupByKey算子的时候,尽量用reduceByKey或者aggregateByKey算子替代.因为调用groupByKey时候,按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式,此时所有的键值对都将被重新洗牌,移动,对网络数据传输造成理论上的最大影响.
- 使用高性能的算子
参考:
1:http://www.cnblogs.com/haozhengfei/p/e570f24c43fa15f23ebb97929a1b7fe6.html
2:https://www.jianshu.com/p/4c584a3bac7d