本文旨在收集整理ODPS开发中入门及进阶级知识,尽可能涵盖大多数ODPS开发问题,成为一本mini百科全书,后续也会持续更新。希望通过笔者的梳理和理解,帮助刚接触ODPS开发的同学快速上手。
本系列分为两部分:入门篇和进阶篇。
常用参数设置
常用的调整无外乎调整map、join、reduce的个数,map、join、reduce的内存大小。
以ODPS的参数设置为例,参数可能因版本不同而略有差异。
参数类型 |
具体使用 |
|
set odps.sql.mapper.cpu=100 作用:设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些任务如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql任务来说,一般不需要调整Cpu个数的。 set odps.sql.mapper.memory=1024 作用:设定Map Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。场景:当Map阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。 set odps.sql.mapper.merge.limit.size=64 作用:设定控制文件被合并的最大阈值,单位M,默认64M,在[0,Integer.MAX_VALUE]之间调整。场景:当Map端每个Instance读入的数据量不均匀时,可以通过设置这个变量值进行小文件的合并,使得每个Instance的读入文件均匀。一般会和odps.sql.mapper.split.size这个参数结合使用。 set odps.sql.mapper.split.size=256 作用:设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单位M,默认256M,在[1,Integer.MAX_VALUE]之间调整。场景:当每个Map Instance处理的数据量比较大,时间比较长,并且没有发生长尾时,可以适当调小这个参数。如果有发生长尾,则结合odps.sql.mapper.merge.limit.size这个参数设置每个Map的输入数量。 |
2. Join设置 |
set odps.sql.joiner.instances=-1 作用: 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为2000,走HBO时可以超过2000。场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。 set odps.sql.joiner.cpu=100 作用: 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些任务如果特别耗计算资源的话,可以适当调整CPU数目。对于大多数SQL任务来说,一般不需要调整CPU。 set odps.sql.joiner.memory=1024 作用:设定Join Task每个Instance的Memory大小,单位为M,默认为1024M,在[256,12288]之间调整。场景:当Join阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。 |
3. Reduce设置 |
set odps.sql.reducer.instances=-1 作用: 设定Reduce Task的Instance数量,手动设置区间在[1,99999]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为99999,走HBO优化时可以超过99999。场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。 set odps.sql.reducer.cpu=100 作用:设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整。场景:某些任务如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql任务来说,一般不需要调整Cpu。 set odps.sql.reducer.memory=1024 作用:设定Reduce Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。场景:当Reduce阶段的Instance有Writer Dumps时,可以适当的增加内存的大小,减少Dumps所花的时间。 上面这些参数虽然好用,但是也过于简单暴力,可能会对集群产生一定的压力。特别是在集群整体资源紧张的情况下,增加资源的方法可能得不到应有的效果,随着资源的增大,等待资源的时间变长的风险也随之增加,导致效果不好!因此请合理的使用资源参数! |
4. 小文件合并参数 |
set odps.merge.cross.paths=true|false 作用:设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区生成独立的Merge Action进行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。 set odps.merge.smallfile.filesize.threshold = 64 作用:设置合并文件的小文件大小阀值,文件大小超过该阀值,则不进行合并,单位为M,可以不设,不设时,则使用全局变量odps_g_merge_filesize_threshold,该值默认为32M,设置时必须大于32M。 set odps.merge.maxmerged.filesize.threshold = 256 作用:设置合并输出文件量的大小,输出文件大于该阀值,则创建新的输出文件,单位为M,可以不设,不设时,则使用全局变odps_g_max_merged_filesize_threshold,该值默认为256M,设置时必须大于256M。 set odps.merge.max.filenumber.per.instance = 10000 作用:设置合并Fuxi Job的单个Instance允许合并的小文件个数,控制合并并行的Fuxi Instance数,可以不设,不设时,则使用全局变量odps_g_merge_files_per_instance,该值默认为100,在一个Merge任务中,需要的Fuxi Instance个数至少为该目录下面的总文件个数除以该限制。 set odps.merge.max.filenumber.per.job = 10000 作用:设置合并最大的小文件个数,小文件数量超过该限制,则超过限制部分的文件忽略,不进行合并,可以不设,不设时,则使用全局变量odps_g_max_merge_files,该值默认为10000。 |
5. UDF相关参数 |
set odps.sql.udf.jvm.memory=1024 作用: 设定UDF JVM Heap使用的最大内存,单位M,默认1024M,在[256,12288]之间调整。场景:某些UDF在内存计算、排序的数据量比较大时,会报内存溢出错误,这时候可以调大该参数,不过这个方法只能暂时缓解,还是需要从业务上去优化。 set odps.sql.udf.timeout=1800 作用:设置UDF超时时间,默认为1800秒,单位秒。[0,3600]之间调整。 set odps.sql.udf.python.memory=256 作用:设定UDF python 使用的最大内存,单位M,默认256M。[64,3072]之间调整。 set odps.sql.udf.optimize.reuse=true/false 作用:开启后,相同的UDF函数表达式,只计算一次,可以提高性能,默认为True。 set odps.sql.udf.strict.mode=false/true 作用:True为金融模式,False为淘宝模式,控制有些函数在遇到脏数据时是返回NULL还是抛异常,True是抛出异常,False是返回null。 |
6. Mapjoin设置 |
set odps.sql.mapjoin.memory.max=512 作用:设置Mapjoin时小表的最大内存,默认512,单位M,[128,2048]之间调整。 |
7. 动态分区设置 |
set odps.sql.reshuffle.dynamicpt=true/false 作用:默认true,用于避免拆分动态分区时产生过多小文件。如果生成的动态分区个数只会是很少几个,设为false避免数据倾斜。 |
8. 数据倾斜设置 |
set odps.sql.groupby.skewindata=true/false 作用:开启Group By优化。 set odps.sql.skewjoin=true/false 作用:开启Join优化,必须设置odps.sql.skewinfo 才有效。 |
常用内建函数
常用内建函数大概分为这几类,这边我们挑选一些重点的函数进行说明。
函数类型 |
说明 |
日期函数 |
支持处理DATE、DATETIME、TIMESTAMP等日期类型数据,实现加减日期、计算日期差值、提取日期字段、获取当前时间、转换日期格式等业务处理能力。 |
数学函数 |
支持处理BIGINT、DOUBLE、DECIMAL、FLOAT等数值类型数据,实现转换进制、数学运算、四舍五入、获取随机数等业务处理能力。 |
窗口函数 |
支持在指定的开窗列中,实现求和、求最大最小值、求平均值、求中间值、数值排序、数值偏移、抽样等业务处理能力。 |
聚合函数 |
支持将多条输入记录聚合成一条输出值,实现求和、求平均值、求最大最小值、求平均值、参数聚合、字符串连接等业务处理能力。 |
字符串函数 |
支持处理STRING类型字符串,实现截取字符串、替换字符串、查找字符串、转换大小写、转换字符串格式等业务处理能力。 |
复杂类型函数 |
支持处理MAP、ARRAY、STRUCT及JSON类型数据,实现去重元素、聚合元素、元素排序、合并元素等业务处理能力。 |
加密函数 |
支持处理STRING、BINARY类型的表数据,实现加密、解密等业务处理能力。 |
其他函数 |
除上述函数之外,提供支持其他业务场景的函数。 |
▐ 日期函数
函数名 |
具体操作 |
|
SELECTDATEADD(GETDATE(),-7,'dd'); TO_CHAR(DATEADD(GETDATE(),-7,'dd'),'yyyymmdd'); |
2. DATEADD(指定日期加减): |
SELECT DATEADD(GETDATE(),1,"dd"); //2021-01-09 10:48:40 GETDATE()返回值是2021-01-08 10:48:40 SELECT DATEADD(GETDATE(),1,"mm");//2021-02-08 10:49:24 GETDATE()返回值是2021-01-08 10:49:24 |
3. DATEDIFF(计算两个日期的差值): |
datediff(end, start, 'dd') = 1 datediff(end, start, 'mm') = 1 datediff(end, start, 'yyyy') = 1 datediff(end, start, 'hh') = 1 |
4. DATEPART(返回指定日期的年/月/日): |
SELECT DATEPART(GETDATE(),'mm'); SELECT DATEPART(GETDATE(),'yyyy'); SELECT DATEPART(GETDATE(),'dd'); SELECT DATEPART(GETDATE(),'hh'); |
5. DATETRUNC(截取时间): |
datetrunc(datetime '2011-12-07 16:28:46', 'yyyy') = 2011-01-01 00:00:00 datetrunc(datetime '2011-12-07 16:28:46', 'month') = 2011-12-01 00:00:00 datetrunc(datetime '2011-12-07 16:28:46', 'DD') = 2011-12-07 00:00:00 |
6. TO_CHAR 函数 |
使用方式 to_char(要处理的日期,日期格式) 推荐用法:2018-01-11 10:00:00 格式 ,处理为指定格式的日期字符串 效果:处理为yyyymmdd的日期格式,类型为字符串 to_char('2018-01-11 10:00:00','yyyymmdd') as date_3 to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5 to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6 to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8 to_char('2018-01-11 10:00:00','yyyy-mm-01 23:59:59') as date_9 |
7. TO_DATE函数 |
使用方式:to_date(datetime,format) 推荐用法:根据时间的格式,适当调整format的模版 效果:处理20180111、2018-01-11、2018-01-11 10:00:00 to_char('2018-01-11 10:00:00','yyyymmdd') as date_3 to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5 to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6 to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8 |
8. UNIX时间戳转换 |
函数:datetime from_unixtime(bigint unixtime) 支持秒 from_unixtime(123456789) = 1973-11-30 05:33:09 函数: from_utc_timestamp(bigint unixtime,string timezone) 支持毫秒 SELECT from_utc_timestamp(1501557840000 ,'GMT') ; --返回:2017-08-01 04:24:00 |
9. DATE转UNIX时间戳 |
函数:bigint unix_timestamp(datetime date) select unix_timestamp(datetime '2019-09-20 01:00:00'); --返回1568912400 select unix_timestamp('2019-09-20 01:00:00'); --返回1568912400 |