Data Serialization
数据序列化,对于任意分布式系统都是性能的关键点
Spark默认使用Java serialization,这个比较低效
推荐使用,Kryo serialization,会比Java序列化,更快更小, Spark使用Twitter chill library(Kryo的scala扩展)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb“, 2), 需要大于最大的需要序列化的对象size
之所以,spark不默认使用Kryo,因为Kryo需要显式的注册program中使用到的class,参考
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
只所以要做注册是因为,在把对象序列化成byte[]时,要记录下classname,classname带namespace一般很长的,所以每个里面加上这个classname比较费空间
在kryo里面注册过后,会用一个int来替代classname
当然不注册kryo也是可以用的,只是会多占空间
Memory Tuning
Tuning之前需要知道当前dataset的内存消耗是多少,
简单的方法是,以该dataset创建rdd,然后cache
这样从SparkContext的日志里面可以看到每个partition的大小,加一下,就可以得到整个数据集的大小
INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)
This means that partition 1 of RDD 0 consumed 717.5 KB.
然后可以从几个方面去进行优化,
Tuning Data Structures
Java对象虽然便于访问,但是和raw data比,java对象的size要大2~5倍
Each distinct Java object has an “object header”, which is about 16 bytes
JavaString
s have about 40 bytes of overhead over the raw string data, and store each character as two bytes due toString
’s internal usage of UTF-16 encoding
其他的比如HashMap或LinkedList,除了header,还需要8 bytes pointer来指向下个对象
总之,就是对于内存敏感的应用,直接使用Java对象是非常不经济的
可以从以下几点去优化,
a, 优先使用arrays of objects, and primitive types,而非java或scala的标准collection class
或者使用fastutil library,这个库提供了用primitive types实现的collection class
b, 避免含有大量小对象或pointer的嵌套数据结构
c, Consider using numeric IDs or enumeration objects instead of strings for keys
d, If you have less than 32 GB of RAM, set the JVM flag-XX:+UseCompressedOops
to make pointers be four bytes instead of eight. You can add these options inspark-env.sh
.
Serialized RDD Storage
使用MEMORY_ONLY_SER,在memory中cache序列化后的数据,降低内存使用,当然响应的访问速度会降低,由于需要反序列化
Garbage Collection Tuning
首先需要打开gc日志,
adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
to the Java options
Cache Size Tuning
默认Spark使用60% 的executor memory(spark.executor.memory
)来cache RDDs.
也就是说只有40%的memory用于task执行,如果发现频繁gc或是oom,可以调低用于cache的比例,
conf.set("spark.storage.memoryFraction", "0.5"), 这样设成50%
Advanced GC Tuning
Spark做gc tuning的目标是,避免在task执行过程中发生full gc, 即需要让Young区足够容纳short-lived objects
a, 如果发生多次full gc或是OldGen已经接近full,说明内存不够,可以降低cache比例
b, 如果很多minor gc,但没有major gc,说明young区过小, 我们可以根据task dataset需要消耗内存来预估eden区,young区大小= eden区 × (4/3),因为要加上survivor区
c, 如果从hdfs读取数据,可以根据hdfs block大小来预估eden区大小,比如,如果解压比例3倍,4个tasks并行,block大小64M,那么eden区大小 = 3×4×64M
其他的一些考虑,
调整并发的level, 通过增加并发来降低reduce task的内存消耗
用broadcast functionality来处理大的变量, data locality
本文章摘自博客园,原文发布日期:2015-04-21