ODPS开发大全:进阶篇(4)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
密钥管理服务KMS,1000个密钥,100个凭据,1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: ODPS开发大全:进阶篇

拓展功能


 ODPS + SQL function


在跑SQL时,我们可以将一些重复繁琐的过程抽象成函数。明确好入参和出参,写好方法后可进行验证。


例1:

问题背景:

字符串类型:对于所有的信息都存放在一个json串中,需要根据不同的key进行解析


初始代码


REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(json_data,'$.checkboxField_l1d6qn51'),'[\\\"',''),'\\\"]',''),'\\\"','')


改造成SQL函数



CREATE SQL FUNCTION if not exists get_json_object_checkboxField(@a STRING,@b STRING )AS REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(@a,@b),'[\\\"',''),'\\\"]',''),'\\\"','');


改造后代码


get_json_object_checkboxField(json_data,'$.checkboxField_l1d6qn51')


例2:

问题背景:

时间类型:计算自然周或者自然月维度的指标


初始代码


TO_CHAR(DATEADD(TO_DATE('${bizdate}','yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE('${bizdate}','yyyymmdd')) == 0,7,WEEKDAY(TO_DATE('${bizdate}','yyyymmdd'))), 'dd'),'yyyymmdd')


改造成SQL函数



CREATE SQL FUNCTION if not exists natural_week(@a STRING)AS TO_CHAR(DATEADD(TO_DATE(@a,'yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE(@a,'yyyymmdd')) == 0,7,WEEKDAY(TO_DATE(@a,'yyyymmdd'))), 'dd'),'yyyymmdd');


改造后代码


natural_week('${bizdate}')

 ODPS + UDF


通过对自定义的MAX_UDF函数的推出,仅通过申请一个UDF函数即可调用所有函数,操作简便,达到减少申请时间成本及重复开发成本的目的。
本部分与上文自定义UDF开发篇密切相关,下面举一个简单的例子:

问题背景:

字符串加密。


入参:第一个参数是字符串加密的序列号;第二个参数是要加密的字符;第三个是加密的开始位数;第四个是要加密几位;第五个参数是加密的字符内容。

出参:针对字符串加密处理。



select process_string('{"clazzNo":"011","methodNo":"01"}',"123411412341","2","7","*");

注:这里process_string,是我们自己写的UDF方法。
问题处理


 性能分析

image.png


  • 编译阶段


据 logview 的子状态(SubStatusHistory)可以进一步细分为调度、优化、生成物理执行计划、数据跨集群复制等子阶段。

阶段

特征

原因

解决方案

调度阶段

子状态为“Waiting for cluster resource”,作业排队等待被编译。

1.计算集群资源紧缺。

查看计算集群的状态,需要等待计算集群的资源。

2. 编译资源池资源不够

优化阶段

子状态为“SQLTask is optimizing query”,优化器正在优化执行计划。

1.执行计划复杂,需要等待较长时间做优化。

一般可接受10分钟以内,如果真的太长时间不退出,基本可认为是 odps 的 bug。

生成物理执行计划阶段

子状态为“SQLTask is generating execution plan”。

1.读取的分区太多。每个分区需要去根据分区信息来决定处理方式,决定 split,并且会写到生成的执行计划中。

需要好好设计 SQL,减少分区的数量,包括:分区裁剪、筛除不需要读的分区、把大作业拆成小作业。

2.小文件太多(万级别),ODPS 会根据文件大小决定 split,小文件多了会导致计算 split 的过程耗时增加。

使用TunnelBufferedWriter接口,可以更简单的进行上传功能,同时避免小文件。

执行一次 alter table merge smallfiles; 让 odps 把小文件 merge 起来,

数据跨集群复制阶段

子状态列表里面出现多次“Task rerun”,result 里有错误信息“FAILED: ODPS-0110141:Data version exception”。

1.project 刚做集群迁移,往往前一两天有大量需要跨集群复制的作业。

这种情况是预期中的跨集群复制,需要用户等待。

2.可能是作业提交错集群,或者是中间 project 做过迁移,分区过滤没做好,读取了一些比较老的分区。

检查作业提交的集群是否正确, Logview2.0任务详情页左侧的 BasicInfo 查看作业提交的集群。


  • 执行阶段

logview 的 detail 界面有执行计划(执行计划没有全都绿掉),且作业状态还是 Running。

执行阶段卡住或执行时间比预期长的主要原因有等待资源,数据倾斜,UDF 执行低效,数据膨胀等。

阶段

特征

解决方案

等待资源

一些instance处于Ready状态,部分instance处于Running状态。

确定排队状态是否正常。可以通过 logview 的排队信息“Queue”看作业在队列的位置。

数据倾斜

task 中大多数 instance 都已经结束了,但有某几个 instance 却迟迟不结束(长尾)。


  1. 利用 MaxCompute Studio 的作业执行图及作业详情功能来分析作业运行情况,定位到长尾实例,找到导致长尾的数据来源。
  2. 利用 Logveiw2.0 查看任务执行图和 instance 运行情况来定位长尾实例。

UDF执行低效

某个 task 执行效率低,且该 task 中有用户自定义的扩展。

  1. 检查 UDF 是否有 bug。

有时候 bug 是由于某些特定的数据值引起的,比如出现某个值的时候会引起死循环。

  1. 检查 UDF 函数是否与内置函数同名。

内置函数是有可能被同名 UDF 覆盖的,当看到一个函数像是内置函数时,需要确定是否有同名 UDF 覆盖了内置函数。

  1. 使用内置函数代替 UDF。

evaluate 中只做与参数相关的必要操作。

数据膨胀

task 的输出数据量比输入数据量大很多。

  1. 检查代码是否有 bug:JOIN 条件是不是写错,变成笛卡尔积了;UDTF是不是有问题,输出太多数据。
  2. 检查 Aggregation 引起的数据膨胀。
  3. 避免join引起的数据膨胀。
  4. 由于grouping set 导致的数据膨胀。

在线业务压制

ODPS集群中的一部分是离线集群,另一部分是在线集群。

如果是弹内环境,可通过fuxi sensor确认是否存在在线业务压制。


UDF执行:




set odps.sql.udf.jvm.memory=-- 设定UDF JVM Heap使用的最大内存,单位M,默认1024M-- 可手动调整区间[256,12288]

  • 结束阶段


有时 Fuxi 作业结束时,作业总体进度仍然处于运行状态。原因有两种:

  1. 单 SQL 作业可能包含多个 Fuxi 作业
  2. Fuxi 作业结束后,SQL 在结束阶段运行于控制集群的逻辑占用时间较长


阶段 特性 解决方案

子查询多阶段执行

MaxCompute SQL 的子查询会被编译进同一个 Fuxi DAG,即所有子查询和主查询都通过一个 Fuxi 作业完成。

但也有一些特殊子查询需要先将子查询单独执行

子查询 SELECT DISTINCT ds FROM t_ds_set 先执行,其结果需要被用来做分区裁剪,来优化主查询需要读取的分区数。

过多小文件

存储方面:小文件过多会给 Pangu 文件系统带来一定的压力,且影响空间的有效利用。

计算方面:ODPS 处理单个大文件比处理多个小文件更有效率,小文件过多会影响整体的计算执行性能。

为了避免系统产生过多小文件,SQL作业会在结束时自动触发合并小文件的操作。

根据参数odps.merge.smallfile.filesize.threshold来判定小文件,默认阈值为32MB。

可通过logview查看作业是否触发了自动合并小文件。


动态分区元数据更新

Fuxi 作业执行完后,有可能还有一些元数据操作。


对分区表 sales 使用 insert into ... values命令新增 2000 个分区:

INSERT INTO TABLE sales partition (ds)(ds, product, price)

VALUES ('20170101','a',1),('20170102','b',2),('20170103','c',3), ...;

输出文件size变大

在输入输出条数相差不大的情况,结果膨胀几倍。

一般是数据分布变化导致的,在写表的过程中,会对数据进行压缩,而压缩算法对于重复数据的压缩率是最高的。

子查询:









SELECT     product,    sum(price) FROM     sales WHERE     ds in (SELECT DISTINCT ds FROM t_ds_set) GROUP BY product;

 性能优化

image.png

  • 优化运行时间


在优化运行时间这个维度上,我们重点关注时间上的加速,单位时间内可能会消耗更多的计算资源。总成本有可能上升,也可能降低。

优化类型

具体类型

优化措施

调整并行度

instance数量的增加会对执行速度产生影响:

  1. 更多的instance意味着更长的等待资源和排队次数。
  2. 每个instance的初始化需要一定时间,并行度越高,总初始化时间越长,有效执行时间占比越低。

需要强制 1 个 instance 执行

用户需要检查这些操作是否必要,能否去掉,尽量取消掉这些操作:读表的 task + 非读表的 task

影响单个task并行度主要因素:

  1. 某些操作强制必须 1 个 instance 来执行
  2. 读表的 task
  3. 非读表的 task
  4. HBO会在上面的基础上根据历史作业的执行情况做调整

对于读表的 task,一个 instance 读取 256M的数据,一些常见出问题的情况:

  1. 数据压缩比很高
  2. Task 中执行了一些很 heavy 的操作,特别是存在 UDF
  3. 读取 256M 数据太少,导致 instance 的执行时间太短

可以通过调整flag实现:

set odps.sql.mapper.split.size= xxx

非读表的 task,主要有三种方式调整并行度:

  1. 调整 odps.sql.mapper.split.size
  2. 通过 odps.sql.reducer.instances 强制设置 reducer 并行度
  3. 通过 odps.sql.joiner.instances 强制设置 joiner 并行度

set odps.sql.reducer.instances= xxx

-- 设定Reduce task的instance数量


set odps.sql.joiner.instances= xxx

-- 设定Join task的instance数量

HBO

HBO (History-Based Optimization) 会根据对历史作业的分析来优化当前作业的。

包括内存、并行度等一系列参数,它能让你的周期作业越跑越快。

为了尽可能解决HBO失效这个顽疾,我们在HBO中增加了若干新的功能,包括:

  • realtime hbo
  • task-wise hbo
  • new signature

优化执行计划

CBO优化器会基于统计信息、SQL语义、执行引擎能力、丰富的优化能力,自动生成最优的执行计划,并且在持续提升优化能力。

Map Join Hint

用户可以手动添加map join hint,使得原本的Sort-Merge Join变成Map Join,避免大表数据shuffle从而提升性能。

Distributed Map Join Hint

Distributed MapJoin是MapJoin的升级版,适用于适用于大表Join中表的场景 的场景,二者的核心目的都是为了减少大表侧的Shuffle和排序。

Dynamic Filter Hint

基于JOIN等值连接的特性,MaxCompute可以通过表A的数据生成一个过滤器,在Shuffle或JOIN之前提前过滤表B的数据。

物化视图

物化视图(Materialized View)本质是一种预计算,即把某些耗时的操作(如JOIN/AGGREGATE)的结果保存下来。

以便在查询时直接复用,从而避免这些耗时的操作,最终达到加速查询的目的。

数据倾斜

数据Shuffle导致的数据倾斜 1

数据倾斜大多数是由于数据的 reshuffle 引起的,因为按照某个 key 来做 shuffle,同一个 key 值的数据会强制集中在一个 instance 处理。

  1. 去掉 shuffle
  2. 换别的 shuffle key
  3. 将热点数据特殊处理

数据Shuffle导致的数据倾斜 2

特征:读表并写动态分区作业,M task 读入大量数据,但是只会写出少量的 动态分区。

解决方法:set odps.sql.reshuffle.dynamicpt =false; 去掉reshuffle 过程。

  • 优化资源消耗


优化类型

具体类型

优化措施

SQL的新语法、新功能

GROUPING SETS:对 SELECT 语句中 GROUP BY 子句的扩展。

SQL 运行*时物理执行计划做了 3 次聚合,然后再 UNION 起来。


脚本模式:

脚本模式能让用户以脚本的形式提交多条语句同时执行

脚本模式的性能优势,实际上是“将分散的业务逻辑合并成一个作业来运行“的性能优势:

  1. 合并重复的公共操作。
  2. 避免中间数据写表,减少临时表。
  3. 更好的发挥 optimizer 的作用。
  4. 减少了作业调度的开销。

MR典型场景用SQL实现

  1. 使用 SQL-聚合函数

select k, WM_CONCAT(';',concat(v,":",c)) from 

( select k, v, count(v) c from t group by k,v) t2 group by k;

  1. 用 SQL-窗口/分析函数

rows between x preceding|following and y preceding|following

  1. 使用 SQL-UDJ (User Defined Join)

MapReduce 实现的 JOIN 逻辑。

  1. 使用 SQL-TRANSFORM

适用场景:MapReduce Streaming 作业。

合理设置资源参数

Map设置

set odps.sql.mapper.cpu=100

作用:设置处理Map Task每个Instance的CPU数目

set odps.sql.mapper.memory=1024

作用:设定Map Task每个Instance的Memory大小

set odps.sql.mapper.merge.limit.size=64

作用:设定控制文件被合并的最大阈值

set odps.sql.mapper.split.size=256

作用:设定一个Map的最大数据输入量

Join设置

set odps.sql.joiner.instances=-1

作用: 设定Join Task的Instance数量

set odps.sql.joiner.cpu=100

作用: 设定Join Task每个Instance的CPU数目

set odps.sql.joiner.memory=1024

作用:设定Join Task每个Instance的Memory大小

Reduce设置

set odps.sql.reducer.instances=-1

作用: 设定Reduce Task的Instance数量

set odps.sql.reducer.cpu=100

作用:设定处理Reduce Task每个Instance的Cpu数目

set odps.sql.reducer.memory=1024

作用:设定Reduce Task每个Instance的Memory大小


GROUPING SETS 优化措施*:









SELECT NULL, NULL, NULL, COUNT(*)FROM requestsUNION ALLSELECT os, device, NULL, COUNT(*)FROM requests GROUP BY os, deviceUNION ALLSELECT NULL, NULL, city, COUNT(*)FROM requests GROUP BY city;

上述 SQL 运行时物理执行计划做了 3 次聚合,然后再 UNION 起来。




SELECT os, device, city, COUNT(*)FROM requestsGROUP BY os, device, city GROUPING SETS((os, device), (city), ());


物理执行计划只包含一个 Reduce 阶段,无需进行 UNION 操作,使用更少代码的同时消耗更少的集群资源。


 恢复已删

image.png


总结


经过一个多月的整理和总结,终于完成了《ODPS开发大全》的这个基础版本。在这过程中我不断地接触到新的知识点,学到之前未曾掌握的技术,也感叹ODPS功能之丰富强大。期望在未来工作中,自己可以多沉淀好的技术文档,这不仅让我更加深刻地温习过往学习的技术,也可以把知识共享给更多求知若渴的技术人,建设更开放的CS技术社区。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
77 0
|
4月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
119 0
|
6月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
|
6月前
|
存储 分布式计算 MaxCompute
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中
|
5月前
|
数据可视化
Echarts数据可视化开发| 智慧数据平台
Echarts数据可视化开发| 智慧数据平台
|
5月前
|
数据可视化
Echarts数据可视化大屏开发| 大数据分析平台
Echarts数据可视化大屏开发| 大数据分析平台
|
5月前
|
分布式计算 大数据 Java
Scala 入门指南:从零开始的大数据开发
Scala 入门指南:从零开始的大数据开发
|
6月前
|
分布式计算 自然语言处理 MaxCompute
构建NLP 开发问题之如何在数据加载框架中实现从两个ODPS表中分别读取正样本和负样本,并在batch内以1:1的方式混合
构建NLP 开发问题之如何在数据加载框架中实现从两个ODPS表中分别读取正样本和负样本,并在batch内以1:1的方式混合
|
6月前
|
SQL Java 大数据
开发与运维应用问题之大数据SQL数据膨胀如何解决
开发与运维应用问题之大数据SQL数据膨胀如何解决