前言
前几篇我们讲解了ODPS数据开发过程中会大量用到的各种日期与字符、数学运算、聚合、开窗、自定义等函数,
今天我们针对海量数据开发过程数据开发效率优化做讲解。
常见数据优化问题
数据倾斜问题
1、大表关联小表
大表关联小表出现倾斜时,可以使用mapjoin的hint(/+mapjoin(b)/)。
同时可适当调整mapjoin中小表的内存大小:
set odps.sql.mapjoin.memory.max=512;默认512,单位M,[128,2048]之间调整。
2、大表关联大表
- 大表中存在热点key:可以考虑对大表进行拆分,根据join的key,把热点的数据拆出来走mapjoin,其余的考虑普通join即可。当然也有skewjoin的hint可以参考使用。
- 大表中不存在热点key:可以考虑在分区的基础上加上桶,对关联字段进行分桶,减少shuffle的数据量。
3、count distinct
常见的数据倾斜还有一种情况是因为使用了count distinct,这种情况可以考虑使用group by先进行数据去重,再count。
常用的参数设置
常用的调整无外乎调整map、join、reduce的个数,map、join、reduce的内存大小。
本文以ODPS的参数设置为例,参数可能因版本不同而略有差异。
1、Map设置
set odps.sql.mapper.cpu=100
作用:设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql任务来说,一般不需要调整Cpu个数的。
set odps.sql.mapper.memory=1024
作用:设定Map Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。
场景:当Map阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。
set odps.sql.mapper.merge.limit.size=64
作用:设定控制文件被合并的最大阈值,单位M,默认64M,在[0,Integer.MAX_VALUE]之间调整。
场景:当Map端每个Instance读入的数据量不均匀时,可以通过设置这个变量值进行小文件的合并,使得每个Instance的读入文件均匀。一般会和odps.sql.mapper.split.size这个参数结合使用。
set odps.sql.mapper.split.size=256
作用:设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单位M,默认256M,在[1,Integer.MAX_VALUE]之间调整。
场景:当每个Map Instance处理的数据量比较大,时间比较长,并且没有发生长尾时,可以适当调小这个参数。如果有发生长尾,则结合odps.sql.mapper.merge.limit.size这个参数设置每个Map的输入数量。
2、Join设置
set odps.sql.joiner.instances=-1
作用: 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为2000,走HBO时可以超过2000。
场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.joiner.cpu=100
作用: 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整CPU数目。对于大多数SQL任务来说,一般不需要调整CPU。
set odps.sql.joiner.memory=1024
作用:设定Join Task每个Instance的Memory大小,单位为M,默认为1024M,在[256,12288]之间调整。
场景:当Join阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。
作业跑完后,可以在 summary 中搜索 writer dumps 字样来判断是否产生 Writer Dumps。
4、Reduce设置
set odps.sql.reducer.instances=-1
作用: 设定Reduce Task的Instance数量,手动设置区间在[1,99999]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为99999,走HBO优化时可以超过99999。
场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.reducer.cpu=100
作用:设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql任务来说,一般不需要调整Cpu。
set odps.sql.reducer.memory=1024
作用:设定Reduce Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。
场景:当Reduce阶段的Instance有Writer Dumps时,可以适当的增加内存的大小,减少Dumps所花的时间。
上面这些参数虽然好用,但是也过于简单暴力,可能会对集群产生一定的压力。特别是在集群整体资源紧张的情况下,增加资源的方法可能得不到应有的效果,随着资源的增大,等待资源的时间变长的风险也随之增加,导致效果不好!因此我们要合理的使用资源参数!
小文件合并参数
set odps.merge.cross.paths=true|false
作用:设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区生成独立的Merge Action进行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。
set odps.merge.smallfile.filesize.threshold = 64
作用:设置合并文件的小文件大小阀值,文件大小超过该阀值,则不进行合并,单位为M,可以不设,不设时,则使用全局变量odps_g_merge_filesize_threshold,该值默认为32M,设置时必须大于32M。
set odps.merge.maxmerged.filesize.threshold = 256
作用:设置合并输出文件量的大小,输出文件大于该阀值,则创建新的输出文件,单位为M,可以不设,不设时,则使用全局变odps_g_max_merged_filesize_threshold,该值默认为256M,设置时必须大于256M。
set odps.merge.max.filenumber.per.instance = 10000
作用:设置合并Fuxi Job的单个Instance允许合并的小文件个数,控制合并并行的Fuxi Instance数,可以不设,不设时,则使用全局变量odps_g_merge_files_per_instance,该值默认为100,在一个Merge任务中,需要的Fuxi Instance个数至少为该目录下面的总文件个数除以该限制。
set odps.merge.max.filenumber.per.job = 10000
作用:设置合并最大的小文件个数,小文件数量超过该限制,则超过限制部分的文件忽略,不进行合并,可以不设,不设时,则使用全局变量odps_g_max_merge_files,该值默认为10000。
UDF相关参数
set odps.sql.udf.jvm.memory=1024
作用: 设定UDF JVM Heap使用的最大内存,单位M,默认1024M,在[256,12288]之间调整。
场景:某些UDF在内存计算、排序的数据量比较大时,会报内存溢出错误,这时候可以调大该参数,不过这个方法只能暂时缓解,还是需要从业务上去优化。
set odps.sql.udf.timeout=1800
作用:设置UDF超时时间,默认为1800秒,单位秒。[0,3600]之间调整。
set odps.sql.udf.python.memory=256
作用:设定UDF python 使用的最大内存,单位M,默认256M。[64,3072]之间调整。
set odps.sql.udf.optimize.reuse=true/false
作用:开启后,相同的UDF函数表达式,只计算一次,可以提高性能,默认为True。
set odps.sql.udf.strict.mode=false/true
作用:True为金融模式,False为淘宝模式,控制有些函数在遇到脏数据时是返回NULL还是抛异常,True是抛出异常,False是返回null。
其它
1、Mapjoin设置
set odps.sql.mapjoin.memory.max=512
作用:设置Mapjoin时小表的最大内存,默认512,单位M,[128,2048]之间调整。
2、动态分区设置
set odps.sql.reshuffle.dynamicpt=true/false
作用:默认true,用于避免拆分动态分区时产生过多小文件。如果生成的动态分区个数只会是很少几个,设为false避免数据倾斜。
3、数据倾斜设置
set odps.sql.groupby.skewindata=true/false
作用:开启Group By优化。
set odps.sql.skewjoin=true/false
作用:开启Join优化,必须设置odps.sql.skewinfo 才有效。