拓展功能
▐ ODPS + SQL function
在跑SQL时,我们可以将一些重复繁琐的过程抽象成函数。明确好入参和出参,写好方法后可进行验证。
例1:
问题背景:
字符串类型:对于所有的信息都存放在一个json串中,需要根据不同的key进行解析
初始代码
REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(json_data,'$.checkboxField_l1d6qn51'),'[\\\"',''),'\\\"]',''),'\\\"','')
改造成SQL函数
CREATE SQL FUNCTION if not exists get_json_object_checkboxField(@a STRING,@b STRING )AS REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(@a,@b),'[\\\"',''),'\\\"]',''),'\\\"','');
改造后代码
get_json_object_checkboxField(json_data,'$.checkboxField_l1d6qn51')
例2:
问题背景:
时间类型:计算自然周或者自然月维度的指标
初始代码
TO_CHAR(DATEADD(TO_DATE('${bizdate}','yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE('${bizdate}','yyyymmdd')) == 0,7,WEEKDAY(TO_DATE('${bizdate}','yyyymmdd'))), 'dd'),'yyyymmdd')
改造成SQL函数
CREATE SQL FUNCTION if not exists natural_week(@a STRING)AS TO_CHAR(DATEADD(TO_DATE(@a,'yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE(@a,'yyyymmdd')) == 0,7,WEEKDAY(TO_DATE(@a,'yyyymmdd'))), 'dd'),'yyyymmdd');
改造后代码
natural_week('${bizdate}')
▐ ODPS + UDF
通过对自定义的MAX_UDF函数的推出,仅通过申请一个UDF函数即可调用所有函数,操作简便,达到减少申请时间成本及重复开发成本的目的。
本部分与上文自定义UDF开发篇密切相关,下面举一个简单的例子:
问题背景:
字符串加密。
入参:第一个参数是字符串加密的序列号;第二个参数是要加密的字符;第三个是加密的开始位数;第四个是要加密几位;第五个参数是加密的字符内容。
出参:针对字符串加密处理。
select process_string('{"clazzNo":"011","methodNo":"01"}',"123411412341","2","7","*");
注:这里process_string,是我们自己写的UDF方法。
问题处理
▐ 性能分析
- 编译阶段
据 logview 的子状态(SubStatusHistory)可以进一步细分为调度、优化、生成物理执行计划、数据跨集群复制等子阶段。
阶段 |
特征 |
原因 |
解决方案 |
调度阶段 |
子状态为“Waiting for cluster resource”,作业排队等待被编译。 |
1.计算集群资源紧缺。 |
查看计算集群的状态,需要等待计算集群的资源。 |
2. 编译资源池资源不够 |
|||
优化阶段 |
子状态为“SQLTask is optimizing query”,优化器正在优化执行计划。 |
1.执行计划复杂,需要等待较长时间做优化。 |
一般可接受10分钟以内,如果真的太长时间不退出,基本可认为是 odps 的 bug。 |
生成物理执行计划阶段 |
子状态为“SQLTask is generating execution plan”。 |
1.读取的分区太多。每个分区需要去根据分区信息来决定处理方式,决定 split,并且会写到生成的执行计划中。 |
需要好好设计 SQL,减少分区的数量,包括:分区裁剪、筛除不需要读的分区、把大作业拆成小作业。 |
2.小文件太多(万级别),ODPS 会根据文件大小决定 split,小文件多了会导致计算 split 的过程耗时增加。 |
使用TunnelBufferedWriter接口,可以更简单的进行上传功能,同时避免小文件。 执行一次 alter table merge smallfiles; 让 odps 把小文件 merge 起来, |
||
数据跨集群复制阶段 |
子状态列表里面出现多次“Task rerun”,result 里有错误信息“FAILED: ODPS-0110141:Data version exception”。 |
1.project 刚做集群迁移,往往前一两天有大量需要跨集群复制的作业。 |
这种情况是预期中的跨集群复制,需要用户等待。 |
2.可能是作业提交错集群,或者是中间 project 做过迁移,分区过滤没做好,读取了一些比较老的分区。 |
检查作业提交的集群是否正确, Logview2.0任务详情页左侧的 BasicInfo 查看作业提交的集群。 |
- 执行阶段
logview 的 detail 界面有执行计划(执行计划没有全都绿掉),且作业状态还是 Running。
执行阶段卡住或执行时间比预期长的主要原因有等待资源,数据倾斜,UDF 执行低效,数据膨胀等。
阶段 |
特征 |
解决方案 |
等待资源 |
一些instance处于Ready状态,部分instance处于Running状态。 |
确定排队状态是否正常。可以通过 logview 的排队信息“Queue”看作业在队列的位置。 |
数据倾斜 |
task 中大多数 instance 都已经结束了,但有某几个 instance 却迟迟不结束(长尾)。 |
|
UDF执行低效 |
某个 task 执行效率低,且该 task 中有用户自定义的扩展。 |
有时候 bug 是由于某些特定的数据值引起的,比如出现某个值的时候会引起死循环。
内置函数是有可能被同名 UDF 覆盖的,当看到一个函数像是内置函数时,需要确定是否有同名 UDF 覆盖了内置函数。
evaluate 中只做与参数相关的必要操作。 |
数据膨胀 |
task 的输出数据量比输入数据量大很多。 |
|
在线业务压制 |
ODPS集群中的一部分是离线集群,另一部分是在线集群。 |
如果是弹内环境,可通过fuxi sensor确认是否存在在线业务压制。 |
UDF执行:
set odps.sql.udf.jvm.memory=-- 设定UDF JVM Heap使用的最大内存,单位M,默认1024M-- 可手动调整区间[256,12288]
- 结束阶段
有时 Fuxi 作业结束时,作业总体进度仍然处于运行状态。原因有两种:
- 单 SQL 作业可能包含多个 Fuxi 作业
- Fuxi 作业结束后,SQL 在结束阶段运行于控制集群的逻辑占用时间较长
阶段 | 特性 | 解决方案 |
子查询多阶段执行 |
MaxCompute SQL 的子查询会被编译进同一个 Fuxi DAG,即所有子查询和主查询都通过一个 Fuxi 作业完成。 但也有一些特殊子查询需要先将子查询单独执行。 |
子查询 SELECT DISTINCT ds FROM t_ds_set 先执行,其结果需要被用来做分区裁剪,来优化主查询需要读取的分区数。 |
过多小文件 |
存储方面:小文件过多会给 Pangu 文件系统带来一定的压力,且影响空间的有效利用。 计算方面:ODPS 处理单个大文件比处理多个小文件更有效率,小文件过多会影响整体的计算执行性能。 |
为了避免系统产生过多小文件,SQL作业会在结束时自动触发合并小文件的操作。 根据参数odps.merge.smallfile.filesize.threshold来判定小文件,默认阈值为32MB。 可通过logview查看作业是否触发了自动合并小文件。 |
动态分区元数据更新 |
Fuxi 作业执行完后,有可能还有一些元数据操作。 |
对分区表 sales 使用 insert into ... values命令新增 2000 个分区: INSERT INTO TABLE sales partition (ds)(ds, product, price) VALUES ('20170101','a',1),('20170102','b',2),('20170103','c',3), ...; |
输出文件size变大 |
在输入输出条数相差不大的情况,结果膨胀几倍。 |
一般是数据分布变化导致的,在写表的过程中,会对数据进行压缩,而压缩算法对于重复数据的压缩率是最高的。 |
子查询:
SELECT product, sum(price) FROM sales WHERE ds in (SELECT DISTINCT ds FROM t_ds_set) GROUP BY product;
▐ 性能优化
- 优化运行时间
在优化运行时间这个维度上,我们重点关注时间上的加速,单位时间内可能会消耗更多的计算资源。总成本有可能上升,也可能降低。
优化类型 |
具体类型 |
优化措施 |
调整并行度 |
instance数量的增加会对执行速度产生影响:
|
需要强制 1 个 instance 执行 用户需要检查这些操作是否必要,能否去掉,尽量取消掉这些操作:读表的 task + 非读表的 task |
影响单个task并行度主要因素:
|
对于读表的 task,一个 instance 读取 256M的数据,一些常见出问题的情况:
可以通过调整flag实现: set odps.sql.mapper.split.size= xxx |
|
非读表的 task,主要有三种方式调整并行度:
|
set odps.sql.reducer.instances= xxx -- 设定Reduce task的instance数量 set odps.sql.joiner.instances= xxx -- 设定Join task的instance数量 |
|
HBO |
HBO (History-Based Optimization) 会根据对历史作业的分析来优化当前作业的。 包括内存、并行度等一系列参数,它能让你的周期作业越跑越快。 |
为了尽可能解决HBO失效这个顽疾,我们在HBO中增加了若干新的功能,包括:
|
优化执行计划 |
CBO优化器会基于统计信息、SQL语义、执行引擎能力、丰富的优化能力,自动生成最优的执行计划,并且在持续提升优化能力。 |
Map Join Hint 用户可以手动添加map join hint,使得原本的Sort-Merge Join变成Map Join,避免大表数据shuffle从而提升性能。 |
Distributed Map Join Hint Distributed MapJoin是MapJoin的升级版,适用于适用于大表Join中表的场景 的场景,二者的核心目的都是为了减少大表侧的Shuffle和排序。 |
||
Dynamic Filter Hint 基于JOIN等值连接的特性,MaxCompute可以通过表A的数据生成一个过滤器,在Shuffle或JOIN之前提前过滤表B的数据。 |
||
物化视图 物化视图(Materialized View)本质是一种预计算,即把某些耗时的操作(如JOIN/AGGREGATE)的结果保存下来。 以便在查询时直接复用,从而避免这些耗时的操作,最终达到加速查询的目的。 |
||
数据倾斜 |
数据Shuffle导致的数据倾斜 1 |
数据倾斜大多数是由于数据的 reshuffle 引起的,因为按照某个 key 来做 shuffle,同一个 key 值的数据会强制集中在一个 instance 处理。
|
数据Shuffle导致的数据倾斜 2 |
特征:读表并写动态分区作业,M task 读入大量数据,但是只会写出少量的 动态分区。 解决方法:set odps.sql.reshuffle.dynamicpt =false; 去掉reshuffle 过程。 |
- 优化资源消耗
优化类型 |
具体类型 |
优化措施 |
SQL的新语法、新功能 |
GROUPING SETS:对 SELECT 语句中 GROUP BY 子句的扩展。 |
SQL 运行*时物理执行计划做了 3 次聚合,然后再 UNION 起来。 |
脚本模式: 脚本模式能让用户以脚本的形式提交多条语句同时执行。 |
脚本模式的性能优势,实际上是“将分散的业务逻辑合并成一个作业来运行“的性能优势:
|
|
MR典型场景用SQL实现 |
select k, WM_CONCAT(';',concat(v,":",c)) from ( select k, v, count(v) c from t group by k,v) t2 group by k;
rows between x preceding|following and y preceding|following
MapReduce 实现的 JOIN 逻辑。
适用场景:MapReduce Streaming 作业。 |
|
合理设置资源参数 |
Map设置 |
set odps.sql.mapper.cpu=100 作用:设置处理Map Task每个Instance的CPU数目 set odps.sql.mapper.memory=1024 作用:设定Map Task每个Instance的Memory大小 set odps.sql.mapper.merge.limit.size=64 作用:设定控制文件被合并的最大阈值 set odps.sql.mapper.split.size=256 作用:设定一个Map的最大数据输入量 |
Join设置 |
set odps.sql.joiner.instances=-1 作用: 设定Join Task的Instance数量 set odps.sql.joiner.cpu=100 作用: 设定Join Task每个Instance的CPU数目 set odps.sql.joiner.memory=1024 作用:设定Join Task每个Instance的Memory大小 |
|
Reduce设置 |
set odps.sql.reducer.instances=-1 作用: 设定Reduce Task的Instance数量 set odps.sql.reducer.cpu=100 作用:设定处理Reduce Task每个Instance的Cpu数目 set odps.sql.reducer.memory=1024 作用:设定Reduce Task每个Instance的Memory大小 |
GROUPING SETS 优化措施*:
SELECT NULL, NULL, NULL, COUNT(*)FROM requestsUNION ALLSELECT os, device, NULL, COUNT(*)FROM requests GROUP BY os, deviceUNION ALLSELECT NULL, NULL, city, COUNT(*)FROM requests GROUP BY city;
上述 SQL 运行时物理执行计划做了 3 次聚合,然后再 UNION 起来。
SELECT os, device, city, COUNT(*)FROM requestsGROUP BY os, device, city GROUPING SETS((os, device), (city), ());
物理执行计划只包含一个 Reduce 阶段,无需进行 UNION 操作,使用更少代码的同时消耗更少的集群资源。
▐ 恢复已删
总结
经过一个多月的整理和总结,终于完成了《ODPS开发大全》的这个基础版本。在这过程中我不断地接触到新的知识点,学到之前未曾掌握的技术,也感叹ODPS功能之丰富强大。期望在未来工作中,自己可以多沉淀好的技术文档,这不仅让我更加深刻地温习过往学习的技术,也可以把知识共享给更多求知若渴的技术人,建设更开放的CS技术社区。